Financial Document Analysis with Lexa
Transform complex financial documents into structured, analyzable data in seconds. Lexa’s AI-powered parsing extracts critical financial metrics, tables, and insights with exceptional accuracy.
Why Lexa for Financial Analysis?
99.8% Table Accuracy: Preserve complex financial tables and calculations
Regulatory Compliance: SOC 2 compliant with enterprise security
Multi-Format Support: Process 10-K filings, earnings reports, and prospectuses
Real-Time Processing: Extract insights from 100+ page reports in under 30 seconds
Supported Financial Documents
SEC Filings (10-K, 10-Q, 8-K, proxy statements)
Earnings Reports and quarterly statements
Annual Reports with complex layouts
Financial Statements (income, balance sheet, cash flow)
Research Reports from analysts
Prospectuses and offering memoranda
Credit Reports and risk assessments
Quick Start: Parse SEC 10-K Filing
Transform a complex SEC filing into structured data:
Sync Example
```python
from cerevox import Lexa

# Initialize client
client = Lexa(api_key="your-api-key")

# Parse SEC 10-K filing
documents = client.parse("tesla_10k.pdf")
doc = documents[0]

# Extract financial metrics
revenue_chunks = doc.search_content("revenue|total revenue")
profit_chunks = doc.search_content("net income|profit")

print(f"Document: {doc.filename}")
print(f"Pages: {doc.total_pages}")
print(f"Tables found: {len(doc.tables)}")
print(f"Financial metrics extracted: {len(revenue_chunks + profit_chunks)}")
```
Advanced Financial Analysis Patterns
Process complex financial statements while preserving their table structure:
```python
from cerevox import AsyncLexa
import pandas as pd

async def extract_financial_tables(filing_path):
    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(filing_path)
        doc = documents[0]

        # Convert tables to pandas DataFrames
        financial_tables = []
        for table in doc.tables:
            try:
                # Convert table to structured data
                df = pd.DataFrame(table.to_dict())

                # Identify table type based on content
                table_type = identify_table_type(df)

                financial_tables.append({
                    'type': table_type,
                    'data': df,
                    'page': table.page_number,
                    'rows': len(df),
                    'columns': len(df.columns)
                })
            except Exception as e:
                print(f"Table processing error: {e}")
                continue

        return financial_tables

def identify_table_type(df):
    """Identify financial table type based on content"""
    columns_text = ' '.join(df.columns.astype(str)).lower()

    if 'revenue' in columns_text or 'income' in columns_text:
        return 'income_statement'
    elif 'assets' in columns_text or 'liabilities' in columns_text:
        return 'balance_sheet'
    elif 'cash flow' in columns_text or 'operating activities' in columns_text:
        return 'cash_flow'
    else:
        return 'other_financial'

# Usage
tables = await extract_financial_tables("company_10k.pdf")
for table in tables:
    print(f"Found {table['type']} with {table['rows']} rows")
```
Build a comprehensive financial metrics extractor:
```python
import re
from typing import Dict, List

from cerevox import AsyncLexa

class FinancialMetricsExtractor:
    def __init__(self, api_key: str):
        self.client = AsyncLexa(api_key=api_key)

        # Define financial metric patterns
        self.metric_patterns = {
            'revenue': [
                r'total revenue[:\s]+\$?([\d,\.]+)',
                r'net sales[:\s]+\$?([\d,\.]+)',
                r'revenue[:\s]+\$?([\d,\.]+)\s*(million|billion)?'
            ],
            'profit': [
                r'net income[:\s]+\$?([\d,\.]+)',
                r'profit[:\s]+\$?([\d,\.]+)',
                r'earnings[:\s]+\$?([\d,\.]+)'
            ],
            'assets': [
                r'total assets[:\s]+\$?([\d,\.]+)',
                r'assets[:\s]+\$?([\d,\.]+)'
            ],
            'debt': [
                r'total debt[:\s]+\$?([\d,\.]+)',
                r'long.term debt[:\s]+\$?([\d,\.]+)'  # '.' matches the hyphen in "long-term"
            ]
        }

    async def extract_metrics(self, document_path: str) -> Dict:
        """Extract financial metrics from document"""
        async with self.client:
            documents = await self.client.parse(document_path)
            doc = documents[0]

            metrics = {}
            content = doc.content.lower()

            for metric_type, patterns in self.metric_patterns.items():
                values = []
                for pattern in patterns:
                    matches = re.findall(pattern, content, re.IGNORECASE)
                    values.extend(matches)

                # Clean and convert values
                cleaned_values = []
                for value in values:
                    if isinstance(value, tuple):
                        value = value[0]  # Extract number from regex group
                    # Remove commas and convert to float
                    try:
                        clean_value = float(value.replace(',', ''))
                        cleaned_values.append(clean_value)
                    except ValueError:
                        continue

                metrics[metric_type] = cleaned_values

            return {
                'document': document_path,
                'metrics': metrics,
                'summary': self._summarize_metrics(metrics)
            }

    def _summarize_metrics(self, metrics: Dict) -> Dict:
        """Generate summary statistics"""
        summary = {}
        for metric_type, values in metrics.items():
            if values:
                summary[metric_type] = {
                    'count': len(values),
                    'max': max(values),
                    'min': min(values),
                    'avg': sum(values) / len(values)
                }
        return summary

# Usage
extractor = FinancialMetricsExtractor("your-api-key")
results = await extractor.extract_metrics("annual_report.pdf")

print(f"Revenue mentions: {len(results['metrics']['revenue'])}")
print(f"Profit data points: {len(results['metrics']['profit'])}")
```
RAG for Financial Q&A
Build a financial document Q&A system:
```python
from cerevox import AsyncLexa
import openai  # uses the legacy (pre-1.0) openai-python interface
from typing import List

class FinancialRAGSystem:
    def __init__(self, cerevox_api_key: str, openai_api_key: str):
        self.cerevox_client = AsyncLexa(api_key=cerevox_api_key)
        openai.api_key = openai_api_key
        self.document_chunks = []

    async def ingest_documents(self, financial_docs: List[str]):
        """Ingest and chunk financial documents"""
        async with self.cerevox_client:
            documents = await self.cerevox_client.parse(financial_docs)

            # Create vector-ready chunks
            all_chunks = documents.get_all_text_chunks(
                target_size=1000,  # Optimal for financial context
                tolerance=0.15
            )

            # Add metadata for better retrieval
            for i, chunk in enumerate(all_chunks):
                # Approximate chunk-to-document mapping, assuming chunks
                # are returned in document order
                doc_index = min(i * len(documents) // len(all_chunks),
                                len(documents) - 1)
                chunk_with_metadata = {
                    'id': f'chunk_{i}',
                    'content': chunk,
                    'document': documents[doc_index].filename,
                    'embedding': await self._get_embedding(chunk)
                }
                self.document_chunks.append(chunk_with_metadata)

            print(f"Ingested {len(self.document_chunks)} chunks from financial documents")

    async def _get_embedding(self, text: str):
        """Get embedding for text chunk"""
        response = await openai.Embedding.acreate(
            model="text-embedding-ada-002",
            input=text
        )
        return response['data'][0]['embedding']

    async def query_financials(self, question: str, top_k: int = 5):
        """Answer questions about financial documents"""
        # Get question embedding
        question_embedding = await self._get_embedding(question)

        # Find relevant chunks (simplified; use a proper vector DB in production)
        relevant_chunks = self._find_similar_chunks(question_embedding, top_k)

        # Build context for LLM
        context = "\n\n".join([
            f"Document: {chunk['document']}\nContent: {chunk['content']}"
            for chunk in relevant_chunks
        ])

        # Generate answer
        response = await openai.ChatCompletion.acreate(
            model="gpt-4",
            messages=[
                {
                    "role": "system",
                    "content": "You are a financial analyst assistant. Answer questions based on the provided financial document context. Be precise and cite specific numbers when available."
                },
                {
                    "role": "user",
                    "content": f"Context:\n{context}\n\nQuestion: {question}"
                }
            ]
        )

        return {
            'answer': response.choices[0].message.content,
            'sources': [chunk['document'] for chunk in relevant_chunks],
            'relevant_chunks': len(relevant_chunks)
        }

    def _find_similar_chunks(self, query_embedding, top_k):
        """Find most similar chunks (simplified implementation)"""
        # In production, use a proper vector database like Pinecone or Weaviate
        import numpy as np

        similarities = []
        for chunk in self.document_chunks:
            # Dot product works as cosine similarity here because
            # ada-002 embeddings are unit-normalized
            similarity = np.dot(query_embedding, chunk['embedding'])
            similarities.append((similarity, chunk))

        # Sort by similarity and return top k
        similarities.sort(key=lambda x: x[0], reverse=True)
        return [chunk for _, chunk in similarities[:top_k]]

# Usage
rag_system = FinancialRAGSystem(
    cerevox_api_key="your-cerevox-key",
    openai_api_key="your-openai-key"
)

# Ingest financial documents
await rag_system.ingest_documents([
    "tesla_10k_2023.pdf",
    "tesla_q3_earnings.pdf",
    "tesla_annual_report.pdf"
])

# Ask questions
result = await rag_system.query_financials(
    "What was Tesla's revenue growth in 2023?"
)

print(f"Answer: {result['answer']}")
print(f"Sources: {result['sources']}")
```
Real-World Financial Use Cases
1. Investment Research Automation
```python
import re

from cerevox import AsyncLexa

async def automated_investment_research(company_docs):
    """Automate investment research from company filings"""
    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(company_docs)

        research_data = {}
        for doc in documents:
            # Extract key investment metrics
            # (extract_margins, extract_md_a, and extract_highlights are
            # analysis helpers you define for your own pipeline)
            research_data[doc.filename] = {
                'revenue_growth': extract_growth_metrics(doc, 'revenue'),
                'profit_margins': extract_margins(doc),
                'risk_factors': doc.search_content('risk factor|material risk'),
                'management_discussion': extract_md_a(doc),
                'financial_highlights': extract_highlights(doc)
            }

        return research_data

def extract_growth_metrics(doc, metric):
    """Extract growth metrics from financial documents"""
    # Search for year-over-year comparisons
    growth_patterns = [
        rf'{metric}.*increased.*(\d+\.?\d*)%',
        rf'{metric}.*growth.*(\d+\.?\d*)%',
        rf'{metric}.*up.*(\d+\.?\d*)%'
    ]

    growth_data = []
    for pattern in growth_patterns:
        matches = re.findall(pattern, doc.content, re.IGNORECASE)
        growth_data.extend(matches)

    return growth_data
```
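The snippet above leaves extract_margins (and the other helpers) undefined. A minimal sketch of one possible implementation, following the same regex approach over doc.content; the patterns are illustrative, not part of Lexa:

```python
import re

def extract_margins(doc):
    """Pull percentage margin mentions (gross, operating, net) from the text"""
    # Hypothetical patterns: a keyword followed within a few words by a percentage
    margin_patterns = {
        'gross_margin': r'gross margin[^\d%]{0,20}(\d+\.?\d*)%',
        'operating_margin': r'operating margin[^\d%]{0,20}(\d+\.?\d*)%',
        'net_margin': r'net margin[^\d%]{0,20}(\d+\.?\d*)%',
    }

    margins = {}
    for name, pattern in margin_patterns.items():
        # Collect every percentage that follows the margin keyword
        margins[name] = [float(m) for m in re.findall(pattern, doc.content, re.IGNORECASE)]
    return margins
```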
2. Risk Assessment Pipeline
```python
from typing import List

from cerevox import AsyncLexa

class FinancialRiskAnalyzer:
    def __init__(self, api_key: str):
        self.client = AsyncLexa(api_key=api_key)

    async def analyze_risk_factors(self, financial_docs: List[str]):
        """Comprehensive risk factor analysis"""
        async with self.client:
            documents = await self.client.parse(financial_docs)

            risk_analysis = {}
            for doc in documents:
                # Extract risk sections
                risk_chunks = doc.search_content("risk factor|risks|uncertainties")

                # Categorize risks
                categorized_risks = self._categorize_risks(risk_chunks)

                # Calculate risk scores (_calculate_risk_scores and
                # _identify_high_priority are left for your own scoring model)
                risk_scores = self._calculate_risk_scores(categorized_risks)

                risk_analysis[doc.filename] = {
                    'total_risk_mentions': len(risk_chunks),
                    'risk_categories': categorized_risks,
                    'risk_scores': risk_scores,
                    'high_priority_risks': self._identify_high_priority(risk_chunks)
                }

            return risk_analysis

    def _categorize_risks(self, risk_chunks):
        """Categorize financial risks"""
        categories = {
            'market_risk': ['market', 'competition', 'demand'],
            'operational_risk': ['operations', 'supply chain', 'manufacturing'],
            'financial_risk': ['liquidity', 'credit', 'debt', 'cash flow'],
            'regulatory_risk': ['regulation', 'compliance', 'legal'],
            'technology_risk': ['cyber', 'technology', 'data breach']
        }

        categorized = {cat: [] for cat in categories}
        for chunk in risk_chunks:
            chunk_lower = chunk.lower()
            for category, keywords in categories.items():
                if any(keyword in chunk_lower for keyword in keywords):
                    categorized[category].append(chunk)

        return categorized
```
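The class calls two methods it never defines. A plausible sketch of _calculate_risk_scores to add to FinancialRiskAnalyzer, weighting mention counts per category; the weights are invented for illustration and should be tuned to your own risk model:

```python
def _calculate_risk_scores(self, categorized_risks):
    """Score each category by mention count, weighted by assumed severity"""
    # Illustrative severity weights, not derived from Lexa or any standard
    weights = {
        'market_risk': 1.0,
        'operational_risk': 1.2,
        'financial_risk': 1.5,
        'regulatory_risk': 1.3,
        'technology_risk': 1.1,
    }
    return {
        category: len(chunks) * weights.get(category, 1.0)
        for category, chunks in categorized_risks.items()
    }
```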
3. Earnings Call Analysis
```python
from cerevox import AsyncLexa

async def analyze_earnings_transcripts(transcript_files):
    """Analyze earnings call transcripts for sentiment and insights"""
    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(transcript_files)

        earnings_insights = {}
        for doc in documents:
            # Extract Q&A sections
            qa_sections = doc.search_content("questions and answers|q&a")

            # Management guidance
            guidance = doc.search_content("guidance|outlook|forecast")

            # Key metrics mentioned (extract_financial_metrics and
            # analyze_sentiment are helpers you supply)
            metrics = extract_financial_metrics(doc.content)

            earnings_insights[doc.filename] = {
                'qa_insights': len(qa_sections),
                'forward_guidance': guidance,
                'key_metrics': metrics,
                'sentiment_indicators': analyze_sentiment(doc.content)
            }

        return earnings_insights
```
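analyze_sentiment is not defined above. A minimal keyword-counting sketch to stand in for it; a production pipeline would use a finance-tuned model instead, and both word lists here are illustrative assumptions:

```python
def analyze_sentiment(text):
    """Crude sentiment signal: count positive vs. cautious wording"""
    # Hypothetical keyword lists; extend for your own domain
    positive = ['growth', 'strong', 'record', 'exceeded', 'outperform']
    cautious = ['headwind', 'decline', 'uncertainty', 'risk', 'weakness']

    text_lower = text.lower()
    pos_hits = sum(text_lower.count(word) for word in positive)
    neg_hits = sum(text_lower.count(word) for word in cautious)

    return {
        'positive_mentions': pos_hits,
        'cautious_mentions': neg_hits,
        # Ratio in [-1, 1]; values above 0 lean positive
        'tone': (pos_hits - neg_hits) / max(pos_hits + neg_hits, 1)
    }
```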
Lexa delivers exceptional performance for financial document processing:
Processing Speed: 30 seconds average for a 100+ page 10-K filing
Table Accuracy: 99.8% accuracy on complex financial tables
Batch Throughput: 500+ documents/hour with async processing
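To approach that batch throughput, submit files concurrently rather than one at a time. A minimal sketch using asyncio.gather with the AsyncLexa client from the earlier examples; the batch size is an assumption to tune against your rate limits:

```python
import asyncio

from cerevox import AsyncLexa

async def parse_in_batches(file_paths, batch_size=10):
    """Parse a large document set in concurrent batches"""
    results = []
    async with AsyncLexa(api_key="your-api-key") as client:
        for start in range(0, len(file_paths), batch_size):
            batch = file_paths[start:start + batch_size]
            # Launch one parse call per file and await the whole batch together
            docs = await asyncio.gather(*(client.parse(path) for path in batch))
            results.extend(docs)
    return results
```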
Integration Examples
Pinecone Vector Database
```python
import pinecone  # uses the legacy (pre-3.0) pinecone-client interface
from cerevox import AsyncLexa

# Initialize Pinecone
pinecone.init(api_key="your-pinecone-key", environment="your-env")
index = pinecone.Index("financial-docs")

async def index_financial_documents(document_paths):
    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(document_paths)

        # Get optimized chunks for financial documents
        chunks = documents.get_all_text_chunks(
            target_size=1000,  # Good for financial context
            tolerance=0.1
        )

        # Create embeddings and upsert to Pinecone
        # (get_embedding is your embedding helper; see the sketch below)
        vectors = []
        for i, chunk in enumerate(chunks):
            # Approximate chunk-to-document mapping, assuming chunks
            # are returned in document order
            doc_index = min(i * len(documents) // len(chunks), len(documents) - 1)
            vector = {
                'id': f'financial_chunk_{i}',
                'values': get_embedding(chunk),
                'metadata': {
                    'content': chunk,
                    'document_type': 'financial',
                    'source': documents[doc_index].filename
                }
            }
            vectors.append(vector)

        # Batch upsert to Pinecone
        index.upsert(vectors=vectors)
        return f"Indexed {len(vectors)} financial document chunks"
```
Security & Compliance
Lexa is SOC 2 Type II certified and provides enterprise-grade security for sensitive financial documents.
Data Encryption: End-to-end encryption in transit and at rest
Access Controls: Role-based access with audit logging
Compliance: SOC 2, GDPR, and financial industry standards
Data Residency: Control where your financial data is processed
Next Steps
Ready to transform your financial document analysis?
Get Started: Process your first financial document in 3 minutes
API Reference: Explore advanced parsing methods
Vector Database Guide: Build financial RAG applications
Performance Tips: Optimize for large financial datasets