🚀 Quick Wins

- Async First: Use AsyncLexa for 3-5x better performance in production.
- Batch Processing: Process multiple documents together for optimal throughput.
- Smart Chunking: Leverage built-in chunking for vector database optimization.
- Error Handling: Implement proper retry logic and graceful degradation.

Production Patterns

Async-First Architecture

Always prefer async operations for production workloads:

import asyncio
from cerevox import AsyncLexa, ProcessingMode

async def process_documents(file_paths):
    async with AsyncLexa(api_key="your-api-key") as client:
        # Process multiple documents concurrently
        documents = await client.parse(
            file_paths,
            mode=ProcessingMode.DEFAULT
        )
        
        # Get vector-ready chunks
        chunks = documents.get_all_text_chunks(target_size=512)
        return chunks

# Process 100+ documents efficiently
file_batch = ["doc1.pdf", "doc2.docx", "doc3.txt"]
chunks = asyncio.run(process_documents(file_batch))

Robust Error Handling

Implement comprehensive error handling with automatic retries:

import asyncio
from cerevox import AsyncLexa, LexaError
from tenacity import retry, stop_after_attempt, wait_exponential

class ProductionLexaClient:
    def __init__(self, api_key: str):
        self.api_key = api_key
    
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    async def safe_parse(self, files, **kwargs):
        """Parse with automatic retry on failures"""
        try:
            async with AsyncLexa(api_key=self.api_key) as client:
                return await client.parse(files, **kwargs)
        except LexaError as e:
            if e.status_code >= 500:  # Server errors - retry
                raise
            else:  # Client errors - don't retry
                return self._handle_client_error(e)
    
    def _handle_client_error(self, error):
        """Handle client errors gracefully"""
        print(f"Client error: {error.message}")
        return []  # Return empty result or default

# Usage (from within an async function)
client = ProductionLexaClient("your-api-key")
documents = await client.safe_parse(["document.pdf"])

Progress Monitoring

Implement detailed progress tracking for long-running operations:

from datetime import datetime
import logging

def production_progress_callback(status):
    """Production-ready progress callback with logging"""
    timestamp = datetime.now().isoformat()
    
    # Log to your monitoring system
    logging.info(f"[{timestamp}] Job {status.job_id}: {status.status}")
    
    if hasattr(status, 'progress') and status.progress:
        logging.info(f"Progress: {status.progress}")
    
    # Send to monitoring service (DataDog, New Relic, etc.)
    # monitor.track('lexa.processing.progress', status.progress)

async def monitored_parsing():
    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(
            ["large_document.pdf"],
            progress_callback=production_progress_callback,
            timeout=300.0,  # 5 minutes
            poll_interval=2.0  # Check every 2 seconds
        )
        return documents

Performance Optimization

Optimal Batch Sizing

Balance throughput and memory usage:

async def optimize_batch_processing(file_paths: list):
    """Process files in optimal batch sizes"""
    OPTIMAL_BATCH_SIZE = 10  # Adjust based on file sizes
    
    async with AsyncLexa(api_key="your-api-key") as client:
        all_documents = []
        
        for i in range(0, len(file_paths), OPTIMAL_BATCH_SIZE):
            batch = file_paths[i:i + OPTIMAL_BATCH_SIZE]
            
            # Process batch
            documents = await client.parse(
                batch,
                mode=ProcessingMode.DEFAULT  # Fast processing for most use cases
            )
            
            all_documents.extend(documents)
            
            # Optional: Brief pause to avoid rate limits
            await asyncio.sleep(0.1)
        
        return all_documents

Processing Mode Selection

Choose the right mode for your use case:

# Best for: Most documents, general content processing
documents = await client.parse(
    files,
    mode=ProcessingMode.DEFAULT  # Fast and efficient
)
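
DEFAULT is the fast path shown throughout this guide. If your SDK version exposes a higher-accuracy mode (for example ProcessingMode.ADVANCED; check the ProcessingMode enum in your installed version), a small selector keeps the trade-off explicit. A minimal sketch under that assumption:

from cerevox import ProcessingMode

def select_mode(complex_layout: bool) -> ProcessingMode:
    """Pick DEFAULT for speed; fall back to it if ADVANCED is unavailable"""
    # ProcessingMode.ADVANCED is an assumption - verify it exists in your SDK
    advanced = getattr(ProcessingMode, "ADVANCED", ProcessingMode.DEFAULT)
    return advanced if complex_layout else ProcessingMode.DEFAULT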

Memory Management

Handle large document batches efficiently:

async def memory_efficient_processing(file_paths):
    """Process large batches without memory issues"""
    
    async with AsyncLexa(api_key="your-api-key") as client:
        for batch in chunk_files(file_paths, batch_size=5):
            # Process batch
            documents = await client.parse(batch)
            
            # Process immediately and release memory
            chunks = documents.get_all_text_chunks(target_size=512)
            
            # Send to vector DB immediately
            await store_in_vector_db(chunks)
            
            # Explicitly release memory
            del documents, chunks

def chunk_files(files, batch_size):
    """Split files into batches"""
    for i in range(0, len(files), batch_size):
        yield files[i:i + batch_size]
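
store_in_vector_db above is your own ingestion hook, not a cerevox API. A minimal stand-in that simply buffers chunks in memory, to be swapped for your real database client:

import asyncio

_chunk_buffer = []

async def store_in_vector_db(chunks):
    """Hypothetical sink: buffer chunks, then hand off to your DB client"""
    _chunk_buffer.extend(chunks)
    await asyncio.sleep(0)  # yield to the event loop between batches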

Vector Database Integration

Optimal Chunking Strategy

Configure chunking for your vector database:

async def prepare_for_vector_db(documents):
    """Optimize chunks for different vector databases"""
    
    # For OpenAI embeddings (ada-002)
    openai_chunks = documents.get_all_text_chunks(
        target_size=1500,  # characters (~400 tokens) - well under ada-002's limit
        tolerance=0.1      # 10% size tolerance
    )
    
    # For sentence transformers
    sentence_chunks = documents.get_all_text_chunks(
        target_size=384,   # Typical context window
        tolerance=0.15
    )
    
    # For document-level embeddings
    doc_chunks = documents.get_all_text_chunks(
        target_size=2000,  # Larger chunks for context
        tolerance=0.2
    )
    
    return {
        'openai': openai_chunks,
        'sentence': sentence_chunks,
        'document': doc_chunks
    }
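
Embedding the sized chunks is a separate step outside cerevox. As one illustration using the official openai package (the model name and batch size are assumptions to adapt):

from openai import OpenAI

openai_client = OpenAI()  # reads OPENAI_API_KEY from the environment

def embed_chunks(chunks):
    """Embed chunks in batches of 100 to keep requests comfortably sized"""
    vectors = []
    for i in range(0, len(chunks), 100):
        response = openai_client.embeddings.create(
            model="text-embedding-ada-002",
            input=chunks[i:i + 100]
        )
        vectors.extend(item.embedding for item in response.data)
    return vectors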

Rich Metadata Extraction

Include comprehensive metadata for better retrieval:

def extract_rich_metadata(documents):
    """Extract comprehensive metadata for vector storage"""
    
    chunks_with_metadata = []
    
    for doc in documents:
        text_chunks = doc.get_text_chunks(target_size=512)
        
        for chunk_index, chunk in enumerate(text_chunks):
            metadata = {
                # Document metadata
                'filename': doc.filename,
                'file_type': doc.file_type,
                'total_pages': doc.total_pages,
                'processing_date': datetime.now().isoformat(),
                
                # Chunk metadata
                'chunk_size': len(chunk),
                'chunk_index': chunk_index,
                'total_chunks': len(text_chunks),
                
                # Content metadata
                'has_tables': len(doc.tables) > 0,
                'has_images': len(doc.images) > 0,
                'element_count': doc.total_elements,
                
                # Custom fields (classify_document and calculate_confidence
                # are your own helpers - see the sketch after this function)
                'document_category': classify_document(doc),
                'extraction_confidence': calculate_confidence(doc)
            }
            
            chunks_with_metadata.append({
                'content': chunk,
                'metadata': metadata
            })
    
    return chunks_with_metadata
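
classify_document and calculate_confidence are application-specific hooks, not cerevox APIs. A minimal sketch, assuming a keyword heuristic on the filename and reusing the total_elements attribute from above:

def classify_document(doc) -> str:
    """Naive keyword heuristic - replace with your own classifier"""
    name = (doc.filename or '').lower()
    if 'invoice' in name:
        return 'invoice'
    if 'contract' in name or 'agreement' in name:
        return 'contract'
    return 'general'

def calculate_confidence(doc) -> float:
    """Rough proxy: richer element extraction suggests a cleaner parse"""
    return min(1.0, (doc.total_elements or 0) / 100)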

Security Best Practices

API Key Management

Never hardcode API keys in your application:

import os
from cerevox import AsyncLexa

# Load from environment
api_key = os.getenv('CEREVOX_API_KEY')
if not api_key:
    raise ValueError("CEREVOX_API_KEY environment variable not set")

# Inside an async function:
async with AsyncLexa(api_key=api_key) as client:
    documents = await client.parse(files)

Data Privacy

Handle sensitive documents securely:

import tempfile
import os
from pathlib import Path

async def secure_document_processing(sensitive_files):
    """Process sensitive documents with automatic cleanup"""
    
    temp_dir = None
    try:
        # Create a secure temporary directory for any intermediate artifacts
        temp_dir = tempfile.mkdtemp(prefix='cerevox_secure_')
        
        # Process documents
        async with AsyncLexa(api_key=os.getenv('CEREVOX_API_KEY')) as client:
            documents = await client.parse(sensitive_files)
            
            # Process results immediately
            chunks = documents.get_all_text_chunks(target_size=512)
            
            # Store in a secure location (encrypt_and_store is your own
            # helper - a sketch follows this function)
            secure_results = encrypt_and_store(chunks)
            
            return secure_results
            
    finally:
        # Always cleanup temporary files
        if temp_dir and Path(temp_dir).exists():
            import shutil
            shutil.rmtree(temp_dir, ignore_errors=True)
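
encrypt_and_store above is also a placeholder. A minimal sketch using the cryptography package's Fernet recipe; the CHUNK_ENCRYPTION_KEY variable name and output path are assumptions, and in production the key would come from your secret manager rather than being generated inline:

import json
import os
from cryptography.fernet import Fernet

def encrypt_and_store(chunks, path='chunks.enc'):
    """Encrypt chunks with a symmetric key before writing to disk"""
    # CHUNK_ENCRYPTION_KEY is a hypothetical variable name - use your secret manager
    key = os.environ.get('CHUNK_ENCRYPTION_KEY') or Fernet.generate_key()
    encrypted = Fernet(key).encrypt(json.dumps(chunks).encode('utf-8'))
    with open(path, 'wb') as f:
        f.write(encrypted)
    return path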

Monitoring and Observability

Production Logging

Implement comprehensive logging:

import logging
import os
import time
from functools import wraps

# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('cerevox.production')

def monitor_performance(func):
    """Decorator to monitor Lexa operations"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        operation_name = func.__name__
        
        try:
            logger.info(f"Starting {operation_name}")
            result = await func(*args, **kwargs)
            
            duration = time.time() - start_time
            logger.info(f"Completed {operation_name} in {duration:.2f}s")
            
            # Send metrics to monitoring service
            # metrics.timing(f'cerevox.{operation_name}.duration', duration)
            
            return result
            
        except Exception as e:
            duration = time.time() - start_time
            logger.error(f"Failed {operation_name} after {duration:.2f}s: {e}")
            
            # Track errors
            # metrics.increment(f'cerevox.{operation_name}.error')
            
            raise
    
    return wrapper

@monitor_performance
async def parse_documents_monitored(files):
    async with AsyncLexa(api_key=os.getenv('CEREVOX_API_KEY')) as client:
        return await client.parse(files)

Health Checks

Implement health monitoring:

async def health_check():
    """Health check for Lexa service availability"""
    try:
        async with AsyncLexa(api_key=os.getenv('CEREVOX_API_KEY')) as client:
            # Test with minimal document
            test_content = b"Health check test document."
            
            start_time = time.time()
            documents = await client.parse(test_content, timeout=10.0)
            response_time = time.time() - start_time
            
            return {
                'status': 'healthy',
                'response_time': response_time,
                'service': 'cerevox-lexa'
            }
            
    except Exception as e:
        return {
            'status': 'unhealthy',
            'error': str(e),
            'service': 'cerevox-lexa'
        }

# Use in your health endpoint
# @app.get("/health/cerevox")
# async def cerevox_health():
#     return await health_check()

Testing Strategies

Unit Testing

Test your Lexa integration thoroughly:

import pytest
import pytest_asyncio
from unittest.mock import AsyncMock, Mock, patch
from cerevox import AsyncLexa, LexaError

class TestLexaIntegration:
    
    @pytest_asyncio.fixture
    async def client(self):
        async with AsyncLexa(api_key="test-key") as client:
            yield client
    
    @pytest.mark.asyncio
    async def test_successful_parsing(self, client):
        """Test successful document parsing"""
        test_file = b"Test document content"
        
        with patch.object(client, 'parse') as mock_parse:
            mock_parse.return_value = [Mock(content="Parsed content")]
            
            result = await client.parse(test_file)
            
            assert len(result) == 1
            assert result[0].content == "Parsed content"
    
    @pytest.mark.asyncio
    async def test_error_handling(self, client):
        """Test error handling"""
        with patch.object(client, 'parse') as mock_parse:
            mock_parse.side_effect = LexaError("API Error", status_code=400)
            
            with pytest.raises(LexaError):
                await client.parse("test.pdf")
    
    @pytest.mark.asyncio
    async def test_chunking_output(self, client):
        """Test chunking functionality"""
        # Test with mock document
        mock_doc = Mock()
        mock_doc.get_text_chunks.return_value = ["chunk1", "chunk2"]
        
        chunks = mock_doc.get_text_chunks(target_size=512)
        
        assert len(chunks) == 2
        assert chunks[0] == "chunk1"

Remember to test with realistic document sizes and types that match your production workload.
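
For integration coverage, a parametrized smoke test over representative fixtures works well; the file paths below are hypothetical and should be adapted to your suite:

import os
import pytest
from cerevox import AsyncLexa

# Hypothetical fixtures mirroring production traffic
REALISTIC_FILES = [
    'fixtures/annual_report_50_pages.pdf',
    'fixtures/signed_contract.docx',
    'fixtures/meeting_notes.txt',
]

@pytest.mark.asyncio
@pytest.mark.parametrize('path', REALISTIC_FILES)
async def test_parse_realistic_document(path):
    """Hits the live API - skipped unless CEREVOX_API_KEY is configured"""
    api_key = os.getenv('CEREVOX_API_KEY')
    if not api_key:
        pytest.skip('CEREVOX_API_KEY not set')
    
    async with AsyncLexa(api_key=api_key) as client:
        documents = await client.parse([path], timeout=120.0)
        assert documents, f'no documents parsed from {path}'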

Common Pitfalls

Avoid These Mistakes

Don't process files sequentially - Use async operations and batch processing for better performance.

Don't ignore timeouts - Set appropriate timeouts based on your document sizes and processing requirements; see the timeout snippet after the anti-pattern example below.

Don't skip error handling - Always implement proper error handling and retry logic for production systems.

Performance Anti-Patterns

# โŒ Anti-pattern: Sequential processing
for file in large_file_list:
    doc = client.parse(file)  # Blocks thread
    process(doc)

# ✅ Better: Batch async processing
async with AsyncLexa(api_key=api_key) as client:
    documents = await client.parse(large_file_list)
    for doc in documents:
        process(doc)
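
On the timeout point above: size timeouts to the workload rather than relying on defaults. For example, give a batch of large scanned PDFs far more headroom than a few small text files (large_scanned_pdfs below stands in for your own file list):

async with AsyncLexa(api_key=api_key) as client:
    # Large scanned PDFs: allow up to 10 minutes, poll less frequently
    documents = await client.parse(
        large_scanned_pdfs,
        timeout=600.0,
        poll_interval=5.0
    )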

Production Checklist

Before deploying to production:

- Use AsyncLexa with batched, concurrent parsing instead of sequential calls
- Load the API key from the environment, never from source code
- Set timeouts and poll intervals sized to your document workload
- Add retry logic with exponential backoff for server-side errors
- Wire up progress callbacks, structured logging, and a health check endpoint
- Test against realistic document sizes and types
