Perfect Chunk Sizes

Optimized 500-char chunks ideal for most embedding models

Rich Metadata

Page numbers, source files, document structure preserved

Zero Configuration

Works out-of-the-box with popular vector databases

Production Ready

Battle-tested patterns for enterprise RAG systems

Quick RAG Setup (5 Minutes)

Your First RAG Knowledge Base

from cerevox import Lexa

client = Lexa()

# Parse your knowledge base documents
documents = client.parse([
    "product-docs/*.pdf",
    "user-manuals/*.docx", 
    "faqs/*.html"
])

# Get vector DB optimized chunks
chunks = documents.get_all_text_chunks(
    target_size=500,        # Perfect for most embeddings
    overlap_size=50,        # Prevents context loss
    include_metadata=True   # Rich metadata included
)

print(f"✅ Ready for vector database: {len(chunks)} chunks")

# Each chunk has everything you need:
for chunk in chunks[:2]:
    print(f"Text: {chunk.content[:100]}...")
    print(f"Page: {chunk.page_number}")
    print(f"Source: {chunk.source_file}")
    print(f"Metadata: {chunk.metadata}")
    print("---")

Vector Database Examples

Pinecone Integration

import pinecone
from cerevox import Lexa
from sentence_transformers import SentenceTransformer

# 1. Setup Pinecone (legacy pinecone-client < 3.0 interface)
pinecone.init(
    api_key="your-pinecone-key",
    environment="us-west1-gcp"  # Your environment
)

# Create index
index_name = "knowledge-base"
if index_name not in pinecone.list_indexes():
    pinecone.create_index(
        name=index_name,
        dimension=384,  # For all-MiniLM-L6-v2
        metric="cosine"
    )

index = pinecone.Index(index_name)

# 2. Setup embedding model
embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

# 3. Parse and upload documents
client = Lexa()
documents = client.parse(["knowledge-base/*.pdf"])

# Get optimized chunks
chunks = documents.get_all_text_chunks(target_size=500)

# 4. Upload to Pinecone
vectors_to_upsert = []
for chunk in chunks:
    # Create embedding
    embedding = embedder.encode(chunk.content).tolist()
    
    # Prepare for Pinecone
    vectors_to_upsert.append({
        'id': f"{chunk.source_file}_{chunk.page_number}_{len(vectors_to_upsert)}",
        'values': embedding,
        'metadata': {
            'text': chunk.content,
            'source': chunk.source_file,
            'page': chunk.page_number,
            # All Lexa metadata preserved
            **chunk.metadata
        }
    })

# Upload in batches
batch_size = 100
for i in range(0, len(vectors_to_upsert), batch_size):
    batch = vectors_to_upsert[i:i + batch_size]
    index.upsert(vectors=batch)

print(f"✅ Uploaded {len(vectors_to_upsert)} vectors to Pinecone")

Weaviate Integration

import weaviate
from cerevox import Lexa

# 1. Connect to Weaviate (weaviate-client v3 syntax)
client_weaviate = weaviate.Client(
    url="https://your-cluster.weaviate.network",
    auth_client_secret=weaviate.AuthApiKey(api_key="your-key")
)

# 2. Create schema
schema = {
    "classes": [{
        "class": "KnowledgeChunk",
        "description": "Document chunks from Lexa parsing",
        "vectorizer": "text2vec-openai",  # or your preferred vectorizer
        "properties": [
            {
                "name": "content",
                "dataType": ["text"],
                "description": "The chunk text content"
            },
            {
                "name": "source_file",
                "dataType": ["string"],
                "description": "Source document filename"
            },
            {
                "name": "page_number",
                "dataType": ["int"],
                "description": "Page number in source document"
            },
            {
                "name": "chunk_index",
                "dataType": ["int"],
                "description": "Index of chunk in document"
            }
        ]
    }]
}

# Create schema (run once)
try:
    client_weaviate.schema.create(schema)
    print("✅ Weaviate schema created")
except weaviate.exceptions.UnexpectedStatusCodeException:
    print("ℹ️ Schema already exists")

# 3. Parse and upload documents
lexa_client = Lexa()
documents = lexa_client.parse(["documents/*.pdf"])

chunks = documents.get_all_text_chunks(target_size=500)

# 4. Upload to Weaviate
with client_weaviate.batch as batch:
    batch.batch_size = 100
    
    for chunk in chunks:
        batch.add_data_object(
            data_object={
                "content": chunk.content,
                "source_file": chunk.source_file,
                "page_number": chunk.page_number,
                "chunk_index": getattr(chunk, 'chunk_index', 0)
            },
            class_name="KnowledgeChunk"
        )

print(f"✅ Uploaded {len(chunks)} chunks to Weaviate")

Chroma Integration

import chromadb
from cerevox import Lexa

# 1. Initialize Chroma
chroma_client = chromadb.Client()

# Create the collection (get_or_create avoids an error on reruns)
collection = chroma_client.get_or_create_collection(
    name="knowledge_base",
    metadata={"description": "Lexa processed documents"}
)

# 2. Parse documents with Lexa
lexa_client = Lexa()
documents = lexa_client.parse(["docs/*.pdf", "manuals/*.docx"])

chunks = documents.get_all_text_chunks(target_size=500)

# 3. Prepare data for Chroma
documents_list = []
metadatas_list = []
ids_list = []

for i, chunk in enumerate(chunks):
    documents_list.append(chunk.content)
    
    metadatas_list.append({
        "source_file": chunk.source_file,
        "page_number": chunk.page_number,
        "chunk_type": "text"
    })
    
    ids_list.append(f"chunk_{i}")

# 4. Add to Chroma
collection.add(
    documents=documents_list,
    metadatas=metadatas_list,
    ids=ids_list
)

print(f"✅ Added {len(chunks)} chunks to Chroma")

Production RAG Patterns

Advanced Chunking Strategies

from cerevox import Lexa

def create_multimodal_chunks(files):
    """Create specialized chunks for different content types"""
    
    client = Lexa()
    documents = client.parse(files)
    
    all_chunks = []
    
    for doc in documents:
        # Regular text chunks
        text_chunks = doc.get_text_chunks(target_size=500)
        for chunk in text_chunks:
            all_chunks.append({
                'content': chunk.content,
                'type': 'text',
                'source': chunk.source_file,
                'page': chunk.page_number,
                'metadata': chunk.metadata
            })
        
        # Table-specific chunks (larger for context)
        for table in doc.tables:
            table_content = f"Table from page {table.page_number}:\n{table.to_text()}"
            if table.caption:
                table_content = f"Table Caption: {table.caption}\n{table_content}"
            
            all_chunks.append({
                'content': table_content,
                'type': 'table',
                'source': doc.source_file,
                'page': table.page_number,
                'metadata': {
                    'rows': table.rows,
                    'columns': table.columns,
                    'table_id': table.id
                }
            })
        
        # Image descriptions (if available)
        for image in doc.images:
            if hasattr(image, 'description') and image.description:
                all_chunks.append({
                    'content': f"Image description: {image.description}",
                    'type': 'image',
                    'source': doc.source_file,
                    'page': image.page_number,
                    'metadata': {
                        'image_id': image.id,
                        'alt_text': getattr(image, 'alt_text', '')
                    }
                })
    
    print(f"📊 Created multimodal chunks:")
    print(f"  📝 Text: {len([c for c in all_chunks if c['type'] == 'text'])}")
    print(f"  📋 Tables: {len([c for c in all_chunks if c['type'] == 'table'])}")
    print(f"  🖼️  Images: {len([c for c in all_chunks if c['type'] == 'image'])}")
    
    return all_chunks

# Create multimodal knowledge base
multimodal_chunks = create_multimodal_chunks(["complex-report.pdf"])
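
The 'type' field pays off at retrieval time: most vector databases can filter on metadata, so a numbers-oriented question can target table chunks directly. Here is a sketch using Chroma's where filter, assuming the multimodal_chunks list from above (and that page numbers are present):

import chromadb

# Index multimodal chunks with their type as filterable metadata
chroma_client = chromadb.Client()
mm_collection = chroma_client.get_or_create_collection(name="multimodal_kb")

mm_collection.add(
    documents=[c['content'] for c in multimodal_chunks],
    metadatas=[{'type': c['type'], 'source': c['source'], 'page': c['page']}
               for c in multimodal_chunks],
    ids=[f"mm_{i}" for i in range(len(multimodal_chunks))]
)

# Retrieve only table chunks for a numbers-oriented question
table_results = mm_collection.query(
    query_texts=["quarterly revenue figures"],
    n_results=3,
    where={"type": "table"}  # Metadata filter on chunk type
)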

High-Performance RAG Pipeline

import asyncio
from cerevox import AsyncLexa
from concurrent.futures import ThreadPoolExecutor
import time

class ProductionRAGPipeline:
    def __init__(self, vector_db_client, embedding_model):
        self.vector_db = vector_db_client
        self.embedder = embedding_model
        self.processed_docs = set()
        
    async def process_documents_async(self, files, batch_size=20):
        """Process documents in parallel batches"""
        
        async with AsyncLexa() as client:
            print(f"🚀 Processing {len(files)} documents in batches of {batch_size}")
            
            all_chunks = []
            
            # Process in batches
            for i in range(0, len(files), batch_size):
                batch = files[i:i + batch_size]
                
                print(f"📋 Processing batch {i//batch_size + 1}: {len(batch)} files")
                start_time = time.time()
                
                # Parse documents
                documents = await client.parse(batch)
                
                # Create chunks
                batch_chunks = []
                for doc in documents:
                    chunks = doc.get_text_chunks(target_size=500)
                    batch_chunks.extend(chunks)
                
                all_chunks.extend(batch_chunks)
                
                batch_time = time.time() - start_time
                print(f"✅ Batch complete: {len(batch_chunks)} chunks in {batch_time:.2f}s")
            
            return all_chunks
    
    async def upload_to_vector_db_async(self, chunks, batch_size=100):
        """Upload chunks to vector database with threading"""
        
        def embed_batch(batch_chunks):
            """Embed a batch of chunks (CPU intensive)"""
            texts = [chunk.content for chunk in batch_chunks]
            embeddings = self.embedder.encode(texts)
            return embeddings
        
        print(f"🔗 Creating embeddings for {len(chunks)} chunks...")
        
        # Run CPU-intensive embedding in worker threads via run_in_executor,
        # so awaiting results never blocks the event loop
        loop = asyncio.get_running_loop()
        with ThreadPoolExecutor(max_workers=4) as executor:
            embed_tasks = []
            
            for i in range(0, len(chunks), batch_size):
                batch = chunks[i:i + batch_size]
                
                # Schedule embedding in a worker thread
                task = loop.run_in_executor(executor, embed_batch, batch)
                embed_tasks.append((batch, task))
            
            # Await results and upload batch by batch
            uploaded = 0
            for batch, task in embed_tasks:
                embeddings = await task
                
                # Prepare vectors for upload; the running counter keeps
                # IDs unique across batches
                vectors = []
                for chunk, embedding in zip(batch, embeddings):
                    vectors.append({
                        'id': f"{chunk.source_file}_{chunk.page_number}_{uploaded + len(vectors)}",
                        'values': embedding.tolist(),
                        'metadata': {
                            'text': chunk.content,
                            'source': chunk.source_file,
                            'page': chunk.page_number
                        }
                    })
                uploaded += len(vectors)
                
                # Upload to vector database
                await self.upload_vectors_async(vectors)
        
        print("✅ All chunks uploaded to vector database")
    
    async def upload_vectors_async(self, vectors):
        """Upload vectors to database (implement for your vector DB)"""
        # Implement based on your vector database
        # This is a placeholder for async upload
        await asyncio.sleep(0.1)  # Simulate upload time
        print(f"📤 Uploaded batch of {len(vectors)} vectors")

# Usage example
async def run_production_pipeline():
    # Initialize your vector DB and embedding model
    # vector_db = YourVectorDBClient()
    # embedder = YourEmbeddingModel()
    
    # pipeline = ProductionRAGPipeline(vector_db, embedder)
    
    # Large document set
    large_doc_set = [f"documents/doc_{i:04d}.pdf" for i in range(1000)]
    
    # Process documents
    start_time = time.time()
    
    # chunks = await pipeline.process_documents_async(large_doc_set)
    # await pipeline.upload_to_vector_db_async(chunks)
    
    total_time = time.time() - start_time
    
    print(f"🎉 Production pipeline complete in {total_time:.2f} seconds")
    # print(f"📊 Processed {len(chunks)} chunks from {len(large_doc_set)} documents")

# Run the production pipeline
# asyncio.run(run_production_pipeline())

Vector Database Comparison

Chroma: open-source and embeddable; zero-setup local development and prototyping
Pinecone: fully managed service; minimal operational overhead at production scale
Weaviate: open-source with a managed cloud option; built-in vectorizer modules and hybrid search

RAG Ready: Lexa chunks work out-of-the-box with any vector database. Start with Chroma for development, then scale to Pinecone or Weaviate for production.