Why Async? Process 100 documents in the time it takes to process 10 synchronously. Essential for high-volume applications.

Getting Started with Async

Your First Async Parse

import asyncio
from cerevox import AsyncLexa

async def main():
    async with AsyncLexa() as client:  # Uses CEREVOX_API_KEY
        documents = await client.parse("document.pdf")
        print(f"✅ Async parsing complete: {len(documents[0].content)} chars")

# Run it
asyncio.run(main())

Real-World Performance Examples

High-Volume Document Processing

import asyncio
from cerevox import AsyncLexa, ProcessingMode

async def process_financial_documents():
    """Process hundreds of financial documents efficiently"""
    
    # Financial documents that need processing
    financial_docs = [
        "invoices/batch_1/*.pdf",      # 100+ invoices
        "statements/q1_2024/*.pdf",    # Bank statements 
        "contracts/2024/*.docx",       # Legal contracts
        "reports/financial/*.xlsx"     # Financial reports
    ]
    
    async with AsyncLexa() as client:
        # Process all document types concurrently
        start_time = asyncio.get_event_loop().time()
        
        documents = await client.parse(
            financial_docs,
            mode=ProcessingMode.ADVANCED,  # More accurate but slower for financial data
            timeout=600.0  # 10 minute timeout for large batches
        )
        
        processing_time = asyncio.get_event_loop().time() - start_time
        
        print(f"✅ Processed {len(documents)} financial documents")
        print(f"⚡ Processing time: {processing_time:.2f} seconds")
        print(f"📊 Average: {processing_time/len(documents):.2f} seconds per document")
        
        # Extract structured financial data
        total_tables = sum(len(doc.tables) for doc in documents)
        print(f"💰 Extracted {total_tables} financial tables")
        
        return documents

# Process financial documents at scale
documents = asyncio.run(process_financial_documents())

RAG System Document Processing

import asyncio
from cerevox import AsyncLexa

async def build_knowledge_base():
    """Process documents for RAG knowledge base"""
    
    knowledge_docs = [
        "knowledge_base/product_docs/*.pdf",
        "knowledge_base/user_manuals/*.docx", 
        "knowledge_base/faqs/*.html",
        "knowledge_base/support_articles/*.md"
    ]
    
    async with AsyncLexa() as client:
        # Process all knowledge base documents
        documents = await client.parse(knowledge_docs)
        
        # Generate RAG-optimized chunks
        rag_chunks = []
        for doc in documents:
            chunks = doc.get_text_chunks(
                target_size=500,        # Perfect for embeddings
                overlap_size=50,        # Prevent context loss
                include_metadata=True   # Rich metadata for retrieval
            )
            rag_chunks.extend(chunks)
        
        print(f"📚 Processed knowledge base: {len(documents)} documents")
        print(f"🔗 Generated {len(rag_chunks)} RAG chunks")
        print(f"💾 Ready for vector database: {sum(len(chunk.content) for chunk in rag_chunks)} total characters")
        
        # Each chunk is ready for your vector database
        return rag_chunks

# Build your RAG knowledge base
rag_chunks = asyncio.run(build_knowledge_base())

# Ready for vector database insertion
print(f"✅ {len(rag_chunks)} chunks ready for embedding and storage")

Controlled Concurrency Patterns

Production-Grade Concurrency Control

import asyncio
from cerevox import AsyncLexa, LexaError

async def process_with_concurrency_limit(files, max_concurrent=5):
    """Process files with controlled concurrency - prevents overwhelming the API"""
    
    semaphore = asyncio.Semaphore(max_concurrent)
    results = []
    
    async def process_single_file(client, file):
        async with semaphore:  # Limit concurrent operations
            try:
                documents = await client.parse([file])
                print(f"✅ Processed: {file}")
                return documents[0] if documents else None
            except LexaError as e:
                print(f"❌ Failed {file}: {e.message}")
                return None
    
    async with AsyncLexa() as client:
        # Create tasks for all files
        tasks = [process_single_file(client, file) for file in files]
        
        # Process with controlled concurrency
        completed_results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Filter successful results
        successful_docs = [r for r in completed_results if r and not isinstance(r, Exception)]
        
        print(f"✅ Successfully processed {len(successful_docs)}/{len(files)} files")
        return successful_docs

# Process large document sets safely
files = [f"documents/batch_{i}.pdf" for i in range(100)]
documents = asyncio.run(process_with_concurrency_limit(files, max_concurrent=10))

Advanced Async Patterns

Progress Monitoring & Real-time Updates

import asyncio
from cerevox import AsyncLexa

async def process_with_realtime_progress(files):
    """Process files with real-time progress updates"""
    
    progress_data = {
        'total': len(files),
        'completed': 0,
        'failed': 0,
        'in_progress': 0
    }
    
    def update_progress(status, file_name):
        """Update progress based on status"""
        if status == 'started':
            progress_data['in_progress'] += 1
        elif status == 'completed':
            progress_data['in_progress'] -= 1
            progress_data['completed'] += 1
        elif status == 'failed':
            progress_data['in_progress'] -= 1
            progress_data['failed'] += 1
        
        # Print progress bar
        total = progress_data['total']
        completed = progress_data['completed']
        failed = progress_data['failed']
        in_progress = progress_data['in_progress']
        
        progress_pct = (completed + failed) / total * 100
        print(f"\r📊 Progress: {progress_pct:.1f}% | ✅ {completed} | ❌ {failed} | 🔄 {in_progress}", end='')
    
    async def process_single_with_progress(client, file):
        update_progress('started', file)
        try:
            documents = await client.parse([file])
            update_progress('completed', file)
            return documents[0] if documents else None
        except Exception as e:
            update_progress('failed', file)
            return None
    
    async with AsyncLexa() as client:
        # Process all files with progress tracking
        tasks = [process_single_with_progress(client, file) for file in files]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        print()  # New line after progress bar
        successful_docs = [r for r in results if r and not isinstance(r, Exception)]
        
        print(f"🎉 Processing complete!")
        print(f"✅ Successful: {len(successful_docs)}")
        print(f"❌ Failed: {len(files) - len(successful_docs)}")
        
        return successful_docs

# Process with real-time progress
files = [f"documents/file_{i}.pdf" for i in range(50)]
documents = asyncio.run(process_with_realtime_progress(files))

Integration with Web Frameworks

FastAPI Integration

from fastapi import FastAPI, UploadFile, File, BackgroundTasks
from cerevox import AsyncLexa
import asyncio

app = FastAPI()

# Global client (reuse connection)
lexa_client = None

@app.on_event("startup")
async def startup_event():
    global lexa_client
    lexa_client = AsyncLexa()

@app.on_event("shutdown") 
async def shutdown_event():
    if lexa_client:
        await lexa_client.close()

@app.post("/parse-documents/")
async def parse_documents(files: list[UploadFile] = File(...)):
    """Parse uploaded documents asynchronously"""
    
    # Read file contents
    file_contents = []
    for file in files:
        content = await file.read()
        file_contents.append(content)
    
    # Parse documents concurrently
    documents = await lexa_client.parse(file_contents)
    
    # Return structured results
    results = []
    for i, doc in enumerate(documents):
        results.append({
            'filename': files[i].filename,
            'content_length': len(doc.content),
            'tables': len(doc.tables),
            'images': len(doc.images),
            'content_preview': doc.content[:200]
        })
    
    return {
        'status': 'success',
        'processed': len(results),
        'results': results
    }

# Run with: uvicorn main:app --reload

Performance Tip: Async processing is 10x faster for multiple documents. Always use async in production for document batches larger than 5 files.