🚀 Quick Wins

Async First

Use AsyncLexa for 3-5x better performance in production

Batch Processing

Process multiple documents together for optimal throughput

Smart Chunking

Leverage built-in chunking for vector database optimization

Error Handling

Implement proper retry logic and graceful degradation

Production Patterns

Async-First Architecture

Always prefer async operations for production workloads:
import asyncio
from cerevox import AsyncLexa, ProcessingMode

async def process_documents(file_paths):
    async with AsyncLexa(api_key="your-api-key") as client:
        # Process multiple documents concurrently
        documents = await client.parse(
            file_paths,
            mode=ProcessingMode.DEFAULT
        )
        
        # Get vector-ready chunks
        chunks = documents.get_all_text_chunks(target_size=512)
        return chunks

# Process 100+ documents efficiently
file_batch = ["doc1.pdf", "doc2.docx", "doc3.txt"]
chunks = asyncio.run(process_documents(file_batch))

Robust Error Handling

Implement comprehensive error handling with automatic retries:
import asyncio
from cerevox import AsyncLexa, LexaError
from tenacity import retry, stop_after_attempt, wait_exponential

class ProductionLexaClient:
    def __init__(self, api_key: str):
        self.api_key = api_key
    
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True  # surface the original exception once retries are exhausted
    )
    async def safe_parse(self, files, **kwargs):
        """Parse with automatic retry on failures"""
        try:
            async with AsyncLexa(api_key=self.api_key) as client:
                return await client.parse(files, **kwargs)
        except LexaError as e:
            if e.status_code >= 500:  # Server errors - retry
                raise
            else:  # Client errors - don't retry
                return self._handle_client_error(e)
    
    def _handle_client_error(self, error):
        """Handle client errors gracefully"""
        print(f"Client error: {error.message}")
        return []  # Return empty result or default

# Usage
client = ProductionLexaClient("your-api-key")
documents = asyncio.run(client.safe_parse(["document.pdf"]))

Progress Monitoring

Implement detailed progress tracking for long-running operations:
from datetime import datetime
import logging

def production_progress_callback(status):
    """Production-ready progress callback with logging"""
    timestamp = datetime.now().isoformat()
    
    # Log to your monitoring system
    logging.info(f"[{timestamp}] Job {status.job_id}: {status.status}")
    
    if hasattr(status, 'progress') and status.progress:
        logging.info(f"Progress: {status.progress}")
    
    # Send to monitoring service (DataDog, New Relic, etc.)
    # monitor.track('lexa.processing.progress', status.progress)

async def monitored_parsing():
    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(
            ["large_document.pdf"],
            progress_callback=production_progress_callback,
            timeout=300.0,  # 5 minutes
            poll_interval=2.0  # Check every 2 seconds
        )
        return documents

Performance Optimization

Optimal Batch Sizing

Balance throughput and memory usage:
async def optimize_batch_processing(file_paths: list):
    """Process files in optimal batch sizes"""
    OPTIMAL_BATCH_SIZE = 10  # Adjust based on file sizes
    
    async with AsyncLexa(api_key="your-api-key") as client:
        all_documents = []
        
        for i in range(0, len(file_paths), OPTIMAL_BATCH_SIZE):
            batch = file_paths[i:i + OPTIMAL_BATCH_SIZE]
            
            # Process batch
            documents = await client.parse(
                batch,
                mode=ProcessingMode.DEFAULT  # Fast processing for most use cases
            )
            
            all_documents.extend(documents)
            
            # Optional: Brief pause to avoid rate limits
            await asyncio.sleep(0.1)
        
        return all_documents
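
If sequential batches leave throughput on the table, you can overlap several batches with bounded concurrency. This is a sketch of one such pattern, not an SDK feature: the MAX_CONCURRENT_BATCHES cap is a hypothetical tuning knob you would adjust against your rate limits:
import asyncio

from cerevox import AsyncLexa

MAX_CONCURRENT_BATCHES = 3  # hypothetical cap; tune against your rate limits

async def process_batches_concurrently(file_paths: list, batch_size: int = 10):
    """Run several batches at once, bounded by a semaphore"""
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_BATCHES)

    async with AsyncLexa(api_key="your-api-key") as client:

        async def run_batch(batch):
            async with semaphore:  # limit in-flight parse calls
                return await client.parse(batch)

        batches = [
            file_paths[i:i + batch_size]
            for i in range(0, len(file_paths), batch_size)
        ]
        results = await asyncio.gather(*(run_batch(b) for b in batches))

        # Flatten per-batch results into a single list of documents
        return [doc for batch_docs in results for doc in batch_docs]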

Processing Mode Selection

Choose the right mode for your use case:
# Best for: Most documents, general content processing
documents = await client.parse(
    files,
    mode=ProcessingMode.DEFAULT  # Fast and efficient
)
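
If your installed SDK version exposes additional modes beyond DEFAULT (an ADVANCED mode is assumed here for layout- and table-heavy files; verify against your cerevox release), a small helper keeps the choice explicit:
from cerevox import ProcessingMode

def choose_mode(table_heavy: bool) -> ProcessingMode:
    """Pick a processing mode per document type.

    ProcessingMode.ADVANCED is an assumption for table/layout-heavy files;
    the helper falls back to DEFAULT if your SDK version does not expose it.
    """
    if table_heavy and hasattr(ProcessingMode, "ADVANCED"):
        return ProcessingMode.ADVANCED
    return ProcessingMode.DEFAULT

# Usage (inside an async context):
# documents = await client.parse(files, mode=choose_mode(table_heavy=True))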

Memory Management

Handle large document batches efficiently:
async def memory_efficient_processing(file_paths):
    """Process large batches without memory issues"""
    
    async with AsyncLexa(api_key="your-api-key") as client:
        for batch in chunk_files(file_paths, batch_size=5):
            # Process batch
            documents = await client.parse(batch)
            
            # Process immediately and release memory
            chunks = documents.get_all_text_chunks(target_size=512)
            
            # Send to vector DB immediately
            await store_in_vector_db(chunks)
            
            # Explicitly release memory
            del documents, chunks

def chunk_files(files, batch_size):
    """Split files into batches"""
    for i in range(0, len(files), batch_size):
        yield files[i:i + batch_size]
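
The store_in_vector_db call above is a placeholder for whatever storage layer you use; it is not part of the cerevox SDK. A minimal, self-contained stand-in (the in-memory VECTOR_STORE list and the commented-out embedding step are illustrative assumptions):
import asyncio
import uuid

# In-memory stand-in for a real vector database client; swap in Pinecone,
# Qdrant, pgvector, etc. in production
VECTOR_STORE = []

async def store_in_vector_db(chunks):
    """Placeholder sink used by memory_efficient_processing above"""
    for chunk in chunks:
        VECTOR_STORE.append({
            "id": str(uuid.uuid4()),
            "text": chunk,
            # "vector": embed(chunk),  # plug in your embedding model here
        })
        await asyncio.sleep(0)  # yield control, as a real network call would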

Vector Database Integration

Optimal Chunking Strategy

Configure chunking for your vector database:
async def prepare_for_vector_db(documents):
    """Optimize chunks for different vector databases"""
    
    # For OpenAI embeddings (ada-002)
    openai_chunks = documents.get_all_text_chunks(
        target_size=1500,  # ~2000 tokens with overhead
        tolerance=0.1      # 10% size tolerance
    )
    
    # For sentence transformers
    sentence_chunks = documents.get_all_text_chunks(
        target_size=384,   # Typical context window
        tolerance=0.15
    )
    
    # For document-level embeddings
    doc_chunks = documents.get_all_text_chunks(
        target_size=2000,  # Larger chunks for context
        tolerance=0.2
    )
    
    return {
        'openai': openai_chunks,
        'sentence': sentence_chunks,
        'document': doc_chunks
    }

Rich Metadata Extraction

Include comprehensive metadata for better retrieval:
def extract_rich_metadata(documents):
    """Extract comprehensive metadata for vector storage"""
    
    chunks_with_metadata = []
    
    for doc in documents:
        text_chunks = doc.get_text_chunks(target_size=512)
        
        for chunk_index, chunk in enumerate(text_chunks):
            metadata = {
                # Document metadata
                'filename': doc.filename,
                'file_type': doc.file_type,
                'total_pages': doc.total_pages,
                'processing_date': datetime.now().isoformat(),
                
                # Chunk metadata
                'chunk_size': len(chunk),
                'chunk_index': chunk_index,
                'total_chunks': len(text_chunks),
                
                # Content metadata
                'has_tables': len(doc.tables) > 0,
                'has_images': len(doc.images) > 0,
                'element_count': doc.total_elements,
                
                # Custom fields
                'document_category': classify_document(doc),
                'extraction_confidence': calculate_confidence(doc)
            }
            
            chunks_with_metadata.append({
                'content': chunk,
                'metadata': metadata
            })
    
    return chunks_with_metadata
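
classify_document and calculate_confidence above are hooks for your own logic rather than cerevox SDK calls. One possible sketch using simple filename and structure heuristics (the category keywords and confidence weighting are illustrative assumptions):
def classify_document(doc) -> str:
    """Rough category guess from the filename; replace with your own rules or model"""
    name = (doc.filename or "").lower()
    if "invoice" in name or "receipt" in name:
        return "financial"
    if "contract" in name or "agreement" in name:
        return "legal"
    return "general"

def calculate_confidence(doc) -> float:
    """Heuristic extraction-confidence score in [0, 1]; tune the weights for your corpus"""
    if not doc.total_elements:
        return 0.0
    # Assume documents with recognized tables or images parsed more cleanly
    structure_bonus = 0.1 if (doc.tables or doc.images) else 0.0
    return min(1.0, 0.8 + structure_bonus)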

Security Best Practices

API Key Management

Never hardcode API keys in your application:
import os
from cerevox import AsyncLexa

# Load from environment
api_key = os.getenv('CEREVOX_API_KEY')
if not api_key:
    raise ValueError("CEREVOX_API_KEY environment variable not set")

async with AsyncLexa(api_key=api_key) as client:
    documents = await client.parse(files)
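
For local development, a .env file keeps the same environment-variable contract without exporting keys in your shell. This assumes the optional python-dotenv package, which is not a cerevox dependency:
import os

from dotenv import load_dotenv  # pip install python-dotenv

load_dotenv()  # reads key=value pairs from a local .env file into the environment

api_key = os.getenv('CEREVOX_API_KEY')
if not api_key:
    raise ValueError("CEREVOX_API_KEY environment variable not set")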

Data Privacy

Handle sensitive documents securely:
import tempfile
import os
from pathlib import Path

async def secure_document_processing(sensitive_files):
    """Process sensitive documents with automatic cleanup"""
    
    temp_dir = None
    try:
        # Create a secure temporary directory for any intermediate artifacts;
        # everything written here is removed in the finally block below
        temp_dir = tempfile.mkdtemp(prefix='cerevox_secure_')
        
        # Process documents
        async with AsyncLexa(api_key=os.getenv('CEREVOX_API_KEY')) as client:
            documents = await client.parse(sensitive_files)
            
            # Process results immediately
            chunks = documents.get_all_text_chunks(target_size=512)
            
            # Store in secure location
            secure_results = encrypt_and_store(chunks)
            
            return secure_results
            
    finally:
        # Always cleanup temporary files
        if temp_dir and Path(temp_dir).exists():
            import shutil
            shutil.rmtree(temp_dir, ignore_errors=True)
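
encrypt_and_store above is also your code rather than an SDK call. A minimal sketch using Fernet from the cryptography package (the inline key generation and output path are illustrative; in production the key would come from a secrets manager):
import json
from pathlib import Path

from cryptography.fernet import Fernet  # pip install cryptography

def encrypt_and_store(chunks, output_path: str = "chunks.enc"):
    """Encrypt chunk text at rest before writing it to disk"""
    # In production, load the key from a secrets manager instead of generating it here
    key = Fernet.generate_key()
    token = Fernet(key).encrypt(json.dumps(chunks).encode("utf-8"))
    Path(output_path).write_bytes(token)
    return {"path": output_path, "key": key}  # hand the key to your key-management system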

Monitoring and Observability

Production Logging

Implement comprehensive logging:
import logging
import time
from functools import wraps

# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('cerevox.production')

def monitor_performance(func):
    """Decorator to monitor Lexa operations"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        operation_name = func.__name__
        
        try:
            logger.info(f"Starting {operation_name}")
            result = await func(*args, **kwargs)
            
            duration = time.time() - start_time
            logger.info(f"Completed {operation_name} in {duration:.2f}s")
            
            # Send metrics to monitoring service
            # metrics.timing(f'cerevox.{operation_name}.duration', duration)
            
            return result
            
        except Exception as e:
            duration = time.time() - start_time
            logger.error(f"Failed {operation_name} after {duration:.2f}s: {e}")
            
            # Track errors
            # metrics.increment(f'cerevox.{operation_name}.error')
            
            raise
    
    return wrapper

@monitor_performance
async def parse_documents_monitored(files):
    async with AsyncLexa(api_key=os.getenv('CEREVOX_API_KEY')) as client:
        return await client.parse(files)

Health Checks

Implement health monitoring:
async def health_check():
    """Health check for Lexa service availability"""
    try:
        async with AsyncLexa(api_key=os.getenv('CEREVOX_API_KEY')) as client:
            # Test with minimal document
            test_content = b"Health check test document."
            
            start_time = time.time()
            documents = await client.parse(test_content, timeout=10.0)
            response_time = time.time() - start_time
            
            return {
                'status': 'healthy',
                'response_time': response_time,
                'service': 'cerevox-lexa'
            }
            
    except Exception as e:
        return {
            'status': 'unhealthy',
            'error': str(e),
            'service': 'cerevox-lexa'
        }

# Use in your health endpoint
# @app.get("/health/cerevox")
# async def cerevox_health():
#     return await health_check()

Testing Strategies

Unit Testing

Test your Lexa integration thoroughly:
import pytest
from unittest.mock import AsyncMock, Mock, patch
from cerevox import AsyncLexa, LexaError

class TestLexaIntegration:
    
    @pytest.fixture
    async def client(self):
        async with AsyncLexa(api_key="test-key") as client:
            yield client
    
    @pytest.mark.asyncio
    async def test_successful_parsing(self, client):
        """Test successful document parsing"""
        test_file = b"Test document content"
        
        with patch.object(client, 'parse') as mock_parse:
            mock_parse.return_value = [Mock(content="Parsed content")]
            
            result = await client.parse(test_file)
            
            assert len(result) == 1
            assert result[0].content == "Parsed content"
    
    @pytest.mark.asyncio
    async def test_error_handling(self, client):
        """Test error handling"""
        with patch.object(client, 'parse') as mock_parse:
            mock_parse.side_effect = LexaError("API Error", status_code=400)
            
            with pytest.raises(LexaError):
                await client.parse("test.pdf")
    
    @pytest.mark.asyncio
    async def test_chunking_output(self, client):
        """Test chunking functionality"""
        # Test with mock document
        mock_doc = Mock()
        mock_doc.get_text_chunks.return_value = ["chunk1", "chunk2"]
        
        chunks = mock_doc.get_text_chunks(target_size=512)
        
        assert len(chunks) == 2
        assert chunks[0] == "chunk1"

Remember to test with realistic document sizes and types that match your production workload.
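
One way to act on that is an opt-in integration test that runs against representative files only when a real API key is available (the samples/ paths are hypothetical placeholders for your own fixtures):
import os

import pytest

from cerevox import AsyncLexa

SAMPLE_FILES = ["samples/report.pdf", "samples/spreadsheet.xlsx"]  # hypothetical fixtures

@pytest.mark.asyncio
@pytest.mark.skipif(not os.getenv("CEREVOX_API_KEY"), reason="requires a real API key")
@pytest.mark.parametrize("file_path", SAMPLE_FILES)
async def test_parse_realistic_documents(file_path):
    """Integration test against documents that match the production workload"""
    async with AsyncLexa(api_key=os.getenv("CEREVOX_API_KEY")) as client:
        documents = await client.parse([file_path], timeout=120.0)

        assert documents, f"no documents returned for {file_path}"
        assert documents.get_all_text_chunks(target_size=512)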

Common Pitfalls

Avoid These Mistakes

Don’t process files sequentially - Use async operations and batch processing for better performance.
Don’t ignore timeouts - Set appropriate timeouts based on your document sizes and processing requirements.
Don’t skip error handling - Always implement proper error handling and retry logic for production systems.

Performance Anti-Patterns

# ❌ Anti-pattern: Sequential processing
for file in large_file_list:
    doc = client.parse(file)  # Blocks thread
    process(doc)

# ✅ Better: Batch async processing  
async with AsyncLexa(api_key=api_key) as client:
    documents = await client.parse(large_file_list)
    for doc in documents:
        process(doc)

Production Checklist

Before deploying to production:
  • API keys stored securely (environment variables)
  • Timeout values configured appropriately
  • Processing modes selected for use case
  • Batch sizes optimized for your workload
  • Retry logic implemented
  • Graceful degradation on failures
  • Comprehensive error logging
  • Health checks in place
  • Async operations used throughout
  • Memory management for large batches
  • Progress monitoring implemented
  • Performance metrics tracked
  • No hardcoded credentials
  • Secure temporary file handling
  • Data privacy measures
  • Access controls configured

Next Steps

Vector Database Integration

Learn advanced patterns for RAG applications

Performance Optimization

Deep dive into performance tuning

API Reference

Explore the complete API documentation

Examples

See advanced implementation patterns