Advanced Usage

RAG indexing, LlamaIndex-style querying, custom parsers, cloud connectors, and batch processing at scale.

RAG pipeline

Build a retrieval-augmented generation pipeline in a few lines. Structure-D handles document ingestion, chunking, embedding, and storage.

python
from pathlib import Path
from structure_d.pipeline import Pipeline
from structure_d.retrieval.vector_store import ChromaVectorStore
from structure_d.inference.providers import OpenAIProvider

async def main():
    vector_store = ChromaVectorStore(
        collection_name="contracts",
        persist_directory="./chroma_db",
    )

    pipeline = Pipeline(
        schema_cls=None,         # not needed for RAG
        enable_rag=True,
        vector_store=vector_store,
        provider=OpenAIProvider(),
    )

    # Build an index from a document
    index = await pipeline.build_index(
        Path("contracts/master_agreement.pdf"),
        index_type="vector",     # or "summary"
    )

    # Query the index
    engine = index.as_query_engine(provider=pipeline.provider)
    answer = await engine.query("What are the termination clauses?")
    print(answer)

asyncio.run(main())

LlamaIndex-style indexing

Use the DocumentReader and VectorStoreIndex directly for a LlamaIndex-compatible indexing workflow:

python
from structure_d.indexing import DocumentReader, VectorStoreIndex
from structure_d.retrieval.vector_store import ChromaVectorStore
from structure_d.retrieval.embeddings import EmbeddingService
from structure_d.inference.providers import OpenAIProvider

reader = DocumentReader()

# Load and chunk from a directory
nodes = await reader.load_directory_and_chunk(
    Path("knowledge_base/"),
    glob="**/*.pdf",
)

# Build the index
index = VectorStoreIndex(
    vector_store=ChromaVectorStore(),
    embedding_service=EmbeddingService(),
)
await index.insert_nodes(nodes)

# Query
engine = index.as_query_engine(
    provider=OpenAIProvider(),
    top_k=5,
    response_mode="simple",  # or "tree_summarize"
)
answer = await engine.query("Summarize the key findings")
print(answer)

Fallback providers

Chain two providers — the primary runs first, and the fallback takes over automatically when the primary raises an InferenceError:

python
from structure_d.inference.providers import VLLMProvider, AnthropicProvider, FallbackProvider

provider = FallbackProvider(
    primary=VLLMProvider(api_base="http://vllm-server:8000"),
    fallback=AnthropicProvider(),
)

pipeline = Pipeline(schema_cls=MySchema, provider=provider)

Or configure via YAML (no code changes required):

yaml
inference:
  provider:
    provider: "vllm"
    fallback_provider: "anthropic"

Custom parsers

Register your own parser by extending BaseParser:

python
from pathlib import Path
from structure_d.ingestion.base import BaseParser
from structure_d.schemas.base import ParsedDocument, DocumentMetadata, DocumentFormat

class MyXMLParser(BaseParser):
    supported_extensions = [".xml", ".xsd"]

    async def parse(self, file_path: Path, **kwargs) -> ParsedDocument:
        import xml.etree.ElementTree as ET
        tree = ET.parse(file_path)
        text = "\n".join(el.text or "" for el in tree.iter() if el.text)

        return ParsedDocument(
            metadata=DocumentMetadata(
                filename=file_path.name,
                source="local",
                file_extension=".xml",
                format=DocumentFormat.PLAIN_TEXT,
                file_size_bytes=file_path.stat().st_size,
                page_count=None,
                ingested_at=__import__("datetime").datetime.now(),
                extra=,
            ),
            text=text,
            pages=[text],
            tables=[],
            images=[],
        )

# Register with the pipeline
from structure_d.ingestion.manager import build_default_registry

registry = build_default_registry()
registry.register("my_xml", MyXMLParser())

pipeline = Pipeline(
    schema_cls=MySchema,
    ingestion_manager=IngestionManager(registry=registry),
    provider=OpenAIProvider(),
)

Cloud connectors

Ingest documents directly from cloud storage:

python
from structure_d.ingestion.connectors import S3Connector
from structure_d.ingestion.manager import IngestionManager

connector = S3Connector(
    bucket="my-documents",
    prefix="invoices/2025/",
    region="us-east-1",
    # credentials from env: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
)

manager = IngestionManager(connector=connector)
documents = await manager.ingest_from_connector()
ConnectorImport nameAuth
Local filesystemLocalConnector
AWS S3S3ConnectorAWS_ACCESS_KEY_ID
Google Cloud StorageGCSConnectorGOOGLE_APPLICATION_CREDENTIALS
Azure Blob StorageAzureConnectorAZURE_STORAGE_CONNECTION_STRING
SFTPSFTPConnectorhost + key/password

Batch processing

For very large document sets, use BatchProcessor directly to control concurrency and error handling:

python
from structure_d.inference.batch import BatchProcessor

processor = BatchProcessor(
    provider=OpenAIProvider(),
    max_concurrent=16,
    retry_failed=True,
)

chunks = [...]   # list[TextChunk] from preprocessing

results = await processor.process_batch(
    chunks,
    schema_cls=MySchema,
    task=TaskType.EXTRACTION,
)

successful = [r for r in results if r.is_valid]
print(f"Success rate: {len(successful)}/{len(results)}")