Extract actionable insights from market reports, surveys, and competitive intelligence
Transform market research documents into structured business intelligence. Lexa’s parsing extracts trends, metrics, and insights from complex market reports with precision.
Parse complex market data tables with 99.8% accuracy
Extract key metrics and growth trends automatically
Aggregate insights across multiple research sources
Extract competitor data and market positioning
Extract key insights from market research documents:
from cerevox import Lexa
import re
from typing import Dict, List

# Initialize client
client = Lexa(api_key="your-api-key")

def analyze_market_report(report_path: str) -> Dict:
    """Extract key market insights from a research report"""
    documents = client.parse(report_path)
    report = documents[0]

    # Extract market data
    market_insights = {
        'report_name': report.filename,
        'total_pages': report.total_pages,
        'market_size': extract_market_size(report.content),
        'growth_rates': extract_growth_rates(report.content),
        'key_players': extract_companies(report.content),
        'market_trends': extract_trends(report.content),
        'forecasts': extract_forecasts(report.content),
        'data_tables': len(report.tables)
    }
    return market_insights

def extract_market_size(content: str) -> List[str]:
    """Extract market size mentions"""
    patterns = [
        r'market size.*?\$?([\d,\.]+)\s*(billion|million|trillion)',
        r'valued at.*?\$?([\d,\.]+)\s*(billion|million|trillion)',
        r'market worth.*?\$?([\d,\.]+)\s*(billion|million|trillion)'
    ]

    market_sizes = []
    for pattern in patterns:
        matches = re.findall(pattern, content, re.IGNORECASE)
        market_sizes.extend([f"${match[0]} {match[1]}" for match in matches])

    return list(set(market_sizes))

def extract_growth_rates(content: str) -> List[str]:
    """Extract growth rate mentions"""
    growth_pattern = r'(?:growth|CAGR|increase).*?(\d+\.?\d*)%'
    matches = re.findall(growth_pattern, content, re.IGNORECASE)
    return [f"{match}%" for match in matches]

def extract_companies(content: str) -> List[str]:
    """Extract company mentions (simplified)"""
    # Common company patterns
    company_patterns = [
        r'[A-Z][a-z]+ (?:Inc|Corp|LLC|Ltd|Co)\.?',
        r'[A-Z][A-Za-z]+ [A-Z][a-z]+(?:\s+[A-Z][a-z]+)?'
    ]

    companies = []
    for pattern in company_patterns:
        matches = re.findall(pattern, content)
        companies.extend(matches)

    # Remove duplicates and common false positives
    companies = list(set(companies))
    return [c for c in companies if len(c) > 3][:10]  # Top 10

def extract_trends(content: str) -> List[str]:
    """Extract market trends"""
    trend_keywords = [
        'artificial intelligence', 'ai', 'machine learning',
        'cloud computing', 'digital transformation',
        'remote work', 'sustainability', 'automation'
    ]

    found_trends = []
    content_lower = content.lower()
    for trend in trend_keywords:
        if trend in content_lower:
            found_trends.append(trend)

    return found_trends

def extract_forecasts(content: str) -> List[str]:
    """Extract forecast information"""
    forecast_patterns = [
        r'forecast.*?(\d{4})',
        r'projected.*?(\d{4})',
        r'expected.*?(\d{4})',
        r'by (\d{4})'
    ]

    forecasts = []
    for pattern in forecast_patterns:
        matches = re.findall(pattern, content, re.IGNORECASE)
        forecasts.extend(matches)

    return list(set(forecasts))

# Usage
insights = analyze_market_report("market_research_2024.pdf")
print(f"Market sizes found: {insights['market_size']}")
print(f"Growth rates: {insights['growth_rates']}")
print(f"Key players: {insights['key_players'][:5]}")
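The extractors above return display strings; for sorting or cross-report comparison it helps to normalize them to plain USD values. A minimal sketch in plain Python (the helper and its multiplier table are illustrative, not part of the Lexa API):

# Hypothetical helper: convert "$4.5 billion"-style strings to USD floats
UNIT_MULTIPLIERS = {'million': 1e6, 'billion': 1e9, 'trillion': 1e12}

def normalize_market_size(size_str: str) -> float:
    """Convert a market-size string like '$4.5 billion' to a USD value."""
    value_str, unit = size_str.lstrip('$').rsplit(' ', 1)
    return float(value_str.replace(',', '')) * UNIT_MULTIPLIERS[unit.lower()]

# Example: largest market size mentioned in the report
sizes = [normalize_market_size(s) for s in insights['market_size']]
if sizes:
    print(f"Largest reported market size: ${max(sizes):,.0f}")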
Track and analyze market trends across multiple reports:
import re
from collections import defaultdict
from typing import Dict, List, Optional

from cerevox import AsyncLexa

class MarketTrendAnalyzer:
    def __init__(self, api_key: str):
        self.client = AsyncLexa(api_key=api_key)
        self.trend_database = defaultdict(list)

    async def analyze_market_trends(self,
                                    research_documents: List[str],
                                    trend_categories: Optional[List[str]] = None) -> Dict:
        """Analyze market trends across multiple research documents"""
        if trend_categories is None:
            trend_categories = [
                'artificial intelligence', 'cloud computing', 'sustainability',
                'remote work', 'digital transformation', 'automation',
                'cybersecurity', 'e-commerce', 'mobile technology'
            ]

        async with self.client:
            documents = await self.client.parse(research_documents)

            trend_analysis = {
                'trends_by_document': {},
                'trend_frequency': defaultdict(int),
                'trend_growth_indicators': {},
                'emerging_trends': [],
                'declining_trends': []
            }

            for doc in documents:
                doc_trends = self._extract_document_trends(doc, trend_categories)
                trend_analysis['trends_by_document'][doc.filename] = doc_trends

                # Update frequency counts
                for trend, data in doc_trends.items():
                    trend_analysis['trend_frequency'][trend] += data['mentions']

            # Analyze trend growth indicators
            for trend in trend_categories:
                growth_data = self._analyze_trend_growth(documents, trend)
                trend_analysis['trend_growth_indicators'][trend] = growth_data

            # Identify emerging trends (declining trends could be derived analogously)
            trend_analysis['emerging_trends'] = self._identify_emerging_trends(
                trend_analysis['trend_frequency']
            )

            return trend_analysis

    def _extract_document_trends(self, document, trend_categories: List[str]) -> Dict:
        """Extract trend mentions from a single document"""
        doc_trends = {}
        content_lower = document.content.lower()

        for trend in trend_categories:
            mentions = content_lower.count(trend.lower())
            if mentions > 0:
                # Look for growth indicators
                growth_indicators = self._find_growth_indicators(document.content, trend)

                # Extract trend context
                trend_context = self._extract_trend_context(document.content, trend)

                doc_trends[trend] = {
                    'mentions': mentions,
                    'growth_indicators': growth_indicators,
                    'context': trend_context[:3],  # Top 3 contexts
                    'sentiment': self._assess_trend_sentiment(trend_context)
                }

        return doc_trends

    def _find_growth_indicators(self, content: str, trend: str) -> List[str]:
        """Find growth indicators for a specific trend"""
        growth_patterns = [
            rf'{re.escape(trend)}.*?growing.*?(\d+\.?\d*)%',
            rf'{re.escape(trend)}.*?increased.*?(\d+\.?\d*)%',
            rf'{re.escape(trend)}.*?growth.*?(\d+\.?\d*)%',
            rf'(\d+\.?\d*)%.*?growth.*?{re.escape(trend)}'
        ]

        indicators = []
        for pattern in growth_patterns:
            matches = re.findall(pattern, content, re.IGNORECASE)
            indicators.extend([f"{match}%" for match in matches])

        return list(set(indicators))

    def _extract_trend_context(self, content: str, trend: str) -> List[str]:
        """Extract contextual information about a trend"""
        # Find sentences containing the trend
        sentences = content.split('.')
        trend_contexts = []

        for sentence in sentences:
            if trend.lower() in sentence.lower():
                # Clean and add context
                cleaned_sentence = sentence.strip()
                if len(cleaned_sentence) > 20:  # Skip very short sentences
                    trend_contexts.append(cleaned_sentence)

        return trend_contexts

    def _assess_trend_sentiment(self, contexts: List[str]) -> str:
        """Assess sentiment around a trend (simplified)"""
        positive_words = ['growth', 'increase', 'opportunity', 'potential', 'strong', 'rising']
        negative_words = ['decline', 'decrease', 'challenge', 'weak', 'falling', 'struggle']

        positive_score = 0
        negative_score = 0

        for context in contexts:
            context_lower = context.lower()
            positive_score += sum(1 for word in positive_words if word in context_lower)
            negative_score += sum(1 for word in negative_words if word in context_lower)

        if positive_score > negative_score:
            return 'positive'
        elif negative_score > positive_score:
            return 'negative'
        else:
            return 'neutral'

    def _analyze_trend_growth(self, documents, trend: str) -> Dict:
        """Analyze growth patterns for a specific trend"""
        growth_data = {
            'total_mentions': 0,
            'documents_mentioning': 0,
            'growth_rates': [],
            'forecast_years': []
        }

        for doc in documents:
            content = doc.content.lower()
            if trend.lower() in content:
                growth_data['documents_mentioning'] += 1
                growth_data['total_mentions'] += content.count(trend.lower())

                # Extract growth rates and forecasts
                growth_indicators = self._find_growth_indicators(doc.content, trend)
                growth_data['growth_rates'].extend(growth_indicators)

                # Extract forecast years
                forecast_pattern = rf'{re.escape(trend)}.*?(\d{{4}})'
                forecast_matches = re.findall(forecast_pattern, doc.content, re.IGNORECASE)
                growth_data['forecast_years'].extend(forecast_matches)

        return growth_data

    def _identify_emerging_trends(self, trend_frequency: Dict) -> List[str]:
        """Identify emerging trends based on frequency"""
        # Simple heuristic: trends mentioned frequently but not overwhelmingly
        frequencies = list(trend_frequency.values())
        if not frequencies:
            return []

        avg_frequency = sum(frequencies) / len(frequencies)

        emerging = []
        for trend, freq in trend_frequency.items():
            if avg_frequency * 0.5 <= freq <= avg_frequency * 1.5:
                emerging.append(trend)

        return emerging

# Usage
trend_analyzer = MarketTrendAnalyzer("your-api-key")
research_docs = [
    "tech_trends_2024.pdf",
    "market_forecast_report.pdf",
    "industry_analysis_q3.pdf"
]

trend_analysis = await trend_analyzer.analyze_market_trends(research_docs)
print(f"Most mentioned trends: {dict(list(trend_analysis['trend_frequency'].items())[:5])}")
print(f"Emerging trends: {trend_analysis['emerging_trends']}")
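The usage snippet above assumes an already-running event loop (for example, a notebook). To run the same analysis as a standalone script, wrap it with asyncio.run:

import asyncio

async def main():
    analyzer = MarketTrendAnalyzer("your-api-key")
    analysis = await analyzer.analyze_market_trends([
        "tech_trends_2024.pdf",
        "market_forecast_report.pdf"
    ])
    print(f"Emerging trends: {analysis['emerging_trends']}")

asyncio.run(main())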
Process market survey documents and extract insights:
import re
from typing import Dict, List

from cerevox import AsyncLexa

class SurveyAnalyzer:
    def __init__(self, api_key: str):
        self.client = AsyncLexa(api_key=api_key)

    async def analyze_survey_data(self, survey_documents: List[str]) -> Dict:
        """Analyze market survey documents"""
        async with self.client:
            documents = await self.client.parse(survey_documents)

            survey_analysis = {
                'survey_metadata': [],
                'response_patterns': {},
                'demographic_insights': {},
                'satisfaction_metrics': {},
                'key_findings': []
            }

            for doc in documents:
                # Extract survey metadata
                metadata = self._extract_survey_metadata(doc)
                survey_analysis['survey_metadata'].append(metadata)

                # Analyze response patterns
                patterns = self._analyze_response_patterns(doc)
                survey_analysis['response_patterns'][doc.filename] = patterns

                # Extract satisfaction metrics
                satisfaction = self._extract_satisfaction_metrics(doc)
                survey_analysis['satisfaction_metrics'][doc.filename] = satisfaction

                # Extract key findings
                findings = self._extract_survey_findings(doc)
                survey_analysis['key_findings'].extend(findings)

            return survey_analysis

    def _extract_survey_metadata(self, document) -> Dict:
        """Extract survey metadata and basic info"""
        content = document.content

        # Extract sample size
        sample_patterns = [
            r'sample size.*?(\d+)',
            r'(\d+).*?respondents',
            r'n\s*=\s*(\d+)'
        ]

        sample_size = None
        for pattern in sample_patterns:
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                sample_size = int(match.group(1))
                break

        # Extract survey period
        date_patterns = [
            r'conducted.*?(\d{4})',
            r'survey period.*?(\d{1,2}/\d{4})',
            r'fieldwork.*?(\d{4})'
        ]

        survey_period = None
        for pattern in date_patterns:
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                survey_period = match.group(1)
                break

        return {
            'document': document.filename,
            'pages': document.total_pages,
            'sample_size': sample_size,
            'survey_period': survey_period,
            'tables_count': len(document.tables)
        }

    def _analyze_response_patterns(self, document) -> Dict:
        """Analyze survey response patterns"""
        content = document.content

        # Extract percentage responses
        percentage_pattern = r'(\d+\.?\d*)%'
        percentages = re.findall(percentage_pattern, content)
        percentages = [float(p) for p in percentages if float(p) <= 100]

        # Extract rating scales
        rating_patterns = [
            r'(\d+)/10',
            r'(\d+) out of 10',
            r'rated (\d+)'
        ]

        ratings = []
        for pattern in rating_patterns:
            matches = re.findall(pattern, content, re.IGNORECASE)
            ratings.extend([int(m) for m in matches])

        return {
            'percentage_responses': {
                'count': len(percentages),
                'average': sum(percentages) / len(percentages) if percentages else 0,
                'distribution': self._categorize_percentages(percentages)
            },
            'rating_responses': {
                'count': len(ratings),
                'average': sum(ratings) / len(ratings) if ratings else 0,
                'high_ratings': sum(1 for r in ratings if r >= 7)
            }
        }

    def _categorize_percentages(self, percentages: List[float]) -> Dict:
        """Categorize percentage responses"""
        categories = {
            'high (70-100%)': sum(1 for p in percentages if p >= 70),
            'medium (30-69%)': sum(1 for p in percentages if 30 <= p < 70),
            'low (0-29%)': sum(1 for p in percentages if p < 30)
        }
        return categories

    def _extract_satisfaction_metrics(self, document) -> Dict:
        """Extract customer satisfaction metrics"""
        content = document.content.lower()

        satisfaction_keywords = {
            'very_satisfied': ['very satisfied', 'extremely satisfied', 'highly satisfied'],
            'satisfied': ['satisfied', 'pleased', 'happy'],
            'neutral': ['neutral', 'neither', 'average'],
            'dissatisfied': ['dissatisfied', 'unhappy', 'disappointed'],
            'very_dissatisfied': ['very dissatisfied', 'extremely dissatisfied']
        }

        # Note: plain substring counts overlap (e.g. 'dissatisfied' also contains
        # 'satisfied'); refine with word boundaries if precision matters
        satisfaction_scores = {}
        for category, keywords in satisfaction_keywords.items():
            score = sum(content.count(keyword) for keyword in keywords)
            satisfaction_scores[category] = score

        return satisfaction_scores

    def _extract_survey_findings(self, document) -> List[str]:
        """Extract key survey findings"""
        finding_indicators = [
            'key finding', 'main finding', 'important finding',
            'conclusion', 'result shows', 'data reveals'
        ]

        findings = []
        for indicator in finding_indicators:
            matches = document.search_content(indicator)
            findings.extend(matches)

        return findings[:5]  # Top 5 findings per document

# Usage
survey_analyzer = SurveyAnalyzer("your-api-key")
survey_docs = [
    "customer_satisfaction_2024.pdf",
    "market_research_survey.pdf",
    "brand_perception_study.pdf"
]

survey_results = await survey_analyzer.analyze_survey_data(survey_docs)
print(f"Surveys analyzed: {len(survey_results['survey_metadata'])}")
print(f"Key findings: {len(survey_results['key_findings'])}")
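To reduce the per-document keyword counts to one headline figure, a simple net-satisfaction ratio works; a rough sketch over the structure returned above (the weighting is an illustrative assumption):

def net_satisfaction(scores: dict) -> float:
    """Rough net-satisfaction ratio from keyword counts, in [-1, 1]."""
    positive = scores['very_satisfied'] + scores['satisfied']
    negative = scores['very_dissatisfied'] + scores['dissatisfied']
    total = positive + negative + scores['neutral']
    return (positive - negative) / total if total else 0.0

for doc_name, scores in survey_results['satisfaction_metrics'].items():
    print(f"{doc_name}: net satisfaction {net_satisfaction(scores):+.2f}")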
import re
from typing import Dict, List

from cerevox import AsyncLexa

async def analyze_industry_reports(report_paths: List[str]) -> Dict:
    """Comprehensive industry report analysis"""
    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(report_paths)

        industry_analysis = {
            'market_size_trends': {},
            'competitive_landscape': {},
            'technology_trends': {},
            'regulatory_changes': {},
            'investment_patterns': {}
        }

        for doc in documents:
            # Extract market sizing information
            market_data = extract_market_sizing(doc)
            industry_analysis['market_size_trends'][doc.filename] = market_data

            # Analyze competitive mentions (helper sketched below)
            competitive_data = analyze_competitive_mentions(doc)
            industry_analysis['competitive_landscape'][doc.filename] = competitive_data

            # Extract technology trends (helper sketched below)
            tech_trends = extract_technology_trends(doc)
            industry_analysis['technology_trends'][doc.filename] = tech_trends

        return industry_analysis

def extract_market_sizing(document) -> Dict:
    """Extract market sizing data from document"""
    content = document.content

    # Market size patterns
    size_patterns = [
        r'market.*?worth.*?\$?([\d,\.]+)\s*(billion|million|trillion)',
        r'industry.*?valued.*?\$?([\d,\.]+)\s*(billion|million|trillion)',
        r'revenue.*?\$?([\d,\.]+)\s*(billion|million|trillion)'
    ]

    market_sizes = []
    for pattern in size_patterns:
        matches = re.findall(pattern, content, re.IGNORECASE)
        for match in matches:
            market_sizes.append({
                'value': match[0],
                'unit': match[1],
                'full_value': f"${match[0]} {match[1]}"
            })

    return {
        'market_sizes': market_sizes,
        'count': len(market_sizes),
        'tables_with_data': len(document.tables)
    }
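analyze_industry_reports also calls analyze_competitive_mentions and extract_technology_trends, which are not defined above; a minimal sketch of both in the same style as extract_market_sizing (the keyword lists are illustrative assumptions):

def analyze_competitive_mentions(document) -> Dict:
    """Count competitive-landscape signals in a document (simplified sketch)."""
    content = document.content.lower()
    signals = ['market share', 'competitor', 'market leader', 'rivalry']
    return {signal: content.count(signal) for signal in signals}

def extract_technology_trends(document) -> List[str]:
    """List technology trend keywords found in a document (simplified sketch)."""
    tech_keywords = [
        'artificial intelligence', 'machine learning', 'cloud computing',
        'automation', 'cybersecurity', 'e-commerce'
    ]
    content = document.content.lower()
    return [kw for kw in tech_keywords if kw in content]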
import re
from typing import Dict, List

from cerevox import AsyncLexa

class CompetitiveDashboard:
    def __init__(self, api_key: str):
        self.client = AsyncLexa(api_key=api_key)
        self.competitor_data = {}

    async def build_competitive_dashboard(self, competitor_docs: Dict[str, List[str]]) -> Dict:
        """Build comprehensive competitive intelligence dashboard"""
        dashboard_data = {
            'competitors': {},
            'market_positioning': {},
            'performance_metrics': {},
            'strategic_initiatives': {}
        }

        async with self.client:
            for competitor, doc_paths in competitor_docs.items():
                documents = await self.client.parse(doc_paths)

                competitor_profile = {
                    'documents_analyzed': len(documents),
                    'total_pages': sum(doc.total_pages for doc in documents),
                    'revenue_data': self._extract_financial_data(documents),
                    'product_portfolio': self._extract_product_data(documents),
                    'market_position': self._assess_market_position(documents),
                    'recent_initiatives': self._extract_strategic_initiatives(documents)
                }

                dashboard_data['competitors'][competitor] = competitor_profile

        return dashboard_data

    def _extract_financial_data(self, documents) -> Dict:
        """Extract financial performance data"""
        financial_data = {
            'revenue_mentions': 0,
            'growth_rates': [],
            'profit_margins': [],
            'market_share': []
        }

        for doc in documents:
            content = doc.content

            # Count revenue mentions
            financial_data['revenue_mentions'] += len(doc.search_content("revenue|sales"))

            # Extract growth rates
            growth_pattern = r'growth.*?(\d+\.?\d*)%'
            growth_matches = re.findall(growth_pattern, content, re.IGNORECASE)
            financial_data['growth_rates'].extend(growth_matches)

            # Extract market share
            share_pattern = r'market share.*?(\d+\.?\d*)%'
            share_matches = re.findall(share_pattern, content, re.IGNORECASE)
            financial_data['market_share'].extend(share_matches)

        return financial_data

    # The following helpers are minimal placeholders; adapt them to your own
    # product, positioning, and initiative taxonomies
    def _extract_product_data(self, documents) -> List[str]:
        """Placeholder: collect sentences mentioning products (simplified)."""
        products = []
        for doc in documents:
            products.extend(s.strip() for s in doc.content.split('.')
                            if 'product' in s.lower())
        return products[:10]

    def _assess_market_position(self, documents) -> str:
        """Placeholder: classify position by leadership keywords (simplified)."""
        content = ' '.join(doc.content.lower() for doc in documents)
        return 'leader' if 'market leader' in content else 'challenger'

    def _extract_strategic_initiatives(self, documents) -> List[str]:
        """Placeholder: collect sentences mentioning initiatives (simplified)."""
        initiatives = []
        for doc in documents:
            initiatives.extend(s.strip() for s in doc.content.split('.')
                               if 'initiative' in s.lower() or 'strategy' in s.lower())
        return initiatives[:5]

# Usage
dashboard = CompetitiveDashboard("your-api-key")
competitor_documents = {
    'Company_A': ['company_a_annual_report.pdf', 'company_a_strategy.pdf'],
    'Company_B': ['company_b_earnings.pdf', 'company_b_presentation.pdf'],
    'Company_C': ['company_c_research.pdf', 'company_c_analysis.pdf']
}

competitive_dashboard = await dashboard.build_competitive_dashboard(competitor_documents)
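A quick way to inspect the result, using the field names produced by build_competitive_dashboard above:

for name, profile in competitive_dashboard['competitors'].items():
    print(f"{name}: {profile['documents_analyzed']} documents, "
          f"{profile['total_pages']} pages, "
          f"market position: {profile['market_position']}")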
15 seconds for 200+ page market research report
95% accuracy in trend and metric extraction
100+ reports/hour for competitive intelligence
import pandas as pd
from datetime import datetime
from typing import List

from cerevox import AsyncLexa

async def export_for_bi_tools(research_documents: List[str],
                              export_format: str = 'excel') -> pd.DataFrame:
    """Export market research data for BI tools"""
    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(research_documents)

        # Structure data for BI consumption
        market_data = []

        for doc in documents:
            # Extract structured insights
            insights = {
                'document_name': doc.filename,
                'analysis_date': datetime.now().strftime('%Y-%m-%d'),
                'pages': doc.total_pages,
                'data_tables': len(doc.tables),
                'market_mentions': len(doc.search_content("market")),
                'growth_mentions': len(doc.search_content("growth")),
                'competitor_mentions': len(doc.search_content("competitor|competition")),
                'trend_mentions': len(doc.search_content("trend|trending"))
            }
            market_data.append(insights)

        # Create DataFrame
        df = pd.DataFrame(market_data)

        # Export in requested format
        if export_format == 'excel':
            df.to_excel('market_research_analysis.xlsx', index=False)
        elif export_format == 'csv':
            df.to_csv('market_research_analysis.csv', index=False)
        elif export_format == 'json':
            df.to_json('market_research_analysis.json', orient='records')

        return df

# Usage
df = await export_for_bi_tools([
    "q3_market_report.pdf",
    "competitor_analysis.pdf",
    "industry_trends.pdf"
], export_format='excel')

print(f"Exported {len(df)} reports to Excel for BI analysis")
Start analyzing market research in minutes
Build market intelligence search systems
Optimize for large-scale research analysis
Explore advanced market data extraction