## Quick Wins
- **Async First**: Use `AsyncLexa` for 3-5x better performance in production
- **Batch Processing**: Process multiple documents together for optimal throughput
- **Smart Chunking**: Leverage built-in chunking for vector database optimization
- **Error Handling**: Implement proper retry logic and graceful degradation
## Production Patterns

### Async-First Architecture

Always prefer async operations for production workloads:
**Recommended: async** (avoid sync for large batches; see the sketch after this example):

```python
import asyncio

from cerevox import AsyncLexa, ProcessingMode

async def process_documents(file_paths):
    async with AsyncLexa(api_key="your-api-key") as client:
        # Process multiple documents concurrently
        documents = await client.parse(
            file_paths,
            mode=ProcessingMode.DEFAULT
        )
        # Get vector-ready chunks
        chunks = documents.get_all_text_chunks(target_size=512)
        return chunks

# Process 100+ documents efficiently
file_batch = ["doc1.pdf", "doc2.docx", "doc3.txt"]
chunks = asyncio.run(process_documents(file_batch))
```
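The "Avoid: Sync for Large Batches" tab's code did not survive extraction; a minimal reconstruction of the anti-pattern it warns against, assuming the package also exposes a synchronous `Lexa` client:

```python
from cerevox import Lexa  # Assumed sync client

client = Lexa(api_key="your-api-key")

# Each call blocks until that document finishes - acceptable for a few
# files, painfully slow for large batches
for path in ["doc1.pdf", "doc2.docx", "doc3.txt"]:
    documents = client.parse([path])
```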
### Robust Error Handling

Implement comprehensive error handling with automatic retries:
```python
from cerevox import AsyncLexa, LexaError
from tenacity import retry, stop_after_attempt, wait_exponential

class ProductionLexaClient:
    def __init__(self, api_key: str):
        self.api_key = api_key

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    async def safe_parse(self, files, **kwargs):
        """Parse with automatic retry on failures."""
        try:
            async with AsyncLexa(api_key=self.api_key) as client:
                return await client.parse(files, **kwargs)
        except LexaError as e:
            if e.status_code >= 500:  # Server errors - retry
                raise
            else:  # Client errors - don't retry
                return self._handle_client_error(e)

    def _handle_client_error(self, error):
        """Handle client errors gracefully."""
        print(f"Client error: {error.message}")
        return []  # Return empty result or default

# Usage (inside an async function)
client = ProductionLexaClient("your-api-key")
documents = await client.safe_parse(["document.pdf"])
```
### Progress Monitoring

Implement detailed progress tracking for long-running operations:
```python
import logging
from datetime import datetime

from cerevox import AsyncLexa

def production_progress_callback(status):
    """Production-ready progress callback with logging."""
    timestamp = datetime.now().isoformat()
    # Log to your monitoring system
    logging.info(f"[{timestamp}] Job {status.job_id}: {status.status}")
    if hasattr(status, 'progress') and status.progress:
        logging.info(f"Progress: {status.progress}")
    # Send to monitoring service (DataDog, New Relic, etc.)
    # monitor.track('lexa.processing.progress', status.progress)

async def monitored_parsing():
    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(
            ["large_document.pdf"],
            progress_callback=production_progress_callback,
            timeout=300.0,     # 5 minutes
            poll_interval=2.0  # Check every 2 seconds
        )
        return documents
```
### Optimal Batch Sizing

Balance throughput and memory usage:
```python
import asyncio

from cerevox import AsyncLexa, ProcessingMode

async def optimize_batch_processing(file_paths: list):
    """Process files in optimal batch sizes."""
    OPTIMAL_BATCH_SIZE = 10  # Adjust based on file sizes

    async with AsyncLexa(api_key="your-api-key") as client:
        all_documents = []
        for i in range(0, len(file_paths), OPTIMAL_BATCH_SIZE):
            batch = file_paths[i:i + OPTIMAL_BATCH_SIZE]

            # Process batch
            documents = await client.parse(
                batch,
                mode=ProcessingMode.DEFAULT  # Fast processing for most use cases
            )
            all_documents.extend(documents)

            # Optional: Brief pause to avoid rate limits
            await asyncio.sleep(0.1)

        return all_documents
```
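When sequential batches underuse your throughput budget, a bounded-concurrency variant overlaps requests while keeping memory predictable. A sketch assuming the client tolerates concurrent `parse` calls; the concurrency cap is an illustrative knob, not an SDK setting:

```python
import asyncio

from cerevox import AsyncLexa, ProcessingMode

async def bounded_batch_processing(file_paths: list, batch_size: int = 10):
    semaphore = asyncio.Semaphore(3)  # Illustrative cap; tune to your rate limits

    async with AsyncLexa(api_key="your-api-key") as client:

        async def parse_batch(batch):
            # At most three parse calls are in flight at any moment
            async with semaphore:
                return await client.parse(batch, mode=ProcessingMode.DEFAULT)

        batches = [file_paths[i:i + batch_size]
                   for i in range(0, len(file_paths), batch_size)]
        results = await asyncio.gather(*(parse_batch(b) for b in batches))

    # Flatten per-batch results into a single document list
    return [doc for docs in results for doc in docs]
```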
### Processing Mode Selection

Choose the right mode for your use case: fast processing (default) for most content, maximum accuracy (advanced) when quality matters most.
```python
# Best for: Most documents, general content processing
documents = await client.parse(
    files,
    mode=ProcessingMode.DEFAULT  # Fast and efficient
)
```
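The accuracy-focused tab's code did not survive extraction; a parallel sketch, assuming the mode enum exposes an `ADVANCED` member as the tab title suggests:

```python
# Best for: Complex layouts where accuracy matters more than speed
documents = await client.parse(
    files,
    mode=ProcessingMode.ADVANCED  # Assumed enum member; slower, more thorough
)
```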
### Memory Management

Handle large document batches efficiently:
```python
from cerevox import AsyncLexa

async def memory_efficient_processing(file_paths):
    """Process large batches without memory issues."""
    async with AsyncLexa(api_key="your-api-key") as client:
        for batch in chunk_files(file_paths, batch_size=5):
            # Process batch
            documents = await client.parse(batch)

            # Process immediately and release memory
            chunks = documents.get_all_text_chunks(target_size=512)

            # Send to vector DB immediately
            await store_in_vector_db(chunks)

            # Explicitly release memory
            del documents, chunks

def chunk_files(files, batch_size):
    """Split files into batches."""
    for i in range(0, len(files), batch_size):
        yield files[i:i + batch_size]
```
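The example above assumes a `store_in_vector_db` coroutine that is never defined; a minimal hypothetical stand-in so the sketch runs end to end (replace the body with your vector store's upsert call):

```python
async def store_in_vector_db(chunks):
    # Hypothetical placeholder - swap in your vector database client here
    print(f"Would store {len(chunks)} chunks")
```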
## Vector Database Integration

### Optimal Chunking Strategy

Configure chunking for your vector database:
```python
async def prepare_for_vector_db(documents):
    """Optimize chunks for different vector databases."""
    # For OpenAI embeddings (ada-002)
    openai_chunks = documents.get_all_text_chunks(
        target_size=1500,  # ~2000 tokens with overhead
        tolerance=0.1      # 10% size tolerance
    )

    # For sentence transformers
    sentence_chunks = documents.get_all_text_chunks(
        target_size=384,   # Typical context window
        tolerance=0.15
    )

    # For document-level embeddings
    doc_chunks = documents.get_all_text_chunks(
        target_size=2000,  # Larger chunks for context
        tolerance=0.2
    )

    return {
        'openai': openai_chunks,
        'sentence': sentence_chunks,
        'document': doc_chunks
    }
```
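To see the chunks in use, here is a hedged embedding sketch with sentence-transformers, assuming each chunk behaves as a plain string (the model name is illustrative):

```python
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")  # Illustrative model choice

async def embed_for_sentence_db(documents):
    prepared = await prepare_for_vector_db(documents)
    chunks = prepared['sentence']
    # encode() takes a list of strings and returns one vector per chunk
    vectors = model.encode([str(chunk) for chunk in chunks])
    return list(zip(chunks, vectors))
```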
Include comprehensive metadata for better retrieval:
```python
from datetime import datetime

def extract_rich_metadata(documents):
    """Extract comprehensive metadata for vector storage."""
    chunks_with_metadata = []

    for doc in documents:
        text_chunks = doc.get_text_chunks(target_size=512)

        for chunk_index, chunk in enumerate(text_chunks):
            metadata = {
                # Document metadata
                'filename': doc.filename,
                'file_type': doc.file_type,
                'total_pages': doc.total_pages,
                'processing_date': datetime.now().isoformat(),

                # Chunk metadata
                'chunk_size': len(chunk),
                'chunk_index': chunk_index,  # enumerate avoids O(n) .index() lookups
                'total_chunks': len(text_chunks),

                # Content metadata
                'has_tables': len(doc.tables) > 0,
                'has_images': len(doc.images) > 0,
                'element_count': doc.total_elements,

                # Custom fields (user-defined helpers)
                'document_category': classify_document(doc),
                'extraction_confidence': calculate_confidence(doc)
            }

            chunks_with_metadata.append({
                'content': chunk,
                'metadata': metadata
            })

    return chunks_with_metadata
```
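`classify_document` and `calculate_confidence` are referenced above but never defined; minimal hypothetical stubs so the example is self-contained (replace with your own heuristics or models):

```python
def classify_document(doc):
    # Hypothetical heuristic: tag documents containing tables as "structured"
    return "structured" if len(doc.tables) > 0 else "unstructured"

def calculate_confidence(doc):
    # Hypothetical placeholder score; plug in a real quality metric
    return 1.0 if doc.total_elements > 0 else 0.0
```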
## Security Best Practices

### API Key Management

Never hardcode API keys in your application:
- ✅ Secure: environment variables (shown below)
- ✅ Secure: configuration files
- ❌ Insecure: hardcoded keys
```python
import os

from cerevox import AsyncLexa

# Load from environment
api_key = os.getenv('CEREVOX_API_KEY')
if not api_key:
    raise ValueError("CEREVOX_API_KEY environment variable not set")

# Inside an async function:
async with AsyncLexa(api_key=api_key) as client:
    documents = await client.parse(files)
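```

The configuration-file tab's code did not survive extraction; a hedged reconstruction of the idea, reading the key from a TOML file kept out of version control (path and key names are illustrative):

```python
import tomllib  # Python 3.11+; use the tomli package on older versions

# config.toml (git-ignored):
# [cerevox]
# api_key = "your-api-key"
with open("config.toml", "rb") as f:
    config = tomllib.load(f)

api_key = config["cerevox"]["api_key"]
```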
### Data Privacy

Handle sensitive documents securely:
```python
import os
import shutil
import tempfile
from pathlib import Path

from cerevox import AsyncLexa

async def secure_document_processing(sensitive_files):
    """Process sensitive documents with automatic cleanup."""
    temp_dir = None
    try:
        # Create secure temporary directory for any intermediate files
        temp_dir = tempfile.mkdtemp(prefix='cerevox_secure_')

        # Process documents
        async with AsyncLexa(api_key=os.getenv('CEREVOX_API_KEY')) as client:
            documents = await client.parse(sensitive_files)

            # Process results immediately
            chunks = documents.get_all_text_chunks(target_size=512)

            # Store in secure location
            secure_results = encrypt_and_store(chunks)
            return secure_results
    finally:
        # Always clean up temporary files
        if temp_dir and Path(temp_dir).exists():
            shutil.rmtree(temp_dir, ignore_errors=True)
```
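`encrypt_and_store` is assumed above; a minimal hypothetical implementation using the `cryptography` package's Fernet, purely to show the shape (key handling and storage location are up to you):

```python
import json

from cryptography.fernet import Fernet

# In practice, load the key from a secrets manager, never generate it inline
fernet = Fernet(Fernet.generate_key())

def encrypt_and_store(chunks):
    # Serialize the chunks and encrypt them before they touch disk
    payload = json.dumps([str(chunk) for chunk in chunks]).encode('utf-8')
    token = fernet.encrypt(payload)
    with open('chunks.enc', 'wb') as f:
        f.write(token)
    return token
```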
## Monitoring and Observability

### Production Logging

Implement comprehensive logging:
```python
import logging
import os
import time
from functools import wraps

from cerevox import AsyncLexa

# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('cerevox.production')

def monitor_performance(func):
    """Decorator to monitor Lexa operations."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        operation_name = func.__name__

        try:
            logger.info(f"Starting {operation_name}")
            result = await func(*args, **kwargs)

            duration = time.time() - start_time
            logger.info(f"Completed {operation_name} in {duration:.2f}s")

            # Send metrics to monitoring service
            # metrics.timing(f'cerevox.{operation_name}.duration', duration)

            return result
        except Exception as e:
            duration = time.time() - start_time
            logger.error(f"Failed {operation_name} after {duration:.2f}s: {e}")

            # Track errors
            # metrics.increment(f'cerevox.{operation_name}.error')

            raise

    return wrapper

@monitor_performance
async def parse_documents_monitored(files):
    async with AsyncLexa(api_key=os.getenv('CEREVOX_API_KEY')) as client:
        return await client.parse(files)
```
### Health Checks

Implement health monitoring:
```python
import os
import time

from cerevox import AsyncLexa

async def health_check():
    """Health check for Lexa service availability."""
    try:
        async with AsyncLexa(api_key=os.getenv('CEREVOX_API_KEY')) as client:
            # Test with minimal document
            test_content = b"Health check test document."

            start_time = time.time()
            documents = await client.parse(test_content, timeout=10.0)
            response_time = time.time() - start_time

            return {
                'status': 'healthy',
                'response_time': response_time,
                'service': 'cerevox-lexa'
            }
    except Exception as e:
        return {
            'status': 'unhealthy',
            'error': str(e),
            'service': 'cerevox-lexa'
        }

# Use in your health endpoint
# @app.get("/health/cerevox")
# async def cerevox_health():
#     return await health_check()
```
## Testing Strategies

### Unit Testing

Test your Lexa integration thoroughly:
```python
import pytest
from unittest.mock import AsyncMock, Mock, patch

from cerevox import AsyncLexa, LexaError

class TestLexaIntegration:
    @pytest.fixture
    async def client(self):
        # Requires the pytest-asyncio plugin
        async with AsyncLexa(api_key="test-key") as client:
            yield client

    @pytest.mark.asyncio
    async def test_successful_parsing(self, client):
        """Test successful document parsing."""
        test_file = b"Test document content"

        with patch.object(client, 'parse') as mock_parse:
            mock_parse.return_value = [Mock(content="Parsed content")]

            result = await client.parse(test_file)
            assert len(result) == 1
            assert result[0].content == "Parsed content"

    @pytest.mark.asyncio
    async def test_error_handling(self, client):
        """Test error handling."""
        with patch.object(client, 'parse') as mock_parse:
            mock_parse.side_effect = LexaError("API Error", status_code=400)

            with pytest.raises(LexaError):
                await client.parse("test.pdf")

    @pytest.mark.asyncio
    async def test_chunking_output(self, client):
        """Test chunking functionality."""
        # Test with mock document
        mock_doc = Mock()
        mock_doc.get_text_chunks.return_value = ["chunk1", "chunk2"]

        chunks = mock_doc.get_text_chunks(target_size=512)
        assert len(chunks) == 2
        assert chunks[0] == "chunk1"
```
Remember to test with realistic document sizes and types that match your production workload.
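Beyond mocks, an opt-in integration test against the live API catches contract drift. A hedged sketch: it assumes a `tests/fixtures/sample.pdf` representative of your workload and skips unless `CEREVOX_API_KEY` is set:

```python
import os

import pytest

from cerevox import AsyncLexa

@pytest.mark.asyncio
@pytest.mark.skipif(
    not os.getenv('CEREVOX_API_KEY'),
    reason="CEREVOX_API_KEY not set; skipping live integration test"
)
async def test_live_parse_roundtrip():
    async with AsyncLexa(api_key=os.getenv('CEREVOX_API_KEY')) as client:
        documents = await client.parse(["tests/fixtures/sample.pdf"])
        chunks = documents.get_all_text_chunks(target_size=512)
        assert len(chunks) > 0  # Real documents should yield at least one chunk
```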
## Common Pitfalls

### Avoid These Mistakes

- **Don't process files sequentially**: Use async operations and batch processing for better performance.
- **Don't ignore timeouts**: Set appropriate timeouts based on your document sizes and processing requirements (a timeout sketch follows the code below).
- **Don't skip error handling**: Always implement proper error handling and retry logic for production systems.
```python
# ❌ Anti-pattern: Sequential processing
for file in large_file_list:
    doc = client.parse(file)  # Blocks thread
    process(doc)

# ✅ Better: Batch async processing
async with AsyncLexa(api_key=api_key) as client:
    documents = await client.parse(large_file_list)
    for doc in documents:
        process(doc)
```
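For the timeout pitfall, a hedged sketch that scales the `timeout` parameter (shown earlier in Progress Monitoring) with batch size; the per-file budget is an illustrative number, not SDK guidance:

```python
import os

from cerevox import AsyncLexa

async def parse_with_timeout(files, per_file_seconds=30.0):
    # Rough budget: more files, longer timeout, with a sane floor
    timeout = max(60.0, per_file_seconds * len(files))
    async with AsyncLexa(api_key=os.getenv('CEREVOX_API_KEY')) as client:
        return await client.parse(files, timeout=timeout)
```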
## Production Checklist

Before deploying to production:

## Next Steps