Vector Database Integration
Ready-to-use patterns for popular vector databases: Pinecone, Weaviate, Chroma, and more
Perfect Chunk Sizes
Optimized 500-char chunks ideal for most embedding models
Rich Metadata
Page numbers, source files, document structure preserved
Zero Configuration
Works out-of-the-box with popular vector databases
Production Ready
Battle-tested patterns for enterprise RAG systems
Quick RAG Setup (5 Minutes)
Your First RAG Knowledge Base
from cerevox import Lexa
client = Lexa()
# Parse your knowledge base documents
documents = client.parse([
"product-docs/*.pdf",
"user-manuals/*.docx",
"faqs/*.html"
])
# Get vector DB optimized chunks
chunks = documents.get_all_text_chunks(
target_size=500, # Perfect for most embeddings
overlap_size=50, # Prevents context loss
include_metadata=True # Rich metadata included
)
print(f"✅ Ready for vector database: {len(chunks)} chunks")
# Each chunk has everything you need:
for chunk in chunks[:2]:
print(f"Text: {chunk.content[:100]}...")
print(f"Page: {chunk.page_number}")
print(f"Source: {chunk.source_file}")
print(f"Metadata: {chunk.metadata}")
print("---")
Vector Database Examples
Pinecone Integration
import pinecone
from cerevox import Lexa
from sentence_transformers import SentenceTransformer
# 1. Setup Pinecone (this example uses the legacy pinecone-client v2 interface; pinecone-client 3.x+ replaces init() with the Pinecone class)
pinecone.init(
api_key="your-pinecone-key",
environment="us-west1-gcp" # Your environment
)
# Create index
index_name = "knowledge-base"
if index_name not in pinecone.list_indexes():
pinecone.create_index(
name=index_name,
dimension=384, # For all-MiniLM-L6-v2
metric="cosine"
)
index = pinecone.Index(index_name)
# 2. Setup embedding model
embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
# 3. Parse and upload documents
client = Lexa()
documents = client.parse(["knowledge-base/*.pdf"])
# Get optimized chunks
chunks = documents.get_all_text_chunks(target_size=500)
# 4. Upload to Pinecone
vectors_to_upsert = []
for chunk in chunks:
# Create embedding
embedding = embedder.encode(chunk.content).tolist()
# Prepare for Pinecone
vectors_to_upsert.append({
'id': f"{chunk.source_file}_{chunk.page_number}_{len(vectors_to_upsert)}",
'values': embedding,
'metadata': {
'text': chunk.content,
'source': chunk.source_file,
'page': chunk.page_number,
# All Lexa metadata preserved (note: Pinecone metadata values must be flat strings, numbers, booleans, or lists of strings)
**chunk.metadata
}
})
# Upload in batches
batch_size = 100
for i in range(0, len(vectors_to_upsert), batch_size):
batch = vectors_to_upsert[i:i + batch_size]
index.upsert(vectors=batch)
print(f"✅ Uploaded {len(vectors_to_upsert)} vectors to Pinecone")
Weaviate Integration
import weaviate
from cerevox import Lexa
# 1. Connect to Weaviate
client_weaviate = weaviate.Client(
url="https://your-cluster.weaviate.network",
auth_client_secret=weaviate.AuthApiKey(api_key="your-key")
)
# 2. Create schema
schema = {
"classes": [{
"class": "KnowledgeChunk",
"description": "Document chunks from Lexa parsing",
"vectorizer": "text2vec-openai", # or your preferred vectorizer
"properties": [
{
"name": "content",
"dataType": ["text"],
"description": "The chunk text content"
},
{
"name": "source_file",
"dataType": ["string"],
"description": "Source document filename"
},
{
"name": "page_number",
"dataType": ["int"],
"description": "Page number in source document"
},
{
"name": "chunk_index",
"dataType": ["int"],
"description": "Index of chunk in document"
}
]
}]
}
# Create schema (run once)
try:
client_weaviate.schema.create(schema)
print("✅ Weaviate schema created")
except Exception:
print("ℹ️ Schema already exists")
# 3. Parse and upload documents
lexa_client = Lexa()
documents = lexa_client.parse(["documents/*.pdf"])
chunks = documents.get_all_text_chunks(target_size=500)
# 4. Upload to Weaviate
with client_weaviate.batch as batch:
batch.batch_size = 100
for chunk in chunks:
batch.add_data_object(
data_object={
"content": chunk.content,
"source_file": chunk.source_file,
"page_number": chunk.page_number,
"chunk_index": getattr(chunk, 'chunk_index', 0)
},
class_name="KnowledgeChunk"
)
print(f"✅ Uploaded {len(chunks)} chunks to Weaviate")
Chroma Integration
import chromadb
from cerevox import Lexa
# 1. Initialize Chroma
chroma_client = chromadb.Client()
# Create the collection (get_or_create avoids an error on re-runs)
collection = chroma_client.get_or_create_collection(
    name="knowledge_base",
    metadata={"description": "Lexa processed documents"}
)
# 2. Parse documents with Lexa
lexa_client = Lexa()
documents = lexa_client.parse(["docs/*.pdf", "manuals/*.docx"])
chunks = documents.get_all_text_chunks(target_size=500)
# 3. Prepare data for Chroma
documents_list = []
metadatas_list = []
ids_list = []
for i, chunk in enumerate(chunks):
documents_list.append(chunk.content)
metadatas_list.append({
"source_file": chunk.source_file,
"page_number": chunk.page_number,
"chunk_type": "text"
})
ids_list.append(f"chunk_{i}")
# 4. Add to Chroma
collection.add(
documents=documents_list,
metadatas=metadatas_list,
ids=ids_list
)
print(f"✅ Added {len(chunks)} chunks to Chroma")
Production RAG Patterns
Advanced Chunking Strategies
from cerevox import Lexa
def create_multimodal_chunks(files):
"""Create specialized chunks for different content types"""
client = Lexa()
documents = client.parse(files)
all_chunks = []
for doc in documents:
# Regular text chunks
text_chunks = doc.get_text_chunks(target_size=500)
for chunk in text_chunks:
all_chunks.append({
'content': chunk.content,
'type': 'text',
'source': chunk.source_file,
'page': chunk.page_number,
'metadata': chunk.metadata
})
# Table-specific chunks (larger for context)
for table in doc.tables:
table_content = f"Table from page {table.page_number}:\n{table.to_text()}"
if table.caption:
table_content = f"Table Caption: {table.caption}\n{table_content}"
all_chunks.append({
'content': table_content,
'type': 'table',
'source': doc.source_file,
'page': table.page_number,
'metadata': {
'rows': table.rows,
'columns': table.columns,
'table_id': table.id
}
})
# Image descriptions (if available)
for image in doc.images:
if hasattr(image, 'description') and image.description:
all_chunks.append({
'content': f"Image description: {image.description}",
'type': 'image',
'source': doc.source_file,
'page': image.page_number,
'metadata': {
'image_id': image.id,
'alt_text': getattr(image, 'alt_text', '')
}
})
print(f"📊 Created multimodal chunks:")
print(f" 📝 Text: {len([c for c in all_chunks if c['type'] == 'text'])}")
print(f" 📋 Tables: {len([c for c in all_chunks if c['type'] == 'table'])}")
print(f" 🖼️ Images: {len([c for c in all_chunks if c['type'] == 'image'])}")
return all_chunks
# Create multimodal knowledge base
multimodal_chunks = create_multimodal_chunks(["complex-report.pdf"])
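One way to put the typed chunks to work is to store the type field as metadata so queries can target tables or images specifically. The sketch below is illustrative rather than prescriptive: it uses Chroma for brevity, but any store with metadata filtering follows the same pattern, and the sample question is hypothetical.
import chromadb

chroma_client = chromadb.Client()
mm_collection = chroma_client.get_or_create_collection(name="multimodal_kb")

# Flatten each chunk dict into text + simple metadata (Chroma needs scalar values)
mm_collection.add(
    documents=[c["content"] for c in multimodal_chunks],
    metadatas=[{"type": c["type"], "source": c["source"], "page": c["page"] or 0}
               for c in multimodal_chunks],
    ids=[f"mm_{i}" for i in range(len(multimodal_chunks))]
)

# Restrict a query to table chunks only
table_hits = mm_collection.query(
    query_texts=["quarterly revenue by region"],
    n_results=3,
    where={"type": "table"}
)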
High-Performance RAG Pipeline
import asyncio
from cerevox import AsyncLexa
from concurrent.futures import ThreadPoolExecutor
import time
class ProductionRAGPipeline:
def __init__(self, vector_db_client, embedding_model):
self.vector_db = vector_db_client
self.embedder = embedding_model
self.processed_docs = set()
async def process_documents_async(self, files, batch_size=20):
"""Process documents in parallel batches"""
async with AsyncLexa() as client:
print(f"🚀 Processing {len(files)} documents in batches of {batch_size}")
all_chunks = []
# Process in batches
for i in range(0, len(files), batch_size):
batch = files[i:i + batch_size]
print(f"📋 Processing batch {i//batch_size + 1}: {len(batch)} files")
start_time = time.time()
# Parse documents
documents = await client.parse(batch)
# Create chunks
batch_chunks = []
for doc in documents:
chunks = doc.get_text_chunks(target_size=500)
batch_chunks.extend(chunks)
all_chunks.extend(batch_chunks)
batch_time = time.time() - start_time
print(f"✅ Batch complete: {len(batch_chunks)} chunks in {batch_time:.2f}s")
return all_chunks
async def upload_to_vector_db_async(self, chunks, batch_size=100):
"""Upload chunks to vector database with threading"""
def embed_batch(batch_chunks):
"""Embed a batch of chunks (CPU intensive)"""
texts = [chunk.content for chunk in batch_chunks]
embeddings = self.embedder.encode(texts)
return embeddings
print(f"🔗 Creating embeddings for {len(chunks)} chunks...")
# Use ThreadPoolExecutor for CPU-intensive embedding
with ThreadPoolExecutor(max_workers=4) as executor:
upload_futures = []
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i + batch_size]
# Create embeddings in thread
future = executor.submit(embed_batch, batch)
upload_futures.append((batch, future))
# Process results and upload
for batch, future in upload_futures:
embeddings = future.result()
# Prepare vectors for upload
vectors = []
for chunk, embedding in zip(batch, embeddings):
vectors.append({
'id': f"{chunk.source_file}_{chunk.page_number}_{len(vectors)}",
'values': embedding.tolist(),
'metadata': {
'text': chunk.content,
'source': chunk.source_file,
'page': chunk.page_number
}
})
# Upload to vector database
await self.upload_vectors_async(vectors)
print(f"✅ All chunks uploaded to vector database")
async def upload_vectors_async(self, vectors):
"""Upload vectors to database (implement for your vector DB)"""
# Implement based on your vector database
# This is a placeholder for async upload
await asyncio.sleep(0.1) # Simulate upload time
print(f"📤 Uploaded batch of {len(vectors)} vectors")
# Usage example
async def run_production_pipeline():
# Initialize your vector DB and embedding model
# vector_db = YourVectorDBClient()
# embedder = YourEmbeddingModel()
# pipeline = ProductionRAGPipeline(vector_db, embedder)
# Large document set
large_doc_set = [f"documents/doc_{i:04d}.pdf" for i in range(1000)]
# Process documents
start_time = time.time()
# chunks = await pipeline.process_documents_async(large_doc_set)
# await pipeline.upload_to_vector_db_async(chunks)
total_time = time.time() - start_time
print(f"🎉 Production pipeline complete in {total_time:.2f} seconds")
# print(f"📊 Processed {len(chunks)} chunks from {len(large_doc_set)} documents")
# Run the production pipeline
# asyncio.run(run_production_pipeline())
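The upload_vectors_async placeholder can be filled in for whichever store you use. As one hypothetical example for Pinecone, assuming self.vector_db is a pinecone.Index like the one created earlier, the blocking upsert can be pushed onto a worker thread so the event loop stays free:
# Hypothetical upload_vectors_async for a Pinecone index (drop into ProductionRAGPipeline)
async def upload_vectors_async(self, vectors):
    """Upload one batch without blocking the event loop."""
    # index.upsert is a blocking HTTP call, so run it in a worker thread
    await asyncio.to_thread(self.vector_db.upsert, vectors=vectors)
    print(f"📤 Uploaded batch of {len(vectors)} vectors")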
Vector Database Comparison
Pinecone
Best for: Production applications, auto-scaling, minimal setup
# Pros: Fully managed, excellent performance, auto-scaling
# Cons: Cost scales with usage, vendor lock-in
# Use when: Building production RAG applications
Weaviate
Best for: Flexibility, custom schemas, hybrid search
# Pros: Open source, hybrid search, flexible schemas
# Cons: More complex setup, resource intensive
# Use when: Need hybrid search or custom data models
Chroma
Best for: Development, small to medium datasets
# Pros: Simple setup, lightweight, great for development
# Cons: Limited scalability for very large datasets
# Use when: Prototyping or smaller applications
Qdrant
Best for: High-performance requirements, filtering
# Pros: Excellent performance, advanced filtering, Rust-based
# Cons: Newer ecosystem, fewer integrations
# Use when: Performance is critical
RAG Ready: Lexa chunks work out-of-the-box with any vector database. Start with Chroma for development, then scale to Pinecone or Weaviate for production.