Utilities
Reference for IngestionManager, Chunker, SchemaValidator, RetryHandler, storage writers, and the RAG indexing layer.
IngestionManager
Manages parser selection and coordinates multi-file ingestion with optional concurrency.
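The optional concurrency cap works the way bounded async fan-out usually does. Here is a minimal sketch of running parsers with at most `max_concurrent` in flight; `ingest_many_sketch` and `fake_parse` are hypothetical stand-ins, not structure_d code:

```python
import asyncio

async def ingest_many_sketch(paths, parse, max_concurrent=4):
    # Bound concurrency with a semaphore: at most `max_concurrent`
    # parse calls run at once; the rest wait their turn.
    sem = asyncio.Semaphore(max_concurrent)

    async def one(path):
        async with sem:
            return await parse(path)

    # gather() preserves input order even when tasks finish out of order.
    return await asyncio.gather(*(one(p) for p in paths))

async def fake_parse(path):
    await asyncio.sleep(0)  # stand-in for real parsing I/O
    return f"parsed:{path}"

results = asyncio.run(ingest_many_sketch(["a.pdf", "b.html"], fake_parse))
# results == ["parsed:a.pdf", "parsed:b.html"]
```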
Parameters
Two optional parameters: a parser registry (defaults to build_default_registry()) and a connector (defaults to LocalConnector).
from pathlib import Path
from structure_d.ingestion.manager import IngestionManager
manager = IngestionManager()
# Single file
doc = await manager.ingest(Path("report.pdf"))
# doc is a ParsedDocument
# Multiple files (concurrent)
docs = await manager.ingest_many(
    [Path("a.pdf"), Path("b.html"), Path("c.docx")],
    max_concurrent=4,
)
# From a connector (S3, GCS, Azure, SFTP)
docs = await manager.ingest_from_connector(prefix="invoices/")
Chunker
Splits a document's text into TextChunk objects. Used internally by the pipeline, but accessible directly.
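For intuition, window-based chunking with overlap can be sketched as a toy that treats each word as one token. This is not a library function, and the real "semantic" strategy additionally respects headings and sentence boundaries:

```python
def chunk_sketch(words, max_tokens=8, overlap=2):
    # Greedy fixed windows: each chunk holds up to max_tokens words,
    # and consecutive chunks share `overlap` words of context.
    chunks, start = [], 0
    step = max_tokens - overlap
    while start < len(words):
        chunks.append(words[start:start + max_tokens])
        if start + max_tokens >= len(words):
            break
        start += step
    return chunks

words = [f"w{i}" for i in range(20)]
chunks = chunk_sketch(words, max_tokens=8, overlap=2)
# 3 chunks; the last two words of each chunk reappear at the start of the next
```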
Parameters
"semantic". optional 1024. optional 128. optional from structure_d.preprocessing.chunker import Chunker
chunker = Chunker(strategy="semantic", max_tokens=512)
chunks = chunker.chunk(text, document_id="doc-123")
for chunk in chunks:
    print(f"[{chunk.metadata.heading}] {chunk.text[:80]}...")
    print(f"  tokens: {chunk.metadata.token_count}, page: {chunk.metadata.page_number}")
normalize_text()
Clean extracted text before chunking. Removes control characters, collapses whitespace, strips page headers/footers.
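A rough approximation of the first two cleanup steps looks like this; `normalize_sketch` is illustrative rather than the library's implementation, and header/footer stripping is omitted because it needs page-level context:

```python
import re

def normalize_sketch(text):
    # Drop control characters, keeping newlines and tabs for now.
    text = "".join(ch for ch in text if ch in "\n\t" or ord(ch) >= 32)
    # Collapse runs of spaces/tabs into a single space.
    text = re.sub(r"[ \t]+", " ", text)
    return text.strip()

cleaned = normalize_sketch("Invoice\x07  total:\t\t 42\n")
# cleaned == "Invoice total: 42"
```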
Parameters
Three boolean flags, one per cleanup step (control-character removal, whitespace collapsing, header/footer stripping); each defaults to True and is optional.
SchemaValidator
Validates raw LLM text output against a Pydantic schema using a multi-step extraction strategy.
from structure_d.validation.validator import SchemaValidator
validator = SchemaValidator(schema_cls=Invoice)
# Validate raw LLM text
data, errors = validator.validate(raw_text)
# data: dict | list
# errors: list[str] — empty if valid
# Validate a pre-parsed dict
data, errors = validator.validate_dict({"vendor": "Acme", "total": 1200.0})
The validator tries multiple extraction strategies in order:
direct JSON parse → extract from code fence → extract first {...} or [...] block.
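That fallback chain can be sketched in a few lines; `extract_json_sketch` is illustrative, not the validator's actual code:

```python
import json
import re

FENCE = "`" * 3  # a literal triple-backtick code-fence marker

def extract_json_sketch(raw):
    # 1. Direct JSON parse.
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        pass
    # 2. Extract from a fenced code block (plain or json-tagged).
    fence = re.search(FENCE + r"(?:json)?\s*(.*?)" + FENCE, raw, re.DOTALL)
    if fence:
        try:
            return json.loads(fence.group(1))
        except json.JSONDecodeError:
            pass
    # 3. First {...} or [...] span; greedy so nested braces survive.
    block = re.search(r"(\{.*\}|\[.*\])", raw, re.DOTALL)
    if block:
        return json.loads(block.group(1))
    raise ValueError("no JSON found")

data = extract_json_sketch(f'Sure! {FENCE}json\n{{"vendor": "Acme"}}\n{FENCE}')
# data == {"vendor": "Acme"}
```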
RetryHandler
Wraps an ExtractionResult and retries the extraction with the LLM provider if validation fails.
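The loop inside `validate_and_retry` presumably has this shape. The sketch below uses hypothetical `revalidate`/`reask` callables in place of the validator and the LLM provider:

```python
import asyncio

async def validate_and_retry_sketch(result, revalidate, reask, max_retries=3):
    # Fast no-op: nothing to do when the result already validated.
    if result["is_valid"]:
        return result
    for _ in range(max_retries):
        # Ask the provider again, feeding the previous errors back.
        raw = await reask(result["errors"])
        data, errors = revalidate(raw)
        result = {"is_valid": not errors, "data": data, "errors": errors}
        if result["is_valid"]:
            break
    return result

# Stubs: this "provider" succeeds on its second attempt.
attempts = []

async def reask(errors):
    attempts.append(errors)
    return "good" if len(attempts) >= 2 else "bad"

def revalidate(raw):
    return (raw, []) if raw == "good" else (None, ["invalid"])

fixed = asyncio.run(validate_and_retry_sketch(
    {"is_valid": False, "data": None, "errors": ["boom"]}, revalidate, reask))
# fixed["is_valid"] is True after two provider calls
```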
from structure_d.validation.retry import RetryHandler
handler = RetryHandler(
    schema_cls=Invoice,
    task=TaskType.EXTRACTION,
    provider=OpenAIProvider(),
    max_retries=3,
)
# If result.is_valid is already True, this is a fast no-op
result = await handler.validate_and_retry(
    result=initial_result,
    original_text=chunk.text,
)
Storage writers
from structure_d.storage.jsonl import JSONLWriter, save_as_jsonl
from structure_d.storage.csv_store import CSVWriter, save_as_csv
# JSONL
writer = JSONLWriter(output_dir=Path("output/"), indent=None)
path = writer.write(results, filename="out.jsonl") # list[ExtractionResult]
path = writer.write_dicts([{"key": "val"}], "raw.jsonl") # raw dicts
# CSV (flattens structured_output into dot-separated columns)
writer = CSVWriter(output_dir=Path("output/"))
path = writer.write(results, filename="out.csv")
# Convenience functions
save_as_jsonl(results, "output.jsonl")
save_as_csv(results, "output.csv")
DocumentReader
High-level interface for loading and chunking documents into Node objects ready for indexing.
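Directory loading is, at its core, a recursive glob. A minimal sketch (`find_documents_sketch` is hypothetical; note that `load_directory`'s examples below also accept brace patterns like `**/*.{pdf,html,docx}`, which plain `pathlib` globbing does not):

```python
import tempfile
from pathlib import Path

def find_documents_sketch(root, pattern="**/*.pdf"):
    # Recursive glob, sorted for a deterministic ingestion order.
    return sorted(p for p in Path(root).glob(pattern) if p.is_file())

with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "sub").mkdir()
    (Path(tmp) / "a.pdf").write_text("x")
    (Path(tmp) / "sub" / "b.pdf").write_text("y")
    (Path(tmp) / "notes.txt").write_text("z")  # not matched by the pattern
    found = [p.name for p in find_documents_sketch(tmp)]
# found == ["a.pdf", "b.pdf"]
```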
from structure_d.indexing.loading import DocumentReader
reader = DocumentReader(
    ingestion_manager=IngestionManager(),
    chunker=Chunker(strategy="semantic"),
)
# Load a single document
docs = await reader.load(Path("report.pdf")) # list[Document]
nodes = await reader.load_and_chunk(Path("report.pdf")) # list[Node]
# Load an entire directory
docs = await reader.load_directory(
    Path("knowledge_base/"),
    glob="**/*.pdf",
)
nodes = await reader.load_directory_and_chunk(
    Path("knowledge_base/"),
    glob="**/*.{pdf,html,docx}",
)
QueryEngine
Combines a BaseRetriever with an LLM provider to answer natural language questions over indexed documents.
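The retrieval half can be reduced to a toy to show the shape: score every node against the query, keep the `top_k`, then (not shown) hand the winners to the LLM for synthesis. `retrieve_sketch` is hypothetical and scores by word overlap; real retrievers use embeddings:

```python
def retrieve_sketch(query, nodes, top_k=5):
    # Score each node by how many query words it shares, then keep
    # the top_k nodes that matched at least one word.
    q = set(query.lower().split())
    scored = [(len(q & set(n.lower().split())), n) for n in nodes]
    scored.sort(key=lambda pair: -pair[0])
    return [n for score, n in scored[:top_k] if score > 0]

nodes = [
    "payment terms are net 30",
    "the termination clause allows 60 days notice",
    "appendix b lists office locations",
]
hits = retrieve_sketch("what are the payment terms", nodes, top_k=2)
# hits == ["payment terms are net 30", "the termination clause allows 60 days notice"]
```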
from structure_d.indexing.query_engine import QueryEngine
engine = index.as_query_engine(
    provider=OpenAIProvider(),
    top_k=5,
    response_mode="simple",  # or "tree_summarize"
)
# Retrieve relevant nodes
nodes = await engine.retrieve("termination clause", top_k=3)
# Answer a question (retrieve + synthesize)
answer = await engine.query(
    "What are the payment terms?",
    model="gpt-4o-mini",
    temperature=0.0,
    max_tokens=512,
)
print(answer) # natural language response string