Performance Goal: Process 1000+ documents efficiently while maintaining accuracy and minimizing costs.

Quick Performance Wins

Processing Mode Optimization

from cerevox import Lexa, ProcessingMode

client = Lexa()

# ✅ DEFAULT mode - fast and efficient for most documents
documents = client.parse(
    "documents/*.pdf",
    mode=ProcessingMode.DEFAULT  # Fast processing for most use cases
)

# ✅ ADVANCED mode - maximum accuracy for complex documents
documents = client.parse(
    "complex-research-papers/*.pdf",
    mode=ProcessingMode.ADVANCED  # Use for complex layouts, research papers
)

print("💡 Rule: Start with DEFAULT, use ADVANCED for complex docs requiring maximum accuracy")

Async Processing (10x Faster)

import asyncio
import time
from cerevox import Lexa, AsyncLexa

def sync_processing_slow(files):
    """Slow synchronous processing"""
    client = Lexa()
    
    start_time = time.time()
    all_documents = []
    
    for file in files:
        documents = client.parse([file])  # One at a time
        all_documents.extend(documents)
    
    end_time = time.time()
    print(f"😴 Sync processing: {end_time - start_time:.2f} seconds")
    return all_documents

async def async_processing_fast(files):
    """Fast asynchronous processing"""
    async with AsyncLexa() as client:
        start_time = time.time()
        
        # Process all files concurrently
        documents = await client.parse(files)
        
        end_time = time.time()
        print(f"🚀 Async processing: {end_time - start_time:.2f} seconds")
        return documents

# Performance comparison
files = [f"document_{i}.pdf" for i in range(20)]

# Sync: ~100 seconds for 20 files
sync_docs = sync_processing_slow(files)

# Async: ~10 seconds for 20 files (10x faster!)
async_docs = asyncio.run(async_processing_fast(files))

print("💡 Always use async for multiple documents!")

Batch Processing Strategies

Intelligent Batching

import os
from cerevox import AsyncLexa
import asyncio

async def intelligent_batching(files):
    """Batch files based on size for optimal performance"""
    
    def analyze_files(file_list):
        file_info = []
        for file in file_list:
            if os.path.exists(file):
                size_mb = os.path.getsize(file) / (1024 * 1024)
                
                # Categorize by size
                if size_mb < 1:
                    category = 'small'
                    batch_size = 50  # Small files: large batches
                elif size_mb < 10:
                    category = 'medium' 
                    batch_size = 20  # Medium files: moderate batches
                else:
                    category = 'large'
                    batch_size = 5   # Large files: small batches
                
                file_info.append({
                    'file': file,
                    'size_mb': size_mb,
                    'category': category,
                    'batch_size': batch_size
                })
        
        return file_info
    
    # Analyze and group files
    file_info = analyze_files(files)
    
    # Group by category
    categories = {}
    for info in file_info:
        category = info['category']
        if category not in categories:
            categories[category] = []
        categories[category].append(info['file'])
    
    async with AsyncLexa() as client:
        all_documents = []
        
        for category, category_files in categories.items():
            batch_size = file_info[0]['batch_size'] if file_info else 20
            
            print(f"📋 Processing {len(category_files)} {category} files in batches of {batch_size}")
            
            for i in range(0, len(category_files), batch_size):
                batch = category_files[i:i + batch_size]
                
                start_time = time.time()
                documents = await client.parse(batch)
                batch_time = time.time() - start_time
                
                all_documents.extend(documents)
                
                docs_per_sec = len(documents) / batch_time
                print(f"  ✅ {category} batch: {len(documents)} docs in {batch_time:.2f}s ({docs_per_sec:.1f} docs/sec)")
        
        return all_documents

# Example usage
mixed_files = [
    "small-invoice.pdf",      # 100KB
    "medium-report.pdf",      # 5MB
    "large-presentation.pdf"  # 25MB
]

documents = asyncio.run(intelligent_batching(mixed_files))

Error Handling & Retry Strategies

Production-Ready Error Handling

from cerevox import AsyncLexa, LexaError
import asyncio
import time

async def robust_processing_with_retries(files, max_retries=3):
    """Production-ready processing with intelligent retries"""
    
    async def process_with_retry(client, file, attempt=0):
        try:
            documents = await client.parse([file])
            return {'file': file, 'documents': documents, 'success': True}
            
        except LexaError as e:
            if attempt < max_retries:
                # Exponential backoff
                wait_time = (2 ** attempt)
                print(f"⏳ Retry {attempt + 1} for {file} in {wait_time}s: {e.message}")
                await asyncio.sleep(wait_time)
                return await process_with_retry(client, file, attempt + 1)
            else:
                print(f"❌ Max retries exceeded for {file}: {e.message}")
                return {'file': file, 'error': str(e), 'success': False}
                
        except Exception as e:
            print(f"💥 Unexpected error for {file}: {e}")
            return {'file': file, 'error': str(e), 'success': False}
    
    async with AsyncLexa() as client:
        # Process all files with retries
        tasks = [process_with_retry(client, file) for file in files]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Analyze results
        successful = [r for r in results if isinstance(r, dict) and r['success']]
        failed = [r for r in results if isinstance(r, dict) and not r['success']]
        exceptions = [r for r in results if isinstance(r, Exception)]
        
        print(f"📊 Processing Results:")
        print(f"  ✅ Successful: {len(successful)}")
        print(f"  ❌ Failed: {len(failed)}")
        print(f"  💥 Exceptions: {len(exceptions)}")
        
        return successful, failed, exceptions

# Process with robust error handling
files_with_issues = ["good-doc.pdf", "corrupted-doc.pdf", "missing-doc.pdf"]
successful, failed, exceptions = asyncio.run(robust_processing_with_retries(files_with_issues))

Cost Optimization

Smart Processing Strategies

from cerevox import Lexa, ProcessingMode
import time

def cost_optimized_processing(files):
    """Optimize for cost while maintaining quality"""
    
    client = Lexa()
    
    # Strategy 1: Use DEFAULT mode for simple documents
    simple_extensions = ['.txt', '.csv', '.md']
    complex_extensions = ['.pdf', '.docx', '.pptx']
    
    simple_files = [f for f in files if any(f.endswith(ext) for ext in simple_extensions)]
    complex_files = [f for f in files if any(f.endswith(ext) for ext in complex_extensions)]
    
    all_documents = []
    
    # Process simple files with DEFAULT mode (cheaper)
    if simple_files:
        print(f"💰 Processing {len(simple_files)} simple files with DEFAULT mode")
        start_time = time.time()
        
        simple_docs = client.parse(
            simple_files,
            mode=ProcessingMode.DEFAULT  # Fast processing for most use cases
        )
        
        processing_time = time.time() - start_time
        print(f"  ✅ DEFAULT mode: {len(simple_docs)} docs in {processing_time:.2f}s")
        all_documents.extend(simple_docs)
    
    # Process complex files with ADVANCED mode (balanced cost/quality)
    if complex_files:
        print(f"📄 Processing {len(complex_files)} complex files with ADVANCED mode")
        start_time = time.time()
        
        complex_docs = client.parse(
            complex_files,
            mode=ProcessingMode.ADVANCED  # Use for complex layouts, research papers
        )
        
        processing_time = time.time() - start_time
        print(f"  ✅ ADVANCED mode: {len(complex_docs)} docs in {processing_time:.2f}s")
        all_documents.extend(complex_docs)
    
    print(f"💡 Cost optimization: Used DEFAULT mode for {len(simple_files)} files, ADVANCED for {len(complex_files)} files")
    return all_documents

# Cost optimization example
mixed_files = [
    "simple-data.txt",      # Use DEFAULT mode
    "simple-list.csv",      # Use DEFAULT mode  
    "complex-report.pdf",   # Use ADVANCED mode
    "presentation.pptx"     # Use ADVANCED mode
]

documents = cost_optimized_processing(mixed_files)

Performance Monitoring

Real-Time Performance Tracking

import time
import asyncio
from cerevox import AsyncLexa

class PerformanceMonitor:
    def __init__(self):
        self.stats = {
            'total_documents': 0,
            'total_time': 0,
            'successful_documents': 0,
            'failed_documents': 0,
            'average_doc_size': 0,
            'docs_per_second': 0
        }
        self.start_time = None
    
    def start_monitoring(self):
        self.start_time = time.time()
        print("📊 Performance monitoring started")
    
    def record_batch(self, documents, processing_time):
        self.stats['total_documents'] += len(documents)
        self.stats['total_time'] += processing_time
        self.stats['successful_documents'] += len(documents)
        
        # Calculate running averages
        if self.stats['total_time'] > 0:
            self.stats['docs_per_second'] = self.stats['total_documents'] / self.stats['total_time']
        
        # Calculate average document size
        total_content = sum(len(doc.content) for doc in documents)
        self.stats['average_doc_size'] = total_content / len(documents) if documents else 0
    
    def record_failure(self, failed_count):
        self.stats['failed_documents'] += failed_count
    
    def print_stats(self):
        print(f"\n📊 Performance Statistics:")
        print(f"  📄 Total documents: {self.stats['total_documents']}")
        print(f"  ✅ Successful: {self.stats['successful_documents']}")
        print(f"  ❌ Failed: {self.stats['failed_documents']}")
        print(f"  ⚡ Speed: {self.stats['docs_per_second']:.2f} docs/second")
        print(f"  📏 Avg doc size: {self.stats['average_doc_size']:,.0f} chars")
        print(f"  ⏱️  Total time: {self.stats['total_time']:.2f} seconds")
        
        if self.start_time:
            elapsed = time.time() - self.start_time
            print(f"  🕐 Elapsed time: {elapsed:.2f} seconds")

async def monitored_processing(files, batch_size=20):
    """Process files with performance monitoring"""
    
    monitor = PerformanceMonitor()
    monitor.start_monitoring()
    
    async with AsyncLexa() as client:
        for i in range(0, len(files), batch_size):
            batch = files[i:i + batch_size]
            
            batch_start = time.time()
            
            try:
                documents = await client.parse(batch)
                batch_time = time.time() - batch_start
                
                monitor.record_batch(documents, batch_time)
                
                print(f"✅ Batch {i//batch_size + 1}: {len(documents)} docs in {batch_time:.2f}s")
                
            except Exception as e:
                batch_time = time.time() - batch_start
                monitor.record_failure(len(batch))
                print(f"❌ Batch {i//batch_size + 1} failed: {e}")
            
            # Print stats every 5 batches
            if (i // batch_size + 1) % 5 == 0:
                monitor.print_stats()
    
    # Final statistics
    monitor.print_stats()
    return monitor.stats

# Monitor processing performance
test_files = [f"document_{i:03d}.pdf" for i in range(100)]
stats = asyncio.run(monitored_processing(test_files))

Performance Best Practices


Performance Rule: Start with async processing + DEFAULT mode + batches of 20. This gives 80% of optimal performance with minimal tuning.