Core Pipeline API

Reference for Pipeline, ExtractionResult, ParsedDocument, TextChunk, and Settings.

Pipeline

The primary entry point. Pipeline wires together all stages (ingestion → preprocessing → routing → inference → validation → storage).

Pipeline(schema_cls, task, config_path, *, provider, ingestion_manager, model_registry, vector_store, enable_rag)

Create a new pipeline instance.

Parameters

schema_cls Type[BaseModel] | None Pydantic model class defining the extraction schema. Pass None when using RAG-only mode. required
task TaskType Task type enum. Default: TaskType.EXTRACTION. optional
config_path str | Path | None Path to YAML config file. Falls back to configs/default.yaml. optional
provider BaseLLMProvider | None LLM provider instance. If None, resolved from config. optional
ingestion_manager IngestionManager | None Custom ingestion manager with a registered parser set. optional
model_registry ModelRegistry | None Custom model registry. Defaults to configs/models.yaml. optional
enable_rag bool | None Enable RAG indexing mode. Requires vector_store. optional

Pipeline.run()

async pipeline.run(file_path, *, parser_name, model, save_format, output_filename) → list[ExtractionResult]

Extract structured data from a single file.

Parameters

file_path Path Path to the input file. Format is auto-detected from extension. required
parser_name str | None Override auto-detected parser (e.g. "ocr_pdf"). optional
model str | None Override model routing (e.g. "deepseek-r1-70b"). optional
save_format "jsonl" | "csv" | None Automatically save results after extraction. optional

Returns

list[ExtractionResult] One result per document chunk. Check result.is_valid before using result.structured_output.

Example

results = await pipeline.run(
    Path("report.pdf"),
    save_format="jsonl",
)
for r in results:
    if r.is_valid:
        print(r.structured_output)

Pipeline.run_many()

async pipeline.run_many(file_paths, *, max_concurrent, **kwargs) → dict[str, list[ExtractionResult]]

Concurrently extract from multiple files.

Parameters

file_paths list[Path] List of file paths to process. required
max_concurrent int | None Maximum concurrent pipeline runs. Defaults to config value (4). optional
**kwargs Any All keyword args are forwarded to run() for each file. optional

Returns

dict[str, list[ExtractionResult]] Mapping from filename string to results list.

Pipeline.build_index()

async pipeline.build_index(file_path, index_type, *, vector_store, parser_name) → VectorStoreIndex | SummaryIndex

Parse a document, chunk it, and insert into a vector or summary index.

Parameters

file_path Path Document to index. required
index_type "vector" | "summary" Index backend. Default: "vector". optional

ExtractionResult

Returned by all pipeline methods. All fields are read-only.

python
class ExtractionResult(BaseModel):
    result_id: str               # unique UUID
    document_id: str             # source document UUID
    chunk_id: str | None         # chunk UUID if chunked
    source_format: DocumentFormat
    task: TaskType
    model_used: str              # "gpt-4o", "llama-3.1-8b", etc.
    raw_output: str              # raw LLM text response
    structured_output: dict | list   # your schema's data
    is_valid: bool               # True if schema validation passed
    validation_errors: list[str] # empty if is_valid=True
    latency_ms: float            # end-to-end latency
    token_usage: dict[str, int]  # prompt_tokens, completion_tokens, total_tokens
    created_at: datetime

ParsedDocument

Output of the ingestion stage, input to preprocessing.

python
class ParsedDocument(BaseModel):
    metadata: DocumentMetadata
    text: str              # full concatenated text
    pages: list[str]       # per-page text (empty for non-paginated formats)
    tables: list[dict]     # extracted tables
    images: list[str]      # base64-encoded images or paths

class DocumentMetadata(BaseModel):
    document_id: str       # uuid hex, auto-generated
    filename: str
    source: str            # "local", "s3://...", URL
    file_extension: str
    format: DocumentFormat
    file_size_bytes: int
    page_count: int | None
    ingested_at: datetime
    extra: dict[str, Any]

TextChunk

Output of the preprocessing/chunking stage.

python
class TextChunk(BaseModel):
    text: str
    metadata: ChunkMetadata

class ChunkMetadata(BaseModel):
    chunk_id: str
    document_id: str
    source_format: DocumentFormat
    page_number: int | None
    heading: str | None     # section heading this chunk falls under
    token_count: int

Settings

The settings object is a Pydantic BaseSettings model loaded from YAML + environment variables. Use get_settings() for the cached singleton or load_settings() for a fresh load.

python
from structure_d.config import get_settings

settings = get_settings()
print(settings.inference.provider.provider)   # "vllm"
print(settings.preprocessing.chunking.max_tokens)  # 1024