Basic Usage
Extract structured data from PDFs, DOCX, HTML, images, and more. Save as JSONL, CSV, or Markdown.
Single file extraction
The Pipeline.run() method takes a file path and returns a list of ExtractionResult objects — one per document chunk.
```python
import asyncio
from pathlib import Path

from structure_d.pipeline import Pipeline
from structure_d.schemas.generic import KeyValueExtraction
from structure_d.inference.providers import AnthropicProvider


async def main():
    pipeline = Pipeline(
        schema_cls=KeyValueExtraction,
        provider=AnthropicProvider(),
    )
    results = await pipeline.run(Path("contract.pdf"))
    for r in results:
        if r.is_valid:
            print(r.structured_output)  # dict
            print(f"Latency: {r.latency_ms:.0f}ms")
            print(f"Tokens: {r.token_usage['total_tokens']}")


asyncio.run(main())
```
Built-in schemas
Structure-D ships with eight task-oriented schemas. Use them by name:
| Name | Schema class | Output shape |
|---|---|---|
| key_value | KeyValueExtraction | List of {key, value, confidence, page} |
| table | TableExtraction | Headers + list of row dicts |
| entity | EntityExtraction | List of {text, label, start, end} |
| classification | ClassificationResult | Label, confidence, all labels + scores |
| summary | SummaryResult | Summary string + bullet points |
| form | FormExtraction | List of form fields with bounding box + page |
| document_structure | DocumentStructure | Title, sections with heading/body/level |
| generic | GenericExtraction | Open: accepts any JSON fields |
```python
from structure_d.schemas.generic import get_schema, BUILTIN_SCHEMAS

# By name
TableSchema = get_schema("table")

# List all available
print(list(BUILTIN_SCHEMAS.keys()))
# ['generic', 'key_value', 'table', 'entity', 'classification', 'summary', 'form', 'document_structure']
```
Custom schemas
Any Pydantic v2 model works as an extraction schema. Nest models for complex structures:
```python
from pydantic import BaseModel, Field

from structure_d.pipeline import Pipeline
# Assumption: OpenAIProvider is exported from the same module as AnthropicProvider.
from structure_d.inference.providers import OpenAIProvider


class LineItem(BaseModel):
    description: str
    quantity: int
    unit_price: float
    total: float


class Invoice(BaseModel):
    vendor: str
    invoice_number: str = Field(description="Invoice or receipt number")
    issue_date: str | None = None
    due_date: str | None = None
    currency: str = "USD"
    subtotal: float
    tax: float = 0.0
    total_amount: float
    line_items: list[LineItem] = []


pipeline = Pipeline(schema_cls=Invoice, provider=OpenAIProvider())
```
Use Field descriptions
The text passed to Pydantic's Field(description=...) is included in the JSON schema sent to the model, so specific, unambiguous descriptions generally improve extraction accuracy.
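To see what a description looks like in the generated schema, the following minimal sketch uses plain Pydantic v2 with no Structure-D imports. The Receipt model and its field descriptions are hypothetical and exist only for this illustration; how Structure-D embeds the schema in its prompt is not shown here.

```python
from pydantic import BaseModel, Field, ValidationError


class Receipt(BaseModel):
    # Hypothetical schema used only for this illustration.
    vendor: str = Field(description="Legal name of the seller as printed on the receipt")
    total_amount: float = Field(description="Grand total including tax")


# Each description appears under its property in the generated JSON schema.
schema = Receipt.model_json_schema()
for name, prop in schema["properties"].items():
    print(f"{name}: {prop['description']}")

# The same model validates a candidate payload; in Pydantic's default (lax)
# mode a numeric string is coerced to float.
receipt = Receipt.model_validate({"vendor": "Acme Ltd", "total_amount": "41.50"})
print(receipt.total_amount)

# A payload that omits a required field raises ValidationError.
try:
    Receipt.model_validate({"vendor": "Acme Ltd"})
except ValidationError as exc:
    missing = [err["loc"][0] for err in exc.errors() if err["type"] == "missing"]
    print("Missing fields:", missing)
```

Printing the schema before running an extraction is a cheap way to check that every field the model has to fill carries a usable description.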
Multiple files
Use run_many() to process a list of files concurrently:
```python
# Run inside an async function, with Path imported and a Pipeline instance named `pipeline`.
files = list(Path("invoices/").glob("*.pdf"))
results = await pipeline.run_many(
    files,
    max_concurrent=8,     # default from config
    save_format="jsonl",  # save each result automatically
)

# results is a dict: {filename: [ExtractionResult, ...]}
for filename, file_results in results.items():
    valid = [r for r in file_results if r.is_valid]
    print(f"{filename}: {len(valid)}/{len(file_results)} valid")
```
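How run_many enforces max_concurrent is not described here. A common way to cap the number of coroutines in flight is asyncio.Semaphore, and the stand-alone sketch below illustrates that idea. The names run_bounded and fake_extract are hypothetical and are not Structure-D functions; fake_extract sleeps instead of calling a model.

```python
import asyncio

active = 0  # coroutines currently inside fake_extract
peak = 0    # highest value `active` ever reached


async def fake_extract(filename):
    """Stand-in for a real extraction call; records how many run at once."""
    global active, peak
    active += 1
    peak = max(peak, active)
    await asyncio.sleep(0.01)
    active -= 1
    return [f"result for {filename}"]


async def run_bounded(items, worker, max_concurrent):
    """Run worker(item) for every item, at most max_concurrent at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(item):
        async with semaphore:  # waits here while the limit is reached
            return item, await worker(item)

    # gather preserves input order, so the dict keys follow `items`.
    pairs = await asyncio.gather(*(guarded(item) for item in items))
    return dict(pairs)


results = asyncio.run(run_bounded(["a.pdf", "b.pdf", "c.pdf"], fake_extract, max_concurrent=2))
print(results)
print("Peak concurrency:", peak)
```

The returned mapping has the same shape as the dict described above, one list per filename, which makes the pattern easy to adapt when a different concurrency policy is needed.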
Saving output
Pass save_format to automatically write results after extraction. Three formats are supported:
| Value | Output | Use when |
|---|---|---|
| "jsonl" | Pretty-printed JSON objects, one per chunk, blank-line separated | Pipelines, downstream processing |
| "csv" | One row per result; structured_output flattened to dot-notation columns | Spreadsheets, BI tools |
| "markdown" | Human-readable .md: a ## Result N heading with metadata table + fenced JSON | Human review, Git-tracked output |
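The dot-notation flattening mentioned for the "csv" format can be pictured with a short stand-alone sketch. The flatten helper is hypothetical and is not the library's implementation; in particular, it leaves lists untouched, and how Structure-D handles lists in CSV output is not stated here.

```python
import csv
import io


def flatten(data, prefix=""):
    """Flatten nested dicts into one dict with dot-separated keys."""
    flat = {}
    for key, value in data.items():
        path = f"{prefix}.{key}" if prefix else str(key)
        if isinstance(value, dict) and value:
            flat.update(flatten(value, path))  # recurse into nested dicts
        else:
            flat[path] = value                 # scalars, lists, empty dicts
    return flat


row = flatten({"vendor": {"name": "Acme", "country": "DE"}, "total": 41.5})
print(row)  # {'vendor.name': 'Acme', 'vendor.country': 'DE', 'total': 41.5}

# Write the flattened row as CSV into an in-memory buffer.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=list(row))
writer.writeheader()
writer.writerow(row)
print(buffer.getvalue())
```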
```python
# Auto-save to JSONL (default)
results = await pipeline.run(
    Path("report.pdf"),
    save_format="jsonl",
    output_filename="report_extracted",  # .jsonl appended automatically
)

# Save as Markdown for human review
results = await pipeline.run(
    Path("invoice.docx"),
    save_format="markdown",
    output_filename="invoice_extracted",  # .md appended automatically
)

# Save as CSV for spreadsheet import
results = await pipeline.run(
    Path("data.html"),
    save_format="csv",
)

# Or write manually using the storage writers
from structure_d.storage.jsonl import JSONLWriter
from structure_d.storage.csv_store import CSVWriter
from structure_d.storage.markdown import MarkdownWriter

JSONLWriter(output_dir=Path("output/")).write(results, "output.jsonl")
CSVWriter(output_dir=Path("output/")).write(results, "output.csv")
MarkdownWriter(output_dir=Path("output/")).write(results, "output.md")
```
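Because the "jsonl" output is described as pretty-printed objects separated by blank lines, a reader that calls json.loads on each line would fail on it. One way to read such a file back with only the standard library is json.JSONDecoder.raw_decode, sketched below. The iter_json_objects helper is hypothetical, and the keys in the sample text are assumptions about what a saved record might contain.

```python
import json


def iter_json_objects(text):
    """Yield each top-level JSON value from text holding several values."""
    decoder = json.JSONDecoder()
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so step over the
        # newlines and blank lines between records first.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            break
        obj, pos = decoder.raw_decode(text, pos)
        yield obj


# Sample text shaped like two pretty-printed records with a blank line between.
sample = (
    '{\n  "result_id": "a",\n  "is_valid": true\n}\n'
    '\n'
    '{\n  "result_id": "b",\n  "is_valid": false\n}\n'
)
records = list(iter_json_objects(sample))
print([record["result_id"] for record in records])  # ['a', 'b']
```

For a real file, read its contents with Path.read_text() and pass the string to the helper.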
Choosing a parser
By default, Structure-D auto-selects the best parser for each file extension.
Override this with parser_name:
```python
# Force OCR even for a regular PDF
results = await pipeline.run(
    Path("scanned_form.pdf"),
    parser_name="ocr_pdf",  # uses Tesseract-based parser
)

# Available parsers:
#   pymupdf, pdfplumber, ocr_pdf: PDF
#   tesseract_image: images (PNG, JPG, TIFF)
#   html: HTML/web pages
#   docx, xlsx, pptx: Office documents
#   email: .eml / .msg files
#   transcript: audio transcripts
#   plaintext: .txt, .md, .csv
```
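When parser_name comes from user input or a config file, a small pre-flight check can catch mismatches such as an image parser on a DOCX file. The sketch below transcribes the parser list above into a lookup table. It is a hypothetical helper, not part of Structure-D, and it does not reproduce the library's auto-selection logic. The .htm, .jpeg, and .tif spellings are assumptions, and the transcript parser is omitted because no file extension is given for it.

```python
from pathlib import Path

# Extension -> parser names listed for that format.
PARSERS_BY_EXTENSION = {
    ".pdf": ("pymupdf", "pdfplumber", "ocr_pdf"),
    ".png": ("tesseract_image",),
    ".jpg": ("tesseract_image",),
    ".jpeg": ("tesseract_image",),  # assumed alias of .jpg
    ".tiff": ("tesseract_image",),
    ".tif": ("tesseract_image",),   # assumed alias of .tiff
    ".html": ("html",),
    ".htm": ("html",),              # assumed alias of .html
    ".docx": ("docx",),
    ".xlsx": ("xlsx",),
    ".pptx": ("pptx",),
    ".eml": ("email",),
    ".msg": ("email",),
    ".txt": ("plaintext",),
    ".md": ("plaintext",),
    ".csv": ("plaintext",),
}


def parser_matches(path, parser_name):
    """Return True if parser_name is listed for the file's extension."""
    suffix = Path(path).suffix.lower()
    return parser_name in PARSERS_BY_EXTENSION.get(suffix, ())


print(parser_matches("scanned_form.pdf", "ocr_pdf"))  # True
print(parser_matches("notes.txt", "ocr_pdf"))         # False
```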
Reading results
Each ExtractionResult contains everything you need for downstream processing:
```python
result = results[0]

result.result_id          # unique UUID string
result.document_id        # UUID of the source document
result.source_format      # DocumentFormat enum
result.task               # TaskType enum
result.model_used         # "gpt-4o", "llama-3.1-8b", etc.
result.structured_output  # dict or list: your schema's data
result.is_valid           # True if schema validation passed
result.validation_errors  # list of error messages (empty if valid)
result.raw_output         # raw LLM text response
result.latency_ms         # end-to-end latency in milliseconds
result.token_usage        # {"prompt_tokens": 320, "completion_tokens": 84, "total_tokens": 404}
result.created_at         # datetime
```
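These attributes make it straightforward to roll a run up into a few numbers. The sketch below aggregates validity, token usage, latency, and validation errors over a list of results. FakeResult is a hypothetical stand-in that carries only the four attributes used, so the example runs without Structure-D installed, and summarize is an illustrative helper rather than a library function.

```python
from dataclasses import dataclass, field
from statistics import mean


@dataclass
class FakeResult:
    """Stand-in with only the ExtractionResult attributes used below."""
    is_valid: bool
    latency_ms: float
    token_usage: dict
    validation_errors: list = field(default_factory=list)


def summarize(results):
    """Aggregate validity, token usage, and latency over a list of results."""
    return {
        "results": len(results),
        "valid": sum(r.is_valid for r in results),
        "total_tokens": sum(r.token_usage["total_tokens"] for r in results),
        "mean_latency_ms": mean(r.latency_ms for r in results) if results else 0.0,
        "errors": [e for r in results for e in r.validation_errors],
    }


sample = [
    FakeResult(True, 1200.0,
               {"prompt_tokens": 320, "completion_tokens": 84, "total_tokens": 404}),
    FakeResult(False, 800.0,
               {"prompt_tokens": 300, "completion_tokens": 50, "total_tokens": 350},
               ["total_amount: field required"]),
]
summary = summarize(sample)
print(summary)
```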