Advanced Patterns
Production-ready patterns for complex document processing workflows
Ready for production? These patterns are designed for enterprise applications processing thousands of documents.
Custom Processing Workflows
Multi-Stage Processing Pipeline
from cerevox import Lexa, ProcessingMode
import asyncio
async def classify_and_process_documents(files):
"""Classify documents first, then process with appropriate settings"""
async with Lexa() as client:
# Stage 1: Fast classification pass
print("🔍 Stage 1: Document classification...")
classification_docs = await client.parse(
files,
mode=ProcessingMode.DEFAULT # Fast pass for classification
)
# Classify documents by type
financial_docs = []
legal_docs = []
research_docs = []
for i, doc in enumerate(classification_docs):
content_sample = doc.content[:500].lower()
if any(word in content_sample for word in ['invoice', 'payment', 'financial', 'amount']):
financial_docs.append(files[i])
elif any(word in content_sample for word in ['contract', 'agreement', 'legal', 'party']):
legal_docs.append(files[i])
else:
research_docs.append(files[i])
print(f"📊 Classified: {len(financial_docs)} financial, {len(legal_docs)} legal, {len(research_docs)} research")
# Stage 2: Process each type with optimized settings
all_processed = []
if financial_docs:
print("💰 Stage 2a: Processing financial documents...")
financial_processed = await client.parse(
financial_docs,
mode=ProcessingMode.ADVANCED, # More accurate but slower for financial data
preserve_tables=True,
extract_entities=['amounts', 'dates', 'companies']
)
all_processed.extend(financial_processed)
if legal_docs:
print("⚖️ Stage 2b: Processing legal documents...")
legal_processed = await client.parse(
legal_docs,
mode=ProcessingMode.ADVANCED,
preserve_structure=True,
extract_entities=['parties', 'dates', 'terms']
)
all_processed.extend(legal_processed)
if research_docs:
print("📚 Stage 2c: Processing research documents...")
research_processed = await client.parse(
research_docs,
mode=ProcessingMode.DEFAULT,
preserve_citations=True,
extract_entities=['authors', 'publications', 'data']
)
all_processed.extend(research_processed)
print(f"✅ Pipeline complete: {len(all_processed)} documents processed")
return all_processed
# Process with intelligent classification
files = ["invoice.pdf", "contract.docx", "research-paper.pdf"]
documents = asyncio.run(classify_and_process_documents(files))
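The three second-stage passes above run one after another. When the category batches are independent, they can also be dispatched concurrently. The sketch below assumes the same client.parse keyword arguments used above and that the client handles concurrent calls on a single session; verify both against your SDK version before relying on it.

from cerevox import Lexa, ProcessingMode
import asyncio

async def process_categories_concurrently(financial_docs, legal_docs, research_docs):
    """Run the category-specific passes in parallel (sketch, assumptions noted above)"""
    async with Lexa() as client:
        tasks = []
        if financial_docs:
            tasks.append(client.parse(financial_docs, mode=ProcessingMode.ADVANCED, preserve_tables=True))
        if legal_docs:
            tasks.append(client.parse(legal_docs, mode=ProcessingMode.ADVANCED, preserve_structure=True))
        if research_docs:
            tasks.append(client.parse(research_docs, mode=ProcessingMode.DEFAULT, preserve_citations=True))
        # Wait for every non-empty category batch to finish
        results = await asyncio.gather(*tasks)
        # Flatten the per-category results into one list
        return [doc for batch in results for doc in batch]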
Performance Optimization Patterns
Intelligent Batching
import os
from cerevox import Lexa
import asyncio
async def intelligent_batch_processing(files):
"""Batch files intelligently based on size and type"""
def analyze_files(file_list):
"""Analyze files to create optimal batches"""
file_info = []
for file in file_list:
if os.path.exists(file):
size = os.path.getsize(file)
ext = os.path.splitext(file)[1].lower()
# Estimate processing complexity
complexity = 1
if ext in ['.pdf', '.docx']:
complexity = 2
elif ext in ['.pptx', '.xlsx']:
complexity = 3
file_info.append({
'file': file,
'size': size,
'complexity': complexity,
'estimated_time': size / (1024 * 1024) * complexity # MB * complexity
})
return file_info
def create_optimal_batches(file_info, max_batch_time=60):
"""Create batches optimized for processing time"""
# Sort by estimated processing time
sorted_files = sorted(file_info, key=lambda x: x['estimated_time'])
batches = []
current_batch = []
current_time = 0
for file_data in sorted_files:
if current_time + file_data['estimated_time'] <= max_batch_time:
current_batch.append(file_data['file'])
current_time += file_data['estimated_time']
else:
if current_batch:
batches.append(current_batch)
current_batch = [file_data['file']]
current_time = file_data['estimated_time']
if current_batch:
batches.append(current_batch)
return batches
# Analyze and batch files
file_info = analyze_files(files)
batches = create_optimal_batches(file_info)
print(f"📊 Created {len(batches)} optimized batches from {len(files)} files")
async with Lexa() as client:
all_documents = []
for i, batch in enumerate(batches, 1):
print(f"🔄 Processing batch {i}/{len(batches)}: {len(batch)} files")
batch_start = asyncio.get_event_loop().time()
documents = await client.parse(batch)
batch_time = asyncio.get_event_loop().time() - batch_start
print(f"✅ Batch {i} complete in {batch_time:.2f}s")
all_documents.extend(documents)
print(f"🎉 Intelligent batching complete: {len(all_documents)} documents")
return all_documents
# Process with intelligent batching
mixed_files = [
"small-text.txt", # 1KB
"medium-doc.docx", # 500KB
"large-pdf.pdf", # 5MB
"complex-sheet.xlsx", # 2MB
"presentation.pptx" # 10MB
]
documents = asyncio.run(intelligent_batch_processing(mixed_files))
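The batches above are processed one at a time, which keeps memory and rate-limit pressure low but leaves throughput on the table. If your plan allows parallel requests, an asyncio.Semaphore caps how many batches are in flight at once. A minimal sketch; the limit of three concurrent batches is an illustrative choice, not an SDK default.

from cerevox import Lexa
import asyncio

async def process_batches_concurrently(batches, max_concurrent=3):
    """Process pre-built batches with bounded concurrency (sketch)"""
    semaphore = asyncio.Semaphore(max_concurrent)

    async with Lexa() as client:
        async def process_one(batch):
            # The semaphore limits how many batches are awaiting parse() at once
            async with semaphore:
                return await client.parse(batch)

        results = await asyncio.gather(*(process_one(batch) for batch in batches))

    # Flatten per-batch document lists into a single list
    return [doc for batch_docs in results for doc in batch_docs]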
Custom Content Processing
Specialized Extraction Patterns
from cerevox import Lexa, ProcessingMode
import re
import asyncio
async def extract_financial_insights(files):
"""Extract structured financial data from documents"""
def extract_financial_entities(content):
"""Extract financial entities from document content"""
# Currency amounts pattern
        currency_pattern = r'\$[\d,]+\.?\d*'
amounts = re.findall(currency_pattern, content)
# Date patterns
date_pattern = r'\b\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4}\b|\b\d{4}[\/\-]\d{1,2}[\/\-]\d{1,2}\b'
dates = re.findall(date_pattern, content)
# Company names (capitalized sequences)
company_pattern = r'\b[A-Z][a-z]+ [A-Z][a-z]+(?:\s+(?:Inc|Corp|LLC|Ltd|Co)\.?)?'
companies = re.findall(company_pattern, content)
# Invoice/Account numbers
invoice_pattern = r'(?:Invoice|Account|Reference)\s*#?\s*:?\s*([A-Z0-9\-]+)'
numbers = re.findall(invoice_pattern, content, re.IGNORECASE)
return {
'amounts': amounts,
'dates': dates,
'companies': list(set(companies)), # Remove duplicates
'reference_numbers': numbers
}
def analyze_financial_tables(tables):
"""Analyze tables for financial data patterns"""
financial_tables = []
        for table_index, table in enumerate(tables):
            table_analysis = {
                'table_index': table_index,
'rows': table.rows,
'columns': table.columns,
'likely_financial': False,
'table_type': 'unknown'
}
# Analyze table content for financial indicators
table_text = str(table.content).lower()
financial_keywords = ['amount', 'total', 'subtotal', 'tax', 'payment', 'balance', 'invoice']
financial_score = sum(1 for keyword in financial_keywords if keyword in table_text)
if financial_score >= 2:
table_analysis['likely_financial'] = True
# Determine table type
if 'invoice' in table_text:
table_analysis['table_type'] = 'invoice'
elif 'payment' in table_text:
table_analysis['table_type'] = 'payment_schedule'
elif 'balance' in table_text:
table_analysis['table_type'] = 'balance_sheet'
else:
table_analysis['table_type'] = 'financial_summary'
financial_tables.append(table_analysis)
return financial_tables
async with Lexa() as client:
documents = await client.parse(files, mode=ProcessingMode.ADVANCED)
financial_insights = []
for i, doc in enumerate(documents):
print(f"💰 Analyzing financial document {i+1}: {files[i]}")
# Extract entities from text
entities = extract_financial_entities(doc.content)
# Analyze tables
table_analysis = analyze_financial_tables(doc.tables)
# Calculate financial document score
financial_score = 0
financial_score += len(entities['amounts']) * 2
financial_score += len(entities['reference_numbers']) * 3
financial_score += sum(1 for table in table_analysis if table['likely_financial']) * 5
insight = {
'source_file': files[i],
'financial_score': financial_score,
'entities': entities,
'table_analysis': table_analysis,
'total_amounts': len(entities['amounts']),
'total_financial_tables': sum(1 for table in table_analysis if table['likely_financial']),
'document_type': 'high_financial' if financial_score > 10 else 'low_financial'
}
financial_insights.append(insight)
print(f" 📊 Financial score: {financial_score}")
print(f" 💵 Found {len(entities['amounts'])} amounts")
print(f" 📋 Found {len([t for t in table_analysis if t['likely_financial']])} financial tables")
return financial_insights
# Extract financial insights
financial_files = ["invoice.pdf", "financial-statement.xlsx", "payment-report.pdf"]
insights = asyncio.run(extract_financial_insights(financial_files))
# Print summary
total_amounts = sum(len(insight['entities']['amounts']) for insight in insights)
print(f"\n💰 Financial Analysis Summary:")
print(f"📄 Documents processed: {len(insights)}")
print(f"💵 Total amounts found: {total_amounts}")
print(f"📊 High financial documents: {len([i for i in insights if i['document_type'] == 'high_financial'])}")
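Once the insights are collected, downstream systems usually want them on disk rather than in memory. The sketch below uses only the dictionary structure built above and the standard library; the output file names are illustrative.

import csv
import json

def export_financial_insights(insights, json_path="financial_insights.json", csv_path="financial_insights.csv"):
    """Persist full insights as JSON and a flat per-document summary as CSV"""
    # Full detail, including entities and table analysis
    with open(json_path, "w") as f:
        json.dump(insights, f, indent=2, default=str)

    # One summary row per document for spreadsheets / BI tools
    with open(csv_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["source_file", "financial_score", "total_amounts",
                         "total_financial_tables", "document_type"])
        for insight in insights:
            writer.writerow([
                insight["source_file"],
                insight["financial_score"],
                insight["total_amounts"],
                insight["total_financial_tables"],
                insight["document_type"],
            ])

export_financial_insights(insights)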
Enterprise Integration Patterns
Workflow Orchestration
from cerevox import Lexa, ProcessingMode
import asyncio
import os
from datetime import datetime
import json
class DocumentWorkflow:
def __init__(self):
self.workflow_id = f"workflow_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
self.stages = []
self.results = {}
async def execute_stage(self, stage_name, stage_func, *args, **kwargs):
"""Execute a workflow stage with error handling and logging"""
stage_start = datetime.now()
print(f"🔄 [{self.workflow_id}] Starting stage: {stage_name}")
try:
result = await stage_func(*args, **kwargs)
stage_end = datetime.now()
duration = (stage_end - stage_start).total_seconds()
stage_info = {
'name': stage_name,
'status': 'success',
'start_time': stage_start.isoformat(),
'end_time': stage_end.isoformat(),
'duration_seconds': duration,
'result_summary': self._summarize_result(result)
}
self.stages.append(stage_info)
self.results[stage_name] = result
print(f"✅ [{self.workflow_id}] Completed stage: {stage_name} ({duration:.2f}s)")
return result
except Exception as e:
stage_end = datetime.now()
duration = (stage_end - stage_start).total_seconds()
stage_info = {
'name': stage_name,
'status': 'error',
'start_time': stage_start.isoformat(),
'end_time': stage_end.isoformat(),
'duration_seconds': duration,
'error': str(e)
}
self.stages.append(stage_info)
print(f"❌ [{self.workflow_id}] Failed stage: {stage_name} - {e}")
raise e
def _summarize_result(self, result):
"""Summarize stage results for logging"""
if isinstance(result, list):
return f"{len(result)} items"
elif hasattr(result, '__len__'):
return f"Length: {len(result)}"
else:
return str(type(result).__name__)
def get_workflow_summary(self):
"""Get complete workflow summary"""
total_duration = sum(stage['duration_seconds'] for stage in self.stages)
successful_stages = [s for s in self.stages if s['status'] == 'success']
return {
'workflow_id': self.workflow_id,
'total_stages': len(self.stages),
'successful_stages': len(successful_stages),
'total_duration_seconds': total_duration,
'stages': self.stages
}
async def enterprise_document_workflow(files):
"""Complete enterprise document processing workflow"""
workflow = DocumentWorkflow()
# Stage 1: Document Ingestion
async def ingestion_stage(files):
print(f"📥 Ingesting {len(files)} files...")
# Validate files exist and are accessible
validated_files = []
for file in files:
if os.path.exists(file):
validated_files.append(file)
else:
print(f"⚠️ File not found: {file}")
return validated_files
# Stage 2: Document Parsing
async def parsing_stage(files):
async with Lexa() as client:
documents = await client.parse(files, mode=ProcessingMode.ADVANCED)
return documents
# Stage 3: Content Analysis
async def analysis_stage(documents):
analyzed_docs = []
for doc in documents:
analysis = {
'content_length': len(doc.content),
'table_count': len(doc.tables),
'image_count': len(doc.images),
'word_count': len(doc.content.split()),
'complexity_score': len(doc.tables) * 2 + len(doc.images) * 1.5,
'content_type': 'complex' if len(doc.tables) > 5 else 'simple'
}
analyzed_docs.append({**doc.__dict__, 'analysis': analysis})
return analyzed_docs
# Stage 4: Data Transformation
async def transformation_stage(analyzed_docs):
transformed_data = []
for doc_data in analyzed_docs:
# Transform for downstream systems
transformed = {
'document_id': f"doc_{len(transformed_data) + 1}",
'source_file': doc_data.get('source_file', 'unknown'),
'processed_content': doc_data['content'][:1000], # First 1000 chars
'metadata': {
'analysis': doc_data['analysis'],
'processing_timestamp': datetime.now().isoformat(),
'workflow_id': workflow.workflow_id
},
'chunks': doc_data.get('text_chunks', [])[:10] # First 10 chunks
}
transformed_data.append(transformed)
return transformed_data
# Stage 5: Data Export
async def export_stage(transformed_data):
export_file = f"workflow_results_{workflow.workflow_id}.json"
with open(export_file, 'w') as f:
json.dump(transformed_data, f, indent=2, default=str)
return {
'export_file': export_file,
'exported_documents': len(transformed_data),
'total_size_mb': os.path.getsize(export_file) / (1024 * 1024)
}
try:
# Execute workflow stages
validated_files = await workflow.execute_stage(
"ingestion", ingestion_stage, files
)
documents = await workflow.execute_stage(
"parsing", parsing_stage, validated_files
)
analyzed_docs = await workflow.execute_stage(
"analysis", analysis_stage, documents
)
transformed_data = await workflow.execute_stage(
"transformation", transformation_stage, analyzed_docs
)
export_result = await workflow.execute_stage(
"export", export_stage, transformed_data
)
# Generate workflow summary
summary = workflow.get_workflow_summary()
print(f"\n🎉 Workflow Complete: {workflow.workflow_id}")
print(f"📊 Total duration: {summary['total_duration_seconds']:.2f} seconds")
print(f"✅ Successful stages: {summary['successful_stages']}/{summary['total_stages']}")
print(f"📄 Exported: {export_result['exported_documents']} documents")
print(f"💾 Export file: {export_result['export_file']}")
return summary, export_result
except Exception as e:
print(f"💥 Workflow failed: {e}")
return workflow.get_workflow_summary(), None
# Execute enterprise workflow
enterprise_files = ["financial-report.pdf", "contracts.docx", "data-analysis.xlsx"]
summary, export_result = asyncio.run(enterprise_document_workflow(enterprise_files))
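At enterprise scale, transient failures such as network hiccups and rate limits are a fact of life. Rather than failing the whole workflow on the first error, a thin retry wrapper around execute_stage gives flaky stages another chance while keeping every attempt in the workflow log. A minimal sketch; the attempt count and backoff schedule are illustrative choices.

import asyncio

async def execute_stage_with_retry(workflow, stage_name, stage_func, *args, max_attempts=3, **kwargs):
    """Retry a workflow stage with exponential backoff (sketch)"""
    for attempt in range(1, max_attempts + 1):
        try:
            # Each attempt is logged as its own stage entry by execute_stage
            return await workflow.execute_stage(
                f"{stage_name}_attempt_{attempt}", stage_func, *args, **kwargs
            )
        except Exception:
            if attempt == max_attempts:
                raise
            # Back off 1s, 2s, 4s, ... before the next attempt
            await asyncio.sleep(2 ** (attempt - 1))

Swap this wrapper in for the direct execute_stage calls in enterprise_document_workflow wherever a stage is likely to fail transiently, such as the parsing stage.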
Enterprise Ready: These patterns are designed for production environments processing thousands of documents. Use them as templates for your own complex workflows.