Ready for production? These patterns are designed for enterprise applications processing thousands of documents.

Custom Processing Workflows

Multi-Stage Processing Pipeline

from cerevox import Lexa, ProcessingMode
import asyncio

async def classify_and_process_documents(files):
    """Classify documents first, then process with appropriate settings"""
    
    async with Lexa() as client:
        # Stage 1: Fast classification pass
        print("🔍 Stage 1: Document classification...")
        classification_docs = await client.parse(
            files,
            mode=ProcessingMode.DEFAULT  # Fast pass for classification
        )
        
        # Classify documents by type
        financial_docs = []
        legal_docs = []
        research_docs = []
        
        for i, doc in enumerate(classification_docs):
            content_sample = doc.content[:500].lower()
            
            if any(word in content_sample for word in ['invoice', 'payment', 'financial', 'amount']):
                financial_docs.append(files[i])
            elif any(word in content_sample for word in ['contract', 'agreement', 'legal', 'party']):
                legal_docs.append(files[i])
            else:
                research_docs.append(files[i])
        
        print(f"📊 Classified: {len(financial_docs)} financial, {len(legal_docs)} legal, {len(research_docs)} research")
        
        # Stage 2: Process each type with optimized settings
        all_processed = []
        
        if financial_docs:
            print("💰 Stage 2a: Processing financial documents...")
            financial_processed = await client.parse(
                financial_docs,
                mode=ProcessingMode.ADVANCED,  # More accurate but slower for financial data
                preserve_tables=True,
                extract_entities=['amounts', 'dates', 'companies']
            )
            all_processed.extend(financial_processed)
        
        if legal_docs:
            print("⚖️ Stage 2b: Processing legal documents...")
            legal_processed = await client.parse(
                legal_docs,
                mode=ProcessingMode.ADVANCED,
                preserve_structure=True,
                extract_entities=['parties', 'dates', 'terms']
            )
            all_processed.extend(legal_processed)
        
        if research_docs:
            print("📚 Stage 2c: Processing research documents...")
            research_processed = await client.parse(
                research_docs,
                mode=ProcessingMode.DEFAULT,
                preserve_citations=True,
                extract_entities=['authors', 'publications', 'data']
            )
            all_processed.extend(research_processed)
        
        print(f"✅ Pipeline complete: {len(all_processed)} documents processed")
        return all_processed

# Process with intelligent classification
files = ["invoice.pdf", "contract.docx", "research-paper.pdf"]
documents = asyncio.run(classify_and_process_documents(files))
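
The three stage-2 parses above run one after another. Because the document groups are independent, they can also be launched concurrently with asyncio.gather. A minimal sketch, reusing the client, ProcessingMode, and asyncio imports from the example above; process_groups_concurrently is an illustrative helper (per-type keyword options omitted for brevity), not part of the SDK:

async def process_groups_concurrently(client, financial_docs, legal_docs, research_docs):
    """Run the per-type parses concurrently and merge the results."""
    tasks = []
    if financial_docs:
        tasks.append(client.parse(financial_docs, mode=ProcessingMode.ADVANCED))
    if legal_docs:
        tasks.append(client.parse(legal_docs, mode=ProcessingMode.ADVANCED))
    if research_docs:
        tasks.append(client.parse(research_docs, mode=ProcessingMode.DEFAULT))

    # gather preserves task order, so results line up with the groups submitted
    results = await asyncio.gather(*tasks)

    all_processed = []
    for group in results:
        all_processed.extend(group)
    return all_processed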

Performance Optimization Patterns

Intelligent Batching

import os
from cerevox import Lexa
import asyncio

async def intelligent_batch_processing(files):
    """Batch files intelligently based on size and type"""
    
    def analyze_files(file_list):
        """Analyze files to create optimal batches"""
        file_info = []
        
        for file in file_list:
            if os.path.exists(file):
                size = os.path.getsize(file)
                ext = os.path.splitext(file)[1].lower()
                
                # Estimate processing complexity
                complexity = 1
                if ext in ['.pdf', '.docx']:
                    complexity = 2
                elif ext in ['.pptx', '.xlsx']:
                    complexity = 3
                
                file_info.append({
                    'file': file,
                    'size': size,
                    'complexity': complexity,
                    'estimated_time': size / (1024 * 1024) * complexity  # rough cost: size in MB scaled by complexity
                })
        
        return file_info
    
    def create_optimal_batches(file_info, max_batch_time=60):
        """Create batches optimized for processing time"""
        # Sort by estimated processing time
        sorted_files = sorted(file_info, key=lambda x: x['estimated_time'])
        
        batches = []
        current_batch = []
        current_time = 0
        
        for file_data in sorted_files:
            if current_time + file_data['estimated_time'] <= max_batch_time:
                current_batch.append(file_data['file'])
                current_time += file_data['estimated_time']
            else:
                if current_batch:
                    batches.append(current_batch)
                current_batch = [file_data['file']]
                current_time = file_data['estimated_time']
        
        if current_batch:
            batches.append(current_batch)
        
        return batches
    
    # Analyze and batch files
    file_info = analyze_files(files)
    batches = create_optimal_batches(file_info)
    
    print(f"📊 Created {len(batches)} optimized batches from {len(files)} files")
    
    async with Lexa() as client:
        all_documents = []
        
        for i, batch in enumerate(batches, 1):
            print(f"🔄 Processing batch {i}/{len(batches)}: {len(batch)} files")
            
            batch_start = asyncio.get_running_loop().time()
            documents = await client.parse(batch)
            batch_time = asyncio.get_running_loop().time() - batch_start
            
            print(f"✅ Batch {i} complete in {batch_time:.2f}s")
            all_documents.extend(documents)
        
        print(f"🎉 Intelligent batching complete: {len(all_documents)} documents")
        return all_documents

# Process with intelligent batching
mixed_files = [
    "small-text.txt",      # 1KB
    "medium-doc.docx",     # 500KB 
    "large-pdf.pdf",       # 5MB
    "complex-sheet.xlsx",  # 2MB
    "presentation.pptx"    # 10MB
]
documents = asyncio.run(intelligent_batch_processing(mixed_files))
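
The loop above submits batches one at a time. If your rate limits allow it, batches can also be submitted in parallel with an upper bound on how many are in flight at once. A minimal sketch, assuming the batches produced by create_optimal_batches and a shared client; process_batches_concurrently and the limit of 3 are illustrative, not part of the SDK:

async def process_batches_concurrently(client, batches, max_concurrent=3):
    """Submit batches in parallel, never running more than max_concurrent at once."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def parse_batch(batch):
        async with semaphore:
            return await client.parse(batch)

    # gather keeps results in the same order as the batches list
    results = await asyncio.gather(*(parse_batch(batch) for batch in batches))

    all_documents = []
    for documents in results:
        all_documents.extend(documents)
    return all_documents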

Custom Content Processing

Specialized Extraction Patterns

from cerevox import Lexa, ProcessingMode
import re
import asyncio

async def extract_financial_insights(files):
    """Extract structured financial data from documents"""
    
    def extract_financial_entities(content):
        """Extract financial entities from document content"""
        
        # Currency amounts pattern (e.g. $1,250.00)
        currency_pattern = r'\$[\d,]+\.?\d*'
        amounts = re.findall(currency_pattern, content)
        
        # Date patterns
        date_pattern = r'\b\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4}\b|\b\d{4}[\/\-]\d{1,2}[\/\-]\d{1,2}\b'
        dates = re.findall(date_pattern, content)
        
        # Company names (capitalized sequences)
        company_pattern = r'\b[A-Z][a-z]+ [A-Z][a-z]+(?:\s+(?:Inc|Corp|LLC|Ltd|Co)\.?)?'
        companies = re.findall(company_pattern, content)
        
        # Invoice/Account numbers
        invoice_pattern = r'(?:Invoice|Account|Reference)\s*#?\s*:?\s*([A-Z0-9\-]+)'
        numbers = re.findall(invoice_pattern, content, re.IGNORECASE)
        
        return {
            'amounts': amounts,
            'dates': dates,
            'companies': list(set(companies)),  # Remove duplicates
            'reference_numbers': numbers
        }
    
    def analyze_financial_tables(tables):
        """Analyze tables for financial data patterns"""
        financial_tables = []
        
        for table_index, table in enumerate(tables):
            table_analysis = {
                'table_index': table_index,
                'rows': table.rows,
                'columns': table.columns,
                'likely_financial': False,
                'table_type': 'unknown'
            }
            
            # Analyze table content for financial indicators
            table_text = str(table.content).lower()
            
            financial_keywords = ['amount', 'total', 'subtotal', 'tax', 'payment', 'balance', 'invoice']
            financial_score = sum(1 for keyword in financial_keywords if keyword in table_text)
            
            if financial_score >= 2:
                table_analysis['likely_financial'] = True
                
                # Determine table type
                if 'invoice' in table_text:
                    table_analysis['table_type'] = 'invoice'
                elif 'payment' in table_text:
                    table_analysis['table_type'] = 'payment_schedule'
                elif 'balance' in table_text:
                    table_analysis['table_type'] = 'balance_sheet'
                else:
                    table_analysis['table_type'] = 'financial_summary'
            
            financial_tables.append(table_analysis)
        
        return financial_tables
    
    async with Lexa() as client:
        documents = await client.parse(files, mode=ProcessingMode.ADVANCED)
        
        financial_insights = []
        
        for i, doc in enumerate(documents):
            print(f"💰 Analyzing financial document {i+1}: {files[i]}")
            
            # Extract entities from text
            entities = extract_financial_entities(doc.content)
            
            # Analyze tables
            table_analysis = analyze_financial_tables(doc.tables)
            
            # Calculate financial document score
            financial_score = 0
            financial_score += len(entities['amounts']) * 2
            financial_score += len(entities['reference_numbers']) * 3
            financial_score += sum(1 for table in table_analysis if table['likely_financial']) * 5
            
            insight = {
                'source_file': files[i],
                'financial_score': financial_score,
                'entities': entities,
                'table_analysis': table_analysis,
                'total_amounts': len(entities['amounts']),
                'total_financial_tables': sum(1 for table in table_analysis if table['likely_financial']),
                'document_type': 'high_financial' if financial_score > 10 else 'low_financial'
            }
            
            financial_insights.append(insight)
            print(f"  📊 Financial score: {financial_score}")
            print(f"  💵 Found {len(entities['amounts'])} amounts")
            print(f"  📋 Found {len([t for t in table_analysis if t['likely_financial']])} financial tables")
        
        return financial_insights

# Extract financial insights
financial_files = ["invoice.pdf", "financial-statement.xlsx", "payment-report.pdf"]
insights = asyncio.run(extract_financial_insights(financial_files))

# Print summary
total_amounts = sum(len(insight['entities']['amounts']) for insight in insights)
print(f"\n💰 Financial Analysis Summary:")
print(f"📄 Documents processed: {len(insights)}")
print(f"💵 Total amounts found: {total_amounts}")
print(f"📊 High financial documents: {len([i for i in insights if i['document_type'] == 'high_financial'])}")

Enterprise Integration Patterns

Workflow Orchestration

from cerevox import Lexa, ProcessingMode
import asyncio
import json
import os
from datetime import datetime

class DocumentWorkflow:
    def __init__(self):
        self.workflow_id = f"workflow_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        self.stages = []
        self.results = {}
        
    async def execute_stage(self, stage_name, stage_func, *args, **kwargs):
        """Execute a workflow stage with error handling and logging"""
        
        stage_start = datetime.now()
        print(f"🔄 [{self.workflow_id}] Starting stage: {stage_name}")
        
        try:
            result = await stage_func(*args, **kwargs)
            stage_end = datetime.now()
            duration = (stage_end - stage_start).total_seconds()
            
            stage_info = {
                'name': stage_name,
                'status': 'success',
                'start_time': stage_start.isoformat(),
                'end_time': stage_end.isoformat(),
                'duration_seconds': duration,
                'result_summary': self._summarize_result(result)
            }
            
            self.stages.append(stage_info)
            self.results[stage_name] = result
            
            print(f"✅ [{self.workflow_id}] Completed stage: {stage_name} ({duration:.2f}s)")
            return result
            
        except Exception as e:
            stage_end = datetime.now()
            duration = (stage_end - stage_start).total_seconds()
            
            stage_info = {
                'name': stage_name,
                'status': 'error',
                'start_time': stage_start.isoformat(),
                'end_time': stage_end.isoformat(),
                'duration_seconds': duration,
                'error': str(e)
            }
            
            self.stages.append(stage_info)
            
            print(f"❌ [{self.workflow_id}] Failed stage: {stage_name} - {e}")
            raise
    
    def _summarize_result(self, result):
        """Summarize stage results for logging"""
        if isinstance(result, list):
            return f"{len(result)} items"
        elif hasattr(result, '__len__'):
            return f"Length: {len(result)}"
        else:
            return str(type(result).__name__)
    
    def get_workflow_summary(self):
        """Get complete workflow summary"""
        total_duration = sum(stage['duration_seconds'] for stage in self.stages)
        successful_stages = [s for s in self.stages if s['status'] == 'success']
        
        return {
            'workflow_id': self.workflow_id,
            'total_stages': len(self.stages),
            'successful_stages': len(successful_stages),
            'total_duration_seconds': total_duration,
            'stages': self.stages
        }

async def enterprise_document_workflow(files):
    """Complete enterprise document processing workflow"""
    
    workflow = DocumentWorkflow()
    
    # Stage 1: Document Ingestion
    async def ingestion_stage(files):
        print(f"📥 Ingesting {len(files)} files...")
        # Validate files exist and are accessible
        validated_files = []
        for file in files:
            if os.path.exists(file):
                validated_files.append(file)
            else:
                print(f"⚠️ File not found: {file}")
        return validated_files
    
    # Stage 2: Document Parsing
    async def parsing_stage(files):
        async with Lexa() as client:
            documents = await client.parse(files, mode=ProcessingMode.ADVANCED)
            return documents
    
    # Stage 3: Content Analysis
    async def analysis_stage(documents):
        analyzed_docs = []
        for doc in documents:
            analysis = {
                'content_length': len(doc.content),
                'table_count': len(doc.tables),
                'image_count': len(doc.images),
                'word_count': len(doc.content.split()),
                'complexity_score': len(doc.tables) * 2 + len(doc.images) * 1.5,
                'content_type': 'complex' if len(doc.tables) > 5 else 'simple'
            }
            # Flatten the Document's attributes into a plain dict and attach the analysis
            analyzed_docs.append({**doc.__dict__, 'analysis': analysis})
        return analyzed_docs
    
    # Stage 4: Data Transformation
    async def transformation_stage(analyzed_docs):
        transformed_data = []
        for doc_data in analyzed_docs:
            # Transform for downstream systems
            transformed = {
                'document_id': f"doc_{len(transformed_data) + 1}",
                'source_file': doc_data.get('source_file', 'unknown'),
                'processed_content': doc_data['content'][:1000],  # First 1000 chars
                'metadata': {
                    'analysis': doc_data['analysis'],
                    'processing_timestamp': datetime.now().isoformat(),
                    'workflow_id': workflow.workflow_id
                },
                'chunks': doc_data.get('text_chunks', [])[:10]  # First 10 chunks
            }
            transformed_data.append(transformed)
        return transformed_data
    
    # Stage 5: Data Export
    async def export_stage(transformed_data):
        export_file = f"workflow_results_{workflow.workflow_id}.json"
        with open(export_file, 'w') as f:
            json.dump(transformed_data, f, indent=2, default=str)
        
        return {
            'export_file': export_file,
            'exported_documents': len(transformed_data),
            'total_size_mb': os.path.getsize(export_file) / (1024 * 1024)
        }
    
    try:
        # Execute workflow stages
        validated_files = await workflow.execute_stage(
            "ingestion", ingestion_stage, files
        )
        
        documents = await workflow.execute_stage(
            "parsing", parsing_stage, validated_files
        )
        
        analyzed_docs = await workflow.execute_stage(
            "analysis", analysis_stage, documents
        )
        
        transformed_data = await workflow.execute_stage(
            "transformation", transformation_stage, analyzed_docs
        )
        
        export_result = await workflow.execute_stage(
            "export", export_stage, transformed_data
        )
        
        # Generate workflow summary
        summary = workflow.get_workflow_summary()
        
        print(f"\n🎉 Workflow Complete: {workflow.workflow_id}")
        print(f"📊 Total duration: {summary['total_duration_seconds']:.2f} seconds")
        print(f"✅ Successful stages: {summary['successful_stages']}/{summary['total_stages']}")
        print(f"📄 Exported: {export_result['exported_documents']} documents")
        print(f"💾 Export file: {export_result['export_file']}")
        
        return summary, export_result
        
    except Exception as e:
        print(f"💥 Workflow failed: {e}")
        return workflow.get_workflow_summary(), None

# Execute enterprise workflow
enterprise_files = ["financial-report.pdf", "contracts.docx", "data-analysis.xlsx"]
summary, export_result = asyncio.run(enterprise_document_workflow(enterprise_files))
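
In production it is usually worth persisting the stage-by-stage audit trail next to the exported documents. A small follow-up, assuming the summary dict returned by the workflow above (the audit file name is illustrative):

# Persist the workflow's stage log for later inspection
if summary:
    audit_file = f"workflow_audit_{summary['workflow_id']}.json"
    with open(audit_file, 'w') as f:
        json.dump(summary, f, indent=2, default=str)
    print(f"📝 Audit trail written to {audit_file}")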

Enterprise Ready: These patterns are designed for production environments processing thousands of documents. Use them as templates for your own complex workflows.