Performance Optimization
Maximize Lexa performance for high-volume document processing
Performance Goal: Process 1000+ documents efficiently while maintaining accuracy and minimizing costs.
Quick Performance Wins
Processing Mode Optimization
from cerevox import Lexa, ProcessingMode

client = Lexa()

# ✅ DEFAULT mode - fast and efficient for most documents
documents = client.parse(
    "documents/*.pdf",
    mode=ProcessingMode.DEFAULT  # Fast processing for most use cases
)

# ✅ ADVANCED mode - maximum accuracy for complex documents
documents = client.parse(
    "complex-research-papers/*.pdf",
    mode=ProcessingMode.ADVANCED  # Use for complex layouts, research papers
)

print("💡 Rule: Start with DEFAULT, use ADVANCED for complex docs requiring maximum accuracy")
Async Processing (10x Faster)
import asyncio
import time

from cerevox import Lexa, AsyncLexa

def sync_processing_slow(files):
    """Slow synchronous processing"""
    client = Lexa()
    start_time = time.time()

    all_documents = []
    for file in files:
        documents = client.parse([file])  # One at a time
        all_documents.extend(documents)

    end_time = time.time()
    print(f"😴 Sync processing: {end_time - start_time:.2f} seconds")
    return all_documents

async def async_processing_fast(files):
    """Fast asynchronous processing"""
    async with AsyncLexa() as client:
        start_time = time.time()

        # Process all files concurrently
        documents = await client.parse(files)

        end_time = time.time()
        print(f"🚀 Async processing: {end_time - start_time:.2f} seconds")
        return documents

# Performance comparison
files = [f"document_{i}.pdf" for i in range(20)]

# Sync: ~100 seconds for 20 files
sync_docs = sync_processing_slow(files)

# Async: ~10 seconds for 20 files (10x faster!)
async_docs = asyncio.run(async_processing_fast(files))

print("💡 Always use async for multiple documents!")
Batch Processing Strategies
Intelligent Batching
import asyncio
import os
import time

from cerevox import AsyncLexa

async def intelligent_batching(files):
    """Batch files based on size for optimal performance"""

    def analyze_files(file_list):
        file_info = []
        for file in file_list:
            if os.path.exists(file):
                size_mb = os.path.getsize(file) / (1024 * 1024)

                # Categorize by size
                if size_mb < 1:
                    category = 'small'
                    batch_size = 50  # Small files: large batches
                elif size_mb < 10:
                    category = 'medium'
                    batch_size = 20  # Medium files: moderate batches
                else:
                    category = 'large'
                    batch_size = 5   # Large files: small batches

                file_info.append({
                    'file': file,
                    'size_mb': size_mb,
                    'category': category,
                    'batch_size': batch_size
                })
        return file_info

    # Analyze and group files
    file_info = analyze_files(files)

    # Group files by category and remember each category's batch size
    categories = {}
    batch_sizes = {}
    for info in file_info:
        category = info['category']
        categories.setdefault(category, []).append(info['file'])
        batch_sizes[category] = info['batch_size']

    async with AsyncLexa() as client:
        all_documents = []

        for category, category_files in categories.items():
            batch_size = batch_sizes.get(category, 20)
            print(f"📋 Processing {len(category_files)} {category} files in batches of {batch_size}")

            for i in range(0, len(category_files), batch_size):
                batch = category_files[i:i + batch_size]

                start_time = time.time()
                documents = await client.parse(batch)
                batch_time = time.time() - start_time

                all_documents.extend(documents)

                docs_per_sec = len(documents) / batch_time
                print(f"   ✅ {category} batch: {len(documents)} docs in {batch_time:.2f}s ({docs_per_sec:.1f} docs/sec)")

        return all_documents

# Example usage
mixed_files = [
    "small-invoice.pdf",      # 100KB
    "medium-report.pdf",      # 5MB
    "large-presentation.pdf"  # 25MB
]

documents = asyncio.run(intelligent_batching(mixed_files))
Error Handling & Retry Strategies
Production-Ready Error Handling
import asyncio

from cerevox import AsyncLexa, LexaError

async def robust_processing_with_retries(files, max_retries=3):
    """Production-ready processing with intelligent retries"""

    async def process_with_retry(client, file, attempt=0):
        try:
            documents = await client.parse([file])
            return {'file': file, 'documents': documents, 'success': True}

        except LexaError as e:
            if attempt < max_retries:
                # Exponential backoff
                wait_time = 2 ** attempt
                print(f"⏳ Retry {attempt + 1} for {file} in {wait_time}s: {e.message}")
                await asyncio.sleep(wait_time)
                return await process_with_retry(client, file, attempt + 1)
            else:
                print(f"❌ Max retries exceeded for {file}: {e.message}")
                return {'file': file, 'error': str(e), 'success': False}

        except Exception as e:
            print(f"💥 Unexpected error for {file}: {e}")
            return {'file': file, 'error': str(e), 'success': False}

    async with AsyncLexa() as client:
        # Process all files with retries
        tasks = [process_with_retry(client, file) for file in files]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Analyze results
        successful = [r for r in results if isinstance(r, dict) and r['success']]
        failed = [r for r in results if isinstance(r, dict) and not r['success']]
        exceptions = [r for r in results if isinstance(r, Exception)]

        print(f"📊 Processing Results:")
        print(f"   ✅ Successful: {len(successful)}")
        print(f"   ❌ Failed: {len(failed)}")
        print(f"   💥 Exceptions: {len(exceptions)}")

        return successful, failed, exceptions

# Process with robust error handling
files_with_issues = ["good-doc.pdf", "corrupted-doc.pdf", "missing-doc.pdf"]
successful, failed, exceptions = asyncio.run(robust_processing_with_retries(files_with_issues))
Cost Optimization
Smart Processing Strategies
import time

from cerevox import Lexa, ProcessingMode

def cost_optimized_processing(files):
    """Optimize for cost while maintaining quality"""
    client = Lexa()

    # Strategy 1: Use DEFAULT mode for simple documents
    simple_extensions = ['.txt', '.csv', '.md']
    complex_extensions = ['.pdf', '.docx', '.pptx']

    simple_files = [f for f in files if any(f.endswith(ext) for ext in simple_extensions)]
    complex_files = [f for f in files if any(f.endswith(ext) for ext in complex_extensions)]

    all_documents = []

    # Process simple files with DEFAULT mode (cheaper)
    if simple_files:
        print(f"💰 Processing {len(simple_files)} simple files with DEFAULT mode")
        start_time = time.time()

        simple_docs = client.parse(
            simple_files,
            mode=ProcessingMode.DEFAULT  # Fast processing for most use cases
        )

        processing_time = time.time() - start_time
        print(f"   ✅ DEFAULT mode: {len(simple_docs)} docs in {processing_time:.2f}s")
        all_documents.extend(simple_docs)

    # Process complex files with ADVANCED mode (balanced cost/quality)
    if complex_files:
        print(f"📄 Processing {len(complex_files)} complex files with ADVANCED mode")
        start_time = time.time()

        complex_docs = client.parse(
            complex_files,
            mode=ProcessingMode.ADVANCED  # Use for complex layouts, research papers
        )

        processing_time = time.time() - start_time
        print(f"   ✅ ADVANCED mode: {len(complex_docs)} docs in {processing_time:.2f}s")
        all_documents.extend(complex_docs)

    print(f"💡 Cost optimization: Used DEFAULT mode for {len(simple_files)} files, ADVANCED for {len(complex_files)} files")
    return all_documents

# Cost optimization example
mixed_files = [
    "simple-data.txt",      # Use DEFAULT mode
    "simple-list.csv",      # Use DEFAULT mode
    "complex-report.pdf",   # Use ADVANCED mode
    "presentation.pptx"     # Use ADVANCED mode
]

documents = cost_optimized_processing(mixed_files)
Performance Monitoring
Real-Time Performance Tracking
import asyncio
import time

from cerevox import AsyncLexa

class PerformanceMonitor:
    def __init__(self):
        self.stats = {
            'total_documents': 0,
            'total_time': 0,
            'successful_documents': 0,
            'failed_documents': 0,
            'average_doc_size': 0,
            'docs_per_second': 0
        }
        self.total_characters = 0
        self.start_time = None

    def start_monitoring(self):
        self.start_time = time.time()
        print("📊 Performance monitoring started")

    def record_batch(self, documents, processing_time):
        self.stats['total_documents'] += len(documents)
        self.stats['total_time'] += processing_time
        self.stats['successful_documents'] += len(documents)

        # Calculate running throughput
        if self.stats['total_time'] > 0:
            self.stats['docs_per_second'] = self.stats['total_documents'] / self.stats['total_time']

        # Calculate running average document size across all batches
        self.total_characters += sum(len(doc.content) for doc in documents)
        if self.stats['total_documents'] > 0:
            self.stats['average_doc_size'] = self.total_characters / self.stats['total_documents']

    def record_failure(self, failed_count):
        self.stats['failed_documents'] += failed_count

    def print_stats(self):
        print(f"\n📊 Performance Statistics:")
        print(f"   📄 Total documents: {self.stats['total_documents']}")
        print(f"   ✅ Successful: {self.stats['successful_documents']}")
        print(f"   ❌ Failed: {self.stats['failed_documents']}")
        print(f"   ⚡ Speed: {self.stats['docs_per_second']:.2f} docs/second")
        print(f"   📏 Avg doc size: {self.stats['average_doc_size']:,.0f} chars")
        print(f"   ⏱️ Total time: {self.stats['total_time']:.2f} seconds")

        if self.start_time:
            elapsed = time.time() - self.start_time
            print(f"   🕐 Elapsed time: {elapsed:.2f} seconds")

async def monitored_processing(files, batch_size=20):
    """Process files with performance monitoring"""
    monitor = PerformanceMonitor()
    monitor.start_monitoring()

    async with AsyncLexa() as client:
        for i in range(0, len(files), batch_size):
            batch = files[i:i + batch_size]
            batch_start = time.time()

            try:
                documents = await client.parse(batch)
                batch_time = time.time() - batch_start
                monitor.record_batch(documents, batch_time)

                print(f"✅ Batch {i//batch_size + 1}: {len(documents)} docs in {batch_time:.2f}s")

            except Exception as e:
                batch_time = time.time() - batch_start
                monitor.record_failure(len(batch))
                print(f"❌ Batch {i//batch_size + 1} failed: {e}")

            # Print stats every 5 batches
            if (i // batch_size + 1) % 5 == 0:
                monitor.print_stats()

    # Final statistics
    monitor.print_stats()
    return monitor.stats

# Monitor processing performance
test_files = [f"document_{i:03d}.pdf" for i in range(100)]
stats = asyncio.run(monitored_processing(test_files))
Performance Best Practices
Processing modes
- DEFAULT mode: Fast processing for most use cases
- ADVANCED mode: Maximum accuracy for complex documents

Concurrency (see the semaphore sketch below)
- Sweet spot: 5-10 concurrent requests for most use cases
- Small files: Can handle 15-20 concurrent requests
- Large files: Reduce to 3-5 concurrent requests
- Monitor: Watch for rate limiting and adjust accordingly

Batch sizing
- Small files (< 1MB): Batches of 30-50 files
- Medium files (1-10MB): Batches of 10-20 files
- Large files (> 10MB): Batches of 3-5 files
- Mixed sizes: Group by size before batching

Memory management (see the chunked-processing sketch below)
- Process large datasets in chunks (100-500 files per chunk)
- Clear document variables after processing
- Use garbage collection for long-running processes
- Save results immediately; don't accumulate everything in memory
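The concurrency limits above are easiest to enforce with an asyncio.Semaphore around each parse call. The sketch below is one way to do it, not part of the Lexa API: parse_with_concurrency_limit, max_concurrent, and batch_size are illustrative names, and the default values are assumptions you should tune against your own rate limits.

import asyncio

from cerevox import AsyncLexa

async def parse_with_concurrency_limit(files, max_concurrent=8, batch_size=10):
    """Cap the number of in-flight parse requests with a semaphore."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async with AsyncLexa() as client:
        async def parse_batch(batch):
            # At most max_concurrent batches are awaiting client.parse at any moment
            async with semaphore:
                return await client.parse(batch)

        batches = [files[i:i + batch_size] for i in range(0, len(files), batch_size)]
        results = await asyncio.gather(*(parse_batch(b) for b in batches))

    # Flatten the per-batch results into a single document list
    return [doc for batch_docs in results for doc in batch_docs]

documents = asyncio.run(parse_with_concurrency_limit([f"doc_{i}.pdf" for i in range(100)]))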
Performance Rule: Start with async processing + DEFAULT mode + batches of 20. This gives 80% of optimal performance with minimal tuning.
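The memory-management points combine naturally with this rule: process in chunks, write each batch's results to disk as soon as it finishes, and release references before the next batch. The sketch below is only an illustrative pattern; process_in_chunks, the JSONL output path, and the chunk/batch sizes are assumed names and values, and it assumes each parsed document exposes a content attribute as in the monitoring example above.

import asyncio
import gc
import json

from cerevox import AsyncLexa

async def process_in_chunks(files, chunk_size=100, batch_size=20, output_path="results.jsonl"):
    """Process a large dataset chunk by chunk, saving results immediately."""
    async with AsyncLexa() as client:
        with open(output_path, "a", encoding="utf-8") as out:
            for chunk_start in range(0, len(files), chunk_size):
                chunk = files[chunk_start:chunk_start + chunk_size]

                for i in range(0, len(chunk), batch_size):
                    documents = await client.parse(chunk[i:i + batch_size])

                    # Save immediately instead of accumulating documents in memory
                    for doc in documents:
                        out.write(json.dumps({"content": doc.content}) + "\n")

                    # Drop references and reclaim memory before the next batch
                    del documents
                    gc.collect()

asyncio.run(process_in_chunks([f"document_{i:04d}.pdf" for i in range(1000)]))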