Extract structured insights from financial documents with enterprise-grade accuracy
Transform complex financial documents into structured, analyzable data in seconds. Lexa's AI-powered parsing extracts critical financial metrics, tables, and insights with best-in-class accuracy.
Preserve complex financial tables and calculations
SOC 2 compliant with enterprise security
Process 10-K filings, earnings reports, and prospectuses
Extract insights from 100+ page reports in under 30 seconds
Transform a complex SEC filing into structured data:
from cerevox import Lexa
# Initialize client
client = Lexa(api_key="your-api-key")
# Parse SEC 10-K filing
documents = client.parse("tesla_10k.pdf")
doc = documents[0]
# Extract financial metrics
revenue_chunks = doc.search_content("revenue|total revenue")
profit_chunks = doc.search_content("net income|profit")
print(f"Document: {doc.filename}")
print(f"Pages: {doc.total_pages}")
print(f"Tables found: {len(doc.tables)}")
print(f"Financial metrics extracted: {len(revenue_chunks + profit_chunks)}")
Process complex financial statements and preserve structure:
from cerevox import AsyncLexa
import pandas as pd

async def extract_financial_tables(filing_path):
    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(filing_path)
        doc = documents[0]

        # Convert tables to pandas DataFrames
        financial_tables = []
        for table in doc.tables:
            try:
                # Convert table to structured data
                df = pd.DataFrame(table.to_dict())

                # Identify table type based on content
                table_type = identify_table_type(df)

                financial_tables.append({
                    'type': table_type,
                    'data': df,
                    'page': table.page_number,
                    'rows': len(df),
                    'columns': len(df.columns)
                })
            except Exception as e:
                print(f"Table processing error: {e}")
                continue

        return financial_tables

def identify_table_type(df):
    """Identify financial table type based on content"""
    columns_text = ' '.join(df.columns.astype(str)).lower()
    if 'revenue' in columns_text or 'income' in columns_text:
        return 'income_statement'
    elif 'assets' in columns_text or 'liabilities' in columns_text:
        return 'balance_sheet'
    elif 'cash flow' in columns_text or 'operating activities' in columns_text:
        return 'cash_flow'
    else:
        return 'other_financial'

# Usage
tables = await extract_financial_tables("company_10k.pdf")
for table in tables:
    print(f"Found {table['type']} with {table['rows']} rows")
Build a comprehensive financial metrics extractor:
import re
from typing import Dict, List

from cerevox import AsyncLexa

class FinancialMetricsExtractor:
    def __init__(self, api_key: str):
        self.client = AsyncLexa(api_key=api_key)

        # Define financial metric patterns
        self.metric_patterns = {
            'revenue': [
                r'total revenue[:\s]+\$?([\d,\.]+)',
                r'net sales[:\s]+\$?([\d,\.]+)',
                r'revenue[:\s]+\$?([\d,\.]+)\s*(million|billion)?'
            ],
            'profit': [
                r'net income[:\s]+\$?([\d,\.]+)',
                r'profit[:\s]+\$?([\d,\.]+)',
                r'earnings[:\s]+\$?([\d,\.]+)'
            ],
            'assets': [
                r'total assets[:\s]+\$?([\d,\.]+)',
                r'assets[:\s]+\$?([\d,\.]+)'
            ],
            'debt': [
                r'total debt[:\s]+\$?([\d,\.]+)',
                r'long[-\s]term debt[:\s]+\$?([\d,\.]+)'
            ]
        }

    async def extract_metrics(self, document_path: str) -> Dict:
        """Extract financial metrics from a document"""
        async with self.client:
            documents = await self.client.parse(document_path)
            doc = documents[0]

            metrics = {}
            content = doc.content.lower()

            for metric_type, patterns in self.metric_patterns.items():
                values = []
                for pattern in patterns:
                    matches = re.findall(pattern, content, re.IGNORECASE)
                    values.extend(matches)

                # Clean and convert values
                cleaned_values = []
                for value in values:
                    if isinstance(value, tuple):
                        value = value[0]  # Extract the number from the first regex group
                    # Remove commas and convert to float
                    try:
                        clean_value = float(value.replace(',', ''))
                        cleaned_values.append(clean_value)
                    except ValueError:
                        continue

                metrics[metric_type] = cleaned_values

            return {
                'document': document_path,
                'metrics': metrics,
                'summary': self._summarize_metrics(metrics)
            }

    def _summarize_metrics(self, metrics: Dict) -> Dict:
        """Generate summary statistics"""
        summary = {}
        for metric_type, values in metrics.items():
            if values:
                summary[metric_type] = {
                    'count': len(values),
                    'max': max(values),
                    'min': min(values),
                    'avg': sum(values) / len(values)
                }
        return summary

# Usage (inside an async context)
extractor = FinancialMetricsExtractor("your-api-key")
results = await extractor.extract_metrics("annual_report.pdf")

print(f"Revenue mentions: {len(results['metrics']['revenue'])}")
print(f"Profit data points: {len(results['metrics']['profit'])}")
Build a financial document Q&A system:
from cerevox import AsyncLexa
from openai import AsyncOpenAI
from typing import List
import numpy as np

class FinancialRAGSystem:
    def __init__(self, cerevox_api_key: str, openai_api_key: str):
        self.cerevox_client = AsyncLexa(api_key=cerevox_api_key)
        self.openai_client = AsyncOpenAI(api_key=openai_api_key)
        self.document_chunks = []

    async def ingest_documents(self, financial_docs: List[str]):
        """Ingest and chunk financial documents"""
        async with self.cerevox_client:
            documents = await self.cerevox_client.parse(financial_docs)

            # Chunk each document separately so every chunk keeps its source
            # filename (assumes a per-document get_text_chunks helper; adjust
            # to the chunking API your SDK version exposes)
            chunk_id = 0
            for doc in documents:
                chunks = doc.get_text_chunks(
                    target_size=1000,  # Good fit for financial context
                    tolerance=0.15
                )
                for chunk in chunks:
                    self.document_chunks.append({
                        'id': f'chunk_{chunk_id}',
                        'content': chunk,
                        'document': doc.filename,
                        'embedding': await self._get_embedding(chunk)
                    })
                    chunk_id += 1

        print(f"Ingested {len(self.document_chunks)} chunks from financial documents")

    async def _get_embedding(self, text: str):
        """Get an embedding for a text chunk"""
        response = await self.openai_client.embeddings.create(
            model="text-embedding-ada-002",
            input=text
        )
        return response.data[0].embedding

    async def query_financials(self, question: str, top_k: int = 5):
        """Answer questions about financial documents"""
        # Get the question embedding
        question_embedding = await self._get_embedding(question)

        # Find relevant chunks (simplified - use a proper vector DB in production)
        relevant_chunks = self._find_similar_chunks(question_embedding, top_k)

        # Build context for the LLM
        context = "\n\n".join([
            f"Document: {chunk['document']}\nContent: {chunk['content']}"
            for chunk in relevant_chunks
        ])

        # Generate an answer
        response = await self.openai_client.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system",
                    "content": "You are a financial analyst assistant. Answer questions based on the provided financial document context. Be precise and cite specific numbers when available."
                },
                {
                    "role": "user",
                    "content": f"Context:\n{context}\n\nQuestion: {question}"
                }
            ]
        )

        return {
            'answer': response.choices[0].message.content,
            'sources': [chunk['document'] for chunk in relevant_chunks],
            'relevant_chunks': len(relevant_chunks)
        }

    def _find_similar_chunks(self, query_embedding, top_k):
        """Find the most similar chunks (simplified implementation)"""
        # In production, use a proper vector database like Pinecone or Weaviate.
        # ada-002 embeddings are unit-length, so the dot product equals cosine similarity.
        similarities = []
        for chunk in self.document_chunks:
            similarity = np.dot(query_embedding, chunk['embedding'])
            similarities.append((similarity, chunk))

        # Sort by similarity and return the top k
        similarities.sort(key=lambda x: x[0], reverse=True)
        return [chunk for _, chunk in similarities[:top_k]]

# Usage (inside an async context)
rag_system = FinancialRAGSystem(
    cerevox_api_key="your-cerevox-key",
    openai_api_key="your-openai-key"
)

# Ingest financial documents
await rag_system.ingest_documents([
    "tesla_10k_2023.pdf",
    "tesla_q3_earnings.pdf",
    "tesla_annual_report.pdf"
])

# Ask questions
result = await rag_system.query_financials(
    "What was Tesla's revenue growth in 2023?"
)

print(f"Answer: {result['answer']}")
print(f"Sources: {result['sources']}")
async def automated_investment_research(company_docs):
    """Automate investment research from company filings"""
    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(company_docs)

        research_data = {}
        for doc in documents:
            # Extract key investment metrics
            # (extract_md_a and extract_highlights are user-defined helpers,
            # not shown here; extract_margins is sketched below)
            research_data[doc.filename] = {
                'revenue_growth': extract_growth_metrics(doc, 'revenue'),
                'profit_margins': extract_margins(doc),
                'risk_factors': doc.search_content('risk factor|material risk'),
                'management_discussion': extract_md_a(doc),
                'financial_highlights': extract_highlights(doc)
            }

        return research_data

def extract_growth_metrics(doc, metric):
    """Extract growth metrics from financial documents"""
    # Search for year-over-year comparisons
    growth_patterns = [
        rf'{metric}.*increased.*(\d+\.?\d*)%',
        rf'{metric}.*growth.*(\d+\.?\d*)%',
        rf'{metric}.*up.*(\d+\.?\d*)%'
    ]

    growth_data = []
    for pattern in growth_patterns:
        matches = re.findall(pattern, doc.content, re.IGNORECASE)
        growth_data.extend(matches)

    return growth_data
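Of the helpers above, extract_margins can follow the same pattern as extract_growth_metrics. Here is a minimal, hypothetical sketch; the inline-percentage assumption and the pattern set are illustrative, not part of the SDK:

def extract_margins(doc):
    """Hypothetical helper: pull margin percentages from document text"""
    margins = {}
    for kind in ('gross', 'operating', 'net'):
        # Assumption: margins are stated inline, e.g. "gross margin of 25.1%"
        pattern = rf'{kind} margin[^%\d]*(\d+\.?\d*)%'
        matches = re.findall(pattern, doc.content, re.IGNORECASE)
        margins[kind] = [float(m) for m in matches]
    return margins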
class FinancialRiskAnalyzer:
    def __init__(self, api_key: str):
        self.client = AsyncLexa(api_key=api_key)

    async def analyze_risk_factors(self, financial_docs: List[str]):
        """Comprehensive risk factor analysis"""
        async with self.client:
            documents = await self.client.parse(financial_docs)

            risk_analysis = {}
            for doc in documents:
                # Extract risk sections
                risk_chunks = doc.search_content("risk factor|risks|uncertainties")

                # Categorize risks
                categorized_risks = self._categorize_risks(risk_chunks)

                # Calculate risk scores (helper sketched below)
                risk_scores = self._calculate_risk_scores(categorized_risks)

                risk_analysis[doc.filename] = {
                    'total_risk_mentions': len(risk_chunks),
                    'risk_categories': categorized_risks,
                    'risk_scores': risk_scores,
                    'high_priority_risks': self._identify_high_priority(risk_chunks)
                }

            return risk_analysis

    def _categorize_risks(self, risk_chunks):
        """Categorize financial risks"""
        categories = {
            'market_risk': ['market', 'competition', 'demand'],
            'operational_risk': ['operations', 'supply chain', 'manufacturing'],
            'financial_risk': ['liquidity', 'credit', 'debt', 'cash flow'],
            'regulatory_risk': ['regulation', 'compliance', 'legal'],
            'technology_risk': ['cyber', 'technology', 'data breach']
        }

        categorized = {cat: [] for cat in categories}
        for chunk in risk_chunks:
            chunk_lower = chunk.lower()
            for category, keywords in categories.items():
                if any(keyword in chunk_lower for keyword in keywords):
                    categorized[category].append(chunk)

        return categorized
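The two remaining methods are left to the reader. A minimal sketch, to be placed inside the class, might score categories by mention count and flag chunks containing severity language; the scoring scheme and severity keywords are arbitrary choices, not SDK behavior:

    def _calculate_risk_scores(self, categorized_risks):
        """Minimal sketch: score each category by its share of risk mentions"""
        total = sum(len(chunks) for chunks in categorized_risks.values()) or 1
        return {
            category: round(len(chunks) / total, 3)
            for category, chunks in categorized_risks.items()
        }

    def _identify_high_priority(self, risk_chunks):
        """Minimal sketch: flag chunks containing severity language"""
        severity_keywords = ('material adverse', 'significant', 'substantial')
        return [
            chunk for chunk in risk_chunks
            if any(kw in chunk.lower() for kw in severity_keywords)
        ]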
async def analyze_earnings_transcripts(transcript_files):
    """Analyze earnings call transcripts for sentiment and insights"""
    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(transcript_files)

        earnings_insights = {}
        for doc in documents:
            # Extract Q&A sections
            qa_sections = doc.search_content("questions and answers|q&a")

            # Management guidance
            guidance = doc.search_content("guidance|outlook|forecast")

            # Key metrics mentioned (extract_financial_metrics is a
            # user-defined helper; analyze_sentiment is sketched below)
            metrics = extract_financial_metrics(doc.content)

            earnings_insights[doc.filename] = {
                'qa_insights': len(qa_sections),
                'forward_guidance': guidance,
                'key_metrics': metrics,
                'sentiment_indicators': analyze_sentiment(doc.content)
            }

        return earnings_insights
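analyze_sentiment is not part of the SDK. A minimal keyword-counting sketch (the word lists and the net-tone ratio are arbitrary illustrative choices; production systems would use a proper sentiment model) could look like this:

def analyze_sentiment(text):
    """Minimal sketch: count positive vs. negative tone words"""
    positive = ('growth', 'record', 'strong', 'exceeded', 'improved')
    negative = ('decline', 'headwind', 'weak', 'missed', 'impairment')
    text_lower = text.lower()
    pos_count = sum(text_lower.count(word) for word in positive)
    neg_count = sum(text_lower.count(word) for word in negative)
    return {
        'positive_mentions': pos_count,
        'negative_mentions': neg_count,
        # Net tone in [-1, 1]; 0 when no tone words are found
        'net_tone': (pos_count - neg_count) / max(pos_count + neg_count, 1)
    }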
Lexa delivers exceptional performance for financial document processing:
30 seconds on average for a 100+ page 10-K filing
99.8% accuracy on complex financial tables
500+ documents/hour with async processing (one batching pattern is sketched below)
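To approach that throughput, one hedged batching pattern is asyncio.gather with a semaphore capping concurrent parse calls; the concurrency limit of 10 here is an arbitrary choice, not an SDK recommendation:

import asyncio
from cerevox import AsyncLexa

async def parse_in_batches(paths, max_concurrency=10):
    """Parse many documents concurrently with a bounded semaphore"""
    semaphore = asyncio.Semaphore(max_concurrency)

    async with AsyncLexa(api_key="your-api-key") as client:
        async def parse_one(path):
            async with semaphore:
                return await client.parse(path)

        # Fire off all parses at once; the semaphore limits in-flight requests
        results = await asyncio.gather(*(parse_one(p) for p in paths))
    return results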
from pinecone import Pinecone
from cerevox import AsyncLexa

# Initialize Pinecone
pc = Pinecone(api_key="your-pinecone-key")
index = pc.Index("financial-docs")

async def index_financial_documents(document_paths):
    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(document_paths)

        # Create embeddings and upsert to Pinecone, chunking each document
        # separately so every vector keeps its source filename
        vectors = []
        chunk_id = 0
        for doc in documents:
            # Get chunks sized for financial context (assumes a
            # per-document get_text_chunks helper, as above)
            chunks = doc.get_text_chunks(target_size=1000, tolerance=0.1)
            for chunk in chunks:
                vectors.append({
                    'id': f'financial_chunk_{chunk_id}',
                    'values': get_embedding(chunk),  # user-supplied embedding function (sketched below)
                    'metadata': {
                        'content': chunk,
                        'document_type': 'financial',
                        'source': doc.filename
                    }
                })
                chunk_id += 1

        # Batch upsert to Pinecone
        index.upsert(vectors=vectors)
        return f"Indexed {len(vectors)} financial document chunks"
Lexa is SOC 2 Type II certified and provides enterprise-grade security for sensitive financial documents.
Ready to transform your financial document analysis?
Process your first financial document in 3 minutes
Explore advanced parsing methods
Build financial RAG applications
Optimize for large financial datasets