Async Processing
Process multiple documents concurrently - up to 10x faster than sequential parsing
Why Async? Concurrent requests let you process 100 documents in roughly the time sequential code takes to process 10. Essential for high-volume applications.
Getting Started with Async
Your First Async Parse
import asyncio
from cerevox import AsyncLexa

async def main():
    async with AsyncLexa() as client:  # Uses the CEREVOX_API_KEY environment variable
        documents = await client.parse("document.pdf")
        print(f"✅ Async parsing complete: {len(documents[0].content)} chars")

# Run it
asyncio.run(main())
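One note: if you are already inside a running event loop (in a Jupyter notebook, for example), asyncio.run() raises a RuntimeError. In that case, await the coroutine directly:

# Inside Jupyter or any environment with a running event loop:
await main()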
Real-World Performance Examples
High-Volume Document Processing
import asyncio
from cerevox import AsyncLexa, ProcessingMode

async def process_financial_documents():
    """Process hundreds of financial documents efficiently"""
    # Financial documents that need processing
    financial_docs = [
        "invoices/batch_1/*.pdf",    # 100+ invoices
        "statements/q1_2024/*.pdf",  # Bank statements
        "contracts/2024/*.docx",     # Legal contracts
        "reports/financial/*.xlsx",  # Financial reports
    ]

    async with AsyncLexa() as client:
        # Process all document types concurrently
        start_time = asyncio.get_running_loop().time()

        documents = await client.parse(
            financial_docs,
            mode=ProcessingMode.ADVANCED,  # More accurate (but slower); suits financial data
            timeout=600.0,                 # 10-minute timeout for large batches
        )

        processing_time = asyncio.get_running_loop().time() - start_time

        print(f"✅ Processed {len(documents)} financial documents")
        print(f"⚡ Processing time: {processing_time:.2f} seconds")
        print(f"📊 Average: {processing_time / len(documents):.2f} seconds per document")

        # Extract structured financial data
        total_tables = sum(len(doc.tables) for doc in documents)
        print(f"💰 Extracted {total_tables} financial tables")

        return documents

# Process financial documents at scale
documents = asyncio.run(process_financial_documents())
RAG System Document Processing
import asyncio
from cerevox import AsyncLexa

async def build_knowledge_base():
    """Process documents for a RAG knowledge base"""
    knowledge_docs = [
        "knowledge_base/product_docs/*.pdf",
        "knowledge_base/user_manuals/*.docx",
        "knowledge_base/faqs/*.html",
        "knowledge_base/support_articles/*.md",
    ]

    async with AsyncLexa() as client:
        # Process all knowledge base documents
        documents = await client.parse(knowledge_docs)

        # Generate RAG-optimized chunks
        rag_chunks = []
        for doc in documents:
            chunks = doc.get_text_chunks(
                target_size=500,        # Perfect for embeddings
                overlap_size=50,        # Prevent context loss at chunk boundaries
                include_metadata=True,  # Rich metadata for retrieval
            )
            rag_chunks.extend(chunks)

        print(f"📚 Processed knowledge base: {len(documents)} documents")
        print(f"🔗 Generated {len(rag_chunks)} RAG chunks")
        print(f"💾 Ready for vector database: {sum(len(chunk.content) for chunk in rag_chunks)} total characters")

        # Each chunk is ready for your vector database
        return rag_chunks

# Build your RAG knowledge base
rag_chunks = asyncio.run(build_knowledge_base())

# Ready for vector database insertion
print(f"✅ {len(rag_chunks)} chunks ready for embedding and storage")
Controlled Concurrency Patterns
Production-Grade Concurrency Control
import asyncio
from cerevox import AsyncLexa, LexaError

async def process_with_concurrency_limit(files, max_concurrent=5):
    """Process files with controlled concurrency - prevents overwhelming the API"""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_single_file(client, file):
        async with semaphore:  # Limit concurrent operations
            try:
                documents = await client.parse([file])
                print(f"✅ Processed: {file}")
                return documents[0] if documents else None
            except LexaError as e:
                print(f"❌ Failed {file}: {e.message}")
                return None

    async with AsyncLexa() as client:
        # Create tasks for all files
        tasks = [process_single_file(client, file) for file in files]

        # Process with controlled concurrency
        completed_results = await asyncio.gather(*tasks, return_exceptions=True)

        # Filter successful results
        successful_docs = [
            r for r in completed_results
            if r is not None and not isinstance(r, Exception)
        ]

        print(f"✅ Successfully processed {len(successful_docs)}/{len(files)} files")
        return successful_docs

# Process large document sets safely
files = [f"documents/batch_{i}.pdf" for i in range(100)]
documents = asyncio.run(process_with_concurrency_limit(files, max_concurrent=10))
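If you want to react to each file as it finishes rather than waiting for the whole batch, asyncio.as_completed combines cleanly with the same semaphore pattern. A sketch, under the same assumptions about client.parse as above:

import asyncio
from cerevox import AsyncLexa, LexaError

async def process_as_completed(files, max_concurrent=5):
    """Handle results in completion order instead of submission order"""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def parse_one(client, file):
        async with semaphore:
            try:
                documents = await client.parse([file])
                return file, documents[0] if documents else None
            except LexaError:
                return file, None

    async with AsyncLexa() as client:
        tasks = [asyncio.create_task(parse_one(client, f)) for f in files]
        # as_completed yields each task as soon as it finishes
        for finished in asyncio.as_completed(tasks):
            file, doc = await finished
            status = "✅" if doc else "❌"
            print(f"{status} {file}")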
Advanced Async Patterns
Progress Monitoring & Real-time Updates
import asyncio
from cerevox import AsyncLexa

async def process_with_realtime_progress(files):
    """Process files with real-time progress updates"""
    progress_data = {
        'total': len(files),
        'completed': 0,
        'failed': 0,
        'in_progress': 0,
    }

    def update_progress(status, file_name):
        """Update counters and redraw the progress line"""
        if status == 'started':
            progress_data['in_progress'] += 1
        elif status == 'completed':
            progress_data['in_progress'] -= 1
            progress_data['completed'] += 1
        elif status == 'failed':
            progress_data['in_progress'] -= 1
            progress_data['failed'] += 1

        # Print progress bar
        total = progress_data['total']
        completed = progress_data['completed']
        failed = progress_data['failed']
        in_progress = progress_data['in_progress']
        progress_pct = (completed + failed) / total * 100
        print(
            f"\r📊 Progress: {progress_pct:.1f}% | ✅ {completed} | ❌ {failed} | 🔄 {in_progress}",
            end='',
        )

    async def process_single_with_progress(client, file):
        update_progress('started', file)
        try:
            documents = await client.parse([file])
            update_progress('completed', file)
            return documents[0] if documents else None
        except Exception:
            update_progress('failed', file)
            return None

    async with AsyncLexa() as client:
        # Process all files with progress tracking
        tasks = [process_single_with_progress(client, file) for file in files]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        print()  # New line after progress bar

        successful_docs = [r for r in results if r is not None and not isinstance(r, Exception)]
        print("🎉 Processing complete!")
        print(f"✅ Successful: {len(successful_docs)}")
        print(f"❌ Failed: {len(files) - len(successful_docs)}")

        return successful_docs

# Process with real-time progress
files = [f"documents/file_{i}.pdf" for i in range(50)]
documents = asyncio.run(process_with_realtime_progress(files))
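If you would rather not hand-roll the progress display, tqdm ships an asyncio-aware wrapper around gather. A sketch assuming pip install tqdm and the same client.parse usage as above (note that, unlike the example above, a single failure here raises instead of being counted):

import asyncio
from tqdm.asyncio import tqdm
from cerevox import AsyncLexa

async def process_with_tqdm(files):
    async with AsyncLexa() as client:
        # tqdm.gather behaves like asyncio.gather but draws a progress bar
        results = await tqdm.gather(*(client.parse([f]) for f in files))
        return [docs[0] for docs in results if docs]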
Integration with Web Frameworks
FastAPI Integration
from fastapi import FastAPI, UploadFile, File
from cerevox import AsyncLexa

app = FastAPI()

# Global client (reuse the underlying connection pool)
lexa_client = None

@app.on_event("startup")
async def startup_event():
    global lexa_client
    lexa_client = AsyncLexa()

@app.on_event("shutdown")
async def shutdown_event():
    if lexa_client:
        await lexa_client.close()

@app.post("/parse-documents/")
async def parse_documents(files: list[UploadFile] = File(...)):
    """Parse uploaded documents asynchronously"""
    # Read file contents
    file_contents = []
    for file in files:
        content = await file.read()
        file_contents.append(content)

    # Parse documents concurrently
    documents = await lexa_client.parse(file_contents)

    # Return structured results
    results = []
    for i, doc in enumerate(documents):
        results.append({
            'filename': files[i].filename,
            'content_length': len(doc.content),
            'tables': len(doc.tables),
            'images': len(doc.images),
            'content_preview': doc.content[:200],
        })

    return {
        'status': 'success',
        'processed': len(results),
        'results': results,
    }

# Run with: uvicorn main:app --reload
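A quick way to exercise the endpoint, assuming the server is running locally on port 8000 and a report.pdf exists in your working directory (uses the requests package):

import requests

with open("report.pdf", "rb") as f:
    response = requests.post(
        "http://localhost:8000/parse-documents/",
        files=[("files", ("report.pdf", f, "application/pdf"))],
    )
print(response.json())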
Performance Tip: Async processing can be up to 10x faster for multi-document workloads. Prefer async in production for document batches larger than 5 files.
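To measure the difference on your own documents, here is a quick benchmark sketch comparing a sequential loop against a single concurrent batch (assumes the files exist and uses the same client.parse call as the examples above):

import asyncio
import time
from cerevox import AsyncLexa

async def compare(files):
    async with AsyncLexa() as client:
        # Sequential: one parse call at a time
        start = time.perf_counter()
        for f in files:
            await client.parse([f])
        sequential = time.perf_counter() - start

        # Concurrent: one batched call
        start = time.perf_counter()
        await client.parse(files)
        concurrent = time.perf_counter() - start

        print(f"Sequential: {sequential:.1f}s | Concurrent: {concurrent:.1f}s")

files = [f"documents/file_{i}.pdf" for i in range(10)]
asyncio.run(compare(files))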