Advanced Usage
RAG indexing, LlamaIndex-style querying, custom parsers, cloud connectors, and batch processing at scale.
RAG pipeline
Build a retrieval-augmented generation pipeline in a few lines. Structure-D handles document ingestion, chunking, embedding, and storage.
import asyncio
from pathlib import Path

from structure_d.inference.providers import OpenAIProvider
from structure_d.pipeline import Pipeline
from structure_d.retrieval.vector_store import ChromaVectorStore
async def main():
vector_store = ChromaVectorStore(
collection_name="contracts",
persist_directory="./chroma_db",
)
pipeline = Pipeline(
schema_cls=None, # not needed for RAG
enable_rag=True,
vector_store=vector_store,
provider=OpenAIProvider(),
)
# Build an index from a document
index = await pipeline.build_index(
Path("contracts/master_agreement.pdf"),
index_type="vector", # or "summary"
)
# Query the index
engine = index.as_query_engine(provider=pipeline.provider)
answer = await engine.query("What are the termination clauses?")
print(answer)
asyncio.run(main())

LlamaIndex-style indexing
Use the DocumentReader and VectorStoreIndex directly for
a LlamaIndex-compatible indexing workflow:
from structure_d.indexing import DocumentReader, VectorStoreIndex
from structure_d.retrieval.vector_store import ChromaVectorStore
from structure_d.retrieval.embeddings import EmbeddingService
from structure_d.inference.providers import OpenAIProvider
reader = DocumentReader()
# Load and chunk from a directory
nodes = await reader.load_directory_and_chunk(
Path("knowledge_base/"),
glob="**/*.pdf",
)
# Build the index
index = VectorStoreIndex(
vector_store=ChromaVectorStore(),
embedding_service=EmbeddingService(),
)
await index.insert_nodes(nodes)
# Query
engine = index.as_query_engine(
provider=OpenAIProvider(),
top_k=5,
response_mode="simple", # or "tree_summarize"
)
answer = await engine.query("Summarize the key findings")
print(answer)

Fallback providers
Chain two providers — the primary runs first, and the fallback takes over automatically
when the primary raises an InferenceError:
from structure_d.inference.providers import VLLMProvider, AnthropicProvider, FallbackProvider
provider = FallbackProvider(
primary=VLLMProvider(api_base="http://vllm-server:8000"),
fallback=AnthropicProvider(),
)
pipeline = Pipeline(schema_cls=MySchema, provider=provider)

Or configure via YAML (no code changes required):
inference:
  provider:
    provider: "vllm"
    fallback_provider: "anthropic"

Custom parsers
Register your own parser by extending BaseParser:
from pathlib import Path
from structure_d.ingestion.base import BaseParser
from structure_d.schemas.base import ParsedDocument, DocumentMetadata, DocumentFormat
class MyXMLParser(BaseParser):
supported_extensions = [".xml", ".xsd"]
async def parse(self, file_path: Path, **kwargs) -> ParsedDocument:
import xml.etree.ElementTree as ET
tree = ET.parse(file_path)
text = "\n".join(el.text or "" for el in tree.iter() if el.text)
return ParsedDocument(
metadata=DocumentMetadata(
filename=file_path.name,
source="local",
file_extension=".xml",
format=DocumentFormat.PLAIN_TEXT,
file_size_bytes=file_path.stat().st_size,
page_count=None,
ingested_at=__import__("datetime").datetime.now(),
extra=,
),
text=text,
pages=[text],
tables=[],
images=[],
)
# Register with the pipeline
from structure_d.ingestion.manager import build_default_registry
registry = build_default_registry()
registry.register("my_xml", MyXMLParser())
pipeline = Pipeline(
schema_cls=MySchema,
ingestion_manager=IngestionManager(registry=registry),
provider=OpenAIProvider(),
)

Cloud connectors
Ingest documents directly from cloud storage:
from structure_d.ingestion.connectors import S3Connector
from structure_d.ingestion.manager import IngestionManager
connector = S3Connector(
bucket="my-documents",
prefix="invoices/2025/",
region="us-east-1",
# credentials from env: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
)
manager = IngestionManager(connector=connector)
documents = await manager.ingest_from_connector()

| Connector | Import name | Auth |
|---|---|---|
| Local filesystem | LocalConnector | — |
| AWS S3 | S3Connector | AWS_ACCESS_KEY_ID + AWS_SECRET_ACCESS_KEY |
| Google Cloud Storage | GCSConnector | GOOGLE_APPLICATION_CREDENTIALS |
| Azure Blob Storage | AzureConnector | AZURE_STORAGE_CONNECTION_STRING |
| SFTP | SFTPConnector | host + key/password |
Batch processing
For very large document sets, use BatchProcessor directly to control concurrency and error handling:
from structure_d.inference.batch import BatchProcessor
processor = BatchProcessor(
provider=OpenAIProvider(),
max_concurrent=16,
retry_failed=True,
)
chunks = [...] # list[TextChunk] from preprocessing
results = await processor.process_batch(
chunks,
schema_cls=MySchema,
task=TaskType.EXTRACTION,
)
successful = [r for r in results if r.is_valid]
print(f"Success rate: {len(successful)}/{len(results)}")