Utilities

Reference for IngestionManager, Chunker, SchemaValidator, RetryHandler, storage writers, and the RAG indexing layer.

IngestionManager

Manages parser selection and coordinates multi-file ingestion with optional concurrency.

IngestionManager(registry, connector)

Parameters

registry ParserRegistry | None Parser registry. Defaults to build_default_registry(). optional
connector BaseConnector | None File source connector. Defaults to LocalConnector. optional
python
from structure_d.ingestion.manager import IngestionManager

manager = IngestionManager()

# Single file
doc = await manager.ingest(Path("report.pdf"))
# doc is a ParsedDocument

# Multiple files (concurrent)
docs = await manager.ingest_many(
    [Path("a.pdf"), Path("b.html"), Path("c.docx")],
    max_concurrent=4,
)

# From a connector (S3, GCS, Azure, SFTP)
docs = await manager.ingest_from_connector(prefix="invoices/")

Chunker

Splits a document's text into TextChunk objects. Used internally by the pipeline, but accessible directly.

Chunker(strategy, max_tokens, overlap_tokens, heading_level)

Parameters

strategy "fixed" | "sentence" | "heading" | "semantic" Chunking algorithm. Default: "semantic". optional
max_tokens int Maximum tokens per chunk. Default: 1024. optional
overlap_tokens int Token overlap between adjacent chunks. Default: 128. optional
python
from structure_d.preprocessing.chunker import Chunker

chunker = Chunker(strategy="semantic", max_tokens=512)
chunks = chunker.chunk(text, document_id="doc-123")

for chunk in chunks:
    print(f"[{chunk.metadata.heading}] {chunk.text[:80]}...")
    print(f"  tokens: {chunk.metadata.token_count}, page: {chunk.metadata.page_number}")

normalize_text()

normalize_text(text, *, normalize_unicode, strip_boilerplate, collapse_whitespace) → str

Clean extracted text before chunking. Removes control characters, collapses whitespace, strips page headers/footers.

Parameters

normalize_unicode bool Apply NFC normalization and strip non-printable control characters. Default: True. optional
strip_boilerplate bool Remove page numbers, running headers/footers. Default: True. optional
collapse_whitespace bool Collapse multiple spaces/newlines. Default: True. optional

SchemaValidator

Validates raw LLM text output against a Pydantic schema using a multi-step extraction strategy.

python
from structure_d.validation.validator import SchemaValidator

validator = SchemaValidator(schema_cls=Invoice)

# Validate raw LLM text
data, errors = validator.validate(raw_text)
# data: dict | list
# errors: list[str] — empty if valid

# Validate a pre-parsed dict
data, errors = validator.validate_dict({"vendor": "Acme", "total": 1200.0})

The validator tries multiple extraction strategies in order: direct JSON parse → extract from code fence → extract first {...} or [...] block.

RetryHandler

Wraps an ExtractionResult and retries with the LLM provider if validation failed.

python
from structure_d.validation.retry import RetryHandler

handler = RetryHandler(
    schema_cls=Invoice,
    task=TaskType.EXTRACTION,
    provider=OpenAIProvider(),
    max_retries=3,
)

# If result.is_valid is already True, this is a fast no-op
result = await handler.validate_and_retry(
    result=initial_result,
    original_text=chunk.text,
)

Storage writers

python
from structure_d.storage.jsonl import JSONLWriter, save_as_jsonl
from structure_d.storage.csv_store import CSVWriter, save_as_csv

# JSONL
writer = JSONLWriter(output_dir=Path("output/"), indent=None)
path = writer.write(results, filename="out.jsonl")           # list[ExtractionResult]
path = writer.write_dicts([{"key": "val"}], "raw.jsonl")     # raw dicts

# CSV (flattens structured_output into dot-separated columns)
writer = CSVWriter(output_dir=Path("output/"))
path = writer.write(results, filename="out.csv")

# Convenience functions
save_as_jsonl(results, "output.jsonl")
save_as_csv(results, "output.csv")

DocumentReader

High-level interface for loading and chunking documents into Node objects ready for indexing.

python
from structure_d.indexing.loading import DocumentReader

reader = DocumentReader(
    ingestion_manager=IngestionManager(),
    chunker=Chunker(strategy="semantic"),
)

# Load a single document
docs = await reader.load(Path("report.pdf"))          # list[Document]
nodes = await reader.load_and_chunk(Path("report.pdf"))  # list[Node]

# Load an entire directory
docs = await reader.load_directory(
    Path("knowledge_base/"),
    glob="**/*.pdf",
)
nodes = await reader.load_directory_and_chunk(
    Path("knowledge_base/"),
    glob="**/*.{pdf,html,docx}",
)

QueryEngine

Combines a BaseRetriever with an LLM provider to answer natural language questions over indexed documents.

python
from structure_d.indexing.query_engine import QueryEngine

engine = index.as_query_engine(
    provider=OpenAIProvider(),
    top_k=5,
    response_mode="simple",   # or "tree_summarize"
)

# Retrieve relevant nodes
nodes = await engine.retrieve("termination clause", top_k=3)

# Answer a question (retrieve + synthesize)
answer = await engine.query(
    "What are the payment terms?",
    model="gpt-4o-mini",
    temperature=0.0,
    max_tokens=512,
)
print(answer)  # natural language response string