"""Extract actionable insights from market reports, surveys, and competitive intelligence."""
from cerevox import Lexa
import re
from typing import Dict, List
# Initialize the synchronous Lexa client used by analyze_market_report below.
# NOTE(review): "your-api-key" is a placeholder — supply a real key before running.
client = Lexa(api_key="your-api-key")
def analyze_market_report(report_path: str):
    """Parse a research report and summarise its key market insights.

    Only the first document returned by ``client.parse`` is analysed.

    Args:
        report_path: Path of the report handed to ``client.parse``.

    Returns:
        A dict holding the report's file name and page count, the market
        sizes, growth rates, company names, trends and forecast years
        extracted from its text, and the number of tables it contains.
    """
    parsed = client.parse(report_path)
    first_doc = parsed[0]

    # Identification fields first, then one entry per text extractor.
    summary = {
        'report_name': first_doc.filename,
        'total_pages': first_doc.total_pages,
    }
    summary['market_size'] = extract_market_size(first_doc.content)
    summary['growth_rates'] = extract_growth_rates(first_doc.content)
    summary['key_players'] = extract_companies(first_doc.content)
    summary['market_trends'] = extract_trends(first_doc.content)
    summary['forecasts'] = extract_forecasts(first_doc.content)
    summary['data_tables'] = len(first_doc.tables)
    return summary
def extract_market_size(content: str) -> List[str]:
    """Extract market size mentions such as "$12.5 billion".

    Looks for a number followed by billion/million/trillion after the
    phrases "market size", "valued at" or "market worth" (case-insensitive,
    within a single line).

    Args:
        content: Report text to scan.

    Returns:
        Unique "$<amount> <unit>" strings in order of first appearance
        (pattern by pattern), so the result is stable between runs.
    """
    patterns = [
        r'market size.*?\$?([\d,\.]+)\s*(billion|million|trillion)',
        r'valued at.*?\$?([\d,\.]+)\s*(billion|million|trillion)',
        r'market worth.*?\$?([\d,\.]+)\s*(billion|million|trillion)'
    ]
    market_sizes = [
        f"${amount} {unit}"
        for pattern in patterns
        for amount, unit in re.findall(pattern, content, re.IGNORECASE)
    ]
    # dict.fromkeys de-duplicates while keeping insertion order; a set would
    # return the values in an arbitrary, run-dependent order.
    return list(dict.fromkeys(market_sizes))
def extract_growth_rates(content: str) -> List[str]:
    """Collect percentage figures that follow a growth-related keyword.

    A figure counts when it appears on the same line after "growth", "CAGR"
    or "increase" (case-insensitive). Duplicates are kept.

    Args:
        content: Report text to scan.

    Returns:
        The matched figures, each suffixed with "%", in document order.
    """
    rate_regex = re.compile(r'(?:growth|CAGR|increase).*?(\d+\.?\d*)%', re.IGNORECASE)
    rates = []
    for figure in rate_regex.findall(content):
        rates.append(f"{figure}%")
    return rates
def extract_companies(content: str) -> List[str]:
    """Extract likely company names (simplified heuristic).

    Two case-sensitive patterns are used: a capitalised word followed by a
    corporate suffix (Inc, Corp, LLC, Ltd, Co), and runs of two or three
    capitalised words. The second pattern is deliberately loose and will
    also pick up ordinary capitalised phrases.

    Args:
        content: Report text to scan.

    Returns:
        Up to ten unique candidates longer than three characters, in order
        of first appearance (suffix matches first), so the selection is
        stable between runs.
    """
    company_patterns = [
        r'[A-Z][a-z]+ (?:Inc|Corp|LLC|Ltd|Co)\.?',
        r'[A-Z][A-Za-z]+ [A-Z][a-z]+(?:\s+[A-Z][a-z]+)?'
    ]
    candidates = []
    for pattern in company_patterns:
        candidates.extend(re.findall(pattern, content))

    # De-duplicate in first-seen order. Truncating a set to ten entries would
    # keep an arbitrary ten that can differ from run to run.
    unique_candidates = dict.fromkeys(candidates)
    return [name for name in unique_candidates if len(name) > 3][:10]
def extract_trends(content: str) -> List[str]:
    """Report which known market-trend keywords appear in the text.

    Keywords are matched case-insensitively as whole words or phrases, so
    the short keyword 'ai' is not triggered by words that merely contain
    those letters (for example "maintain" or "available").

    Args:
        content: Report text to scan.

    Returns:
        The keywords that occur, in the order they are listed below.
    """
    trend_keywords = [
        'artificial intelligence', 'ai', 'machine learning',
        'cloud computing', 'digital transformation',
        'remote work', 'sustainability', 'automation'
    ]
    content_lower = content.lower()
    return [
        trend
        for trend in trend_keywords
        # \b on both sides restricts the hit to a standalone word/phrase.
        if re.search(rf'\b{re.escape(trend)}\b', content_lower)
    ]
def extract_forecasts(content: str) -> List[str]:
    """Extract the years mentioned in forecast-style statements.

    A year is a run of exactly four digits that follows "forecast",
    "projected" or "expected" on the same line, or directly follows "by ".
    Matching is case-insensitive.

    Args:
        content: Report text to scan.

    Returns:
        The unique years as strings, sorted ascending.
    """
    # Exactly four digits with no digit on either side; a bare \d{4} would
    # also match the first four digits of a longer number such as 12000.
    year = r'(?<!\d)(\d{4})(?!\d)'
    forecast_patterns = [
        r'forecast.*?' + year,
        r'projected.*?' + year,
        r'expected.*?' + year,
        r'by ' + year
    ]
    forecasts = []
    for pattern in forecast_patterns:
        forecasts.extend(re.findall(pattern, content, re.IGNORECASE))
    # Sorting makes the result deterministic (a plain set has no stable order).
    return sorted(set(forecasts))
# Usage: analyse a single report and print the headline figures.
insights = analyze_market_report("market_research_2024.pdf")
print(
    f"Market sizes found: {insights['market_size']}",
    f"Growth rates: {insights['growth_rates']}",
    f"Key players: {insights['key_players'][:5]}",
    sep="\n",
)
from cerevox import AsyncLexa
import pandas as pd
from collections import defaultdict
from datetime import datetime
import re
class MarketTrendAnalyzer:
    """Measure how often, and in what tone, market trends appear across documents.

    Documents are parsed with an ``AsyncLexa`` client; everything after
    parsing is keyword and regular-expression matching on each document's
    ``content`` text.
    """

    def __init__(self, api_key: str):
        """Create the async parsing client.

        Args:
            api_key: API key passed to ``AsyncLexa``.
        """
        self.client = AsyncLexa(api_key=api_key)
        # No method of this class writes to this mapping.
        self.trend_database = defaultdict(list)

    async def analyze_market_trends(self,
                                    research_documents: List[str],
                                    trend_categories: List[str] = None) -> Dict:
        """Analyze market trends across multiple research documents.

        Args:
            research_documents: Paths of the documents to parse.
            trend_categories: Trend phrases to look for; a built-in list of
                nine technology/business trends is used when omitted.

        Returns:
            A dict with:
              * 'trends_by_document' - per-file trend details,
              * 'trend_frequency' - total mentions per trend,
              * 'trend_growth_indicators' - cross-document growth data,
              * 'emerging_trends' - trends with near-average frequency,
              * 'declining_trends' - always an empty list; this class does
                not compute it.
        """
        if trend_categories is None:
            trend_categories = [
                'artificial intelligence', 'cloud computing', 'sustainability',
                'remote work', 'digital transformation', 'automation',
                'cybersecurity', 'e-commerce', 'mobile technology'
            ]

        async with self.client:
            documents = await self.client.parse(research_documents)

            trend_analysis = {
                'trends_by_document': {},
                'trend_frequency': defaultdict(int),
                'trend_growth_indicators': {},
                'emerging_trends': [],
                'declining_trends': []
            }

            for doc in documents:
                doc_trends = self._extract_document_trends(doc, trend_categories)
                trend_analysis['trends_by_document'][doc.filename] = doc_trends

                # Update frequency counts
                for trend, data in doc_trends.items():
                    trend_analysis['trend_frequency'][trend] += data['mentions']

            # Analyze trend growth indicators across all documents
            for trend in trend_categories:
                trend_analysis['trend_growth_indicators'][trend] = (
                    self._analyze_trend_growth(documents, trend)
                )

            # Identify emerging trends from the aggregated frequencies
            trend_analysis['emerging_trends'] = self._identify_emerging_trends(
                trend_analysis['trend_frequency']
            )

            return trend_analysis

    def _extract_document_trends(self, document, trend_categories: List[str]) -> Dict:
        """Extract trend mentions from a single document.

        Returns:
            A dict keyed by each trend that occurs at least once, holding
            its mention count, growth indicators, up to three context
            sentences, and a sentiment label derived from every context
            sentence found.
        """
        doc_trends = {}
        content_lower = document.content.lower()

        for trend in trend_categories:
            mentions = content_lower.count(trend.lower())
            if mentions > 0:
                growth_indicators = self._find_growth_indicators(document.content, trend)
                trend_context = self._extract_trend_context(document.content, trend)

                doc_trends[trend] = {
                    'mentions': mentions,
                    'growth_indicators': growth_indicators,
                    'context': trend_context[:3],  # Top 3 contexts
                    'sentiment': self._assess_trend_sentiment(trend_context)
                }

        return doc_trends

    def _find_growth_indicators(self, content: str, trend: str) -> List[str]:
        """Find percentage growth figures mentioned alongside a trend.

        Returns:
            Unique "<number>%" strings in order of first appearance, so the
            result is stable between runs.
        """
        escaped = re.escape(trend)
        growth_patterns = [
            rf'{escaped}.*?growing.*?(\d+\.?\d*)%',
            rf'{escaped}.*?increased.*?(\d+\.?\d*)%',
            rf'{escaped}.*?growth.*?(\d+\.?\d*)%',
            rf'(\d+\.?\d*)%.*?growth.*?{escaped}'
        ]

        indicators = []
        for pattern in growth_patterns:
            matches = re.findall(pattern, content, re.IGNORECASE)
            indicators.extend(f"{match}%" for match in matches)

        # Order-preserving de-duplication (a set has no stable order).
        return list(dict.fromkeys(indicators))

    def _extract_trend_context(self, content: str, trend: str) -> List[str]:
        """Return the sentences (longer than 20 characters) that mention a trend."""
        # Split only on a period that ends a sentence (followed by whitespace
        # or the end of the text); splitting on every '.' would cut decimal
        # figures such as "12.5%" in half.
        sentences = re.split(r'\.(?=\s|$)', content)
        trend_lower = trend.lower()

        trend_contexts = []
        for sentence in sentences:
            if trend_lower in sentence.lower():
                cleaned_sentence = sentence.strip()
                if len(cleaned_sentence) > 20:  # Skip very short sentences
                    trend_contexts.append(cleaned_sentence)

        return trend_contexts

    def _assess_trend_sentiment(self, contexts: List[str]) -> str:
        """Assess sentiment around a trend (simplified keyword tally).

        Returns:
            'positive', 'negative' or 'neutral', depending on which word
            list has more hits across all contexts (ties are neutral).
        """
        positive_words = ['growth', 'increase', 'opportunity', 'potential', 'strong', 'rising']
        negative_words = ['decline', 'decrease', 'challenge', 'weak', 'falling', 'struggle']

        positive_score = 0
        negative_score = 0
        for context in contexts:
            context_lower = context.lower()
            positive_score += sum(1 for word in positive_words if word in context_lower)
            negative_score += sum(1 for word in negative_words if word in context_lower)

        if positive_score > negative_score:
            return 'positive'
        if negative_score > positive_score:
            return 'negative'
        return 'neutral'

    def _analyze_trend_growth(self, documents, trend: str) -> Dict:
        """Aggregate mention counts, growth figures and years for one trend.

        Returns:
            A dict with 'total_mentions', 'documents_mentioning',
            'growth_rates' and 'forecast_years' (four-digit numbers that
            follow the trend on the same line).
        """
        growth_data = {
            'total_mentions': 0,
            'documents_mentioning': 0,
            'growth_rates': [],
            'forecast_years': []
        }
        trend_lower = trend.lower()
        forecast_pattern = rf'{re.escape(trend)}.*?(\d{{4}})'

        for doc in documents:
            content = doc.content.lower()
            if trend_lower in content:
                growth_data['documents_mentioning'] += 1
                growth_data['total_mentions'] += content.count(trend_lower)

                # Extract growth rates
                growth_data['growth_rates'].extend(
                    self._find_growth_indicators(doc.content, trend)
                )

                # Extract forecast years
                growth_data['forecast_years'].extend(
                    re.findall(forecast_pattern, doc.content, re.IGNORECASE)
                )

        return growth_data

    def _identify_emerging_trends(self, trend_frequency: Dict) -> List[str]:
        """Identify emerging trends based on frequency.

        Heuristic: a trend is "emerging" when its mention count lies within
        50%-150% of the average count (frequent, but not overwhelmingly so).
        """
        frequencies = list(trend_frequency.values())
        if not frequencies:
            return []

        avg_frequency = sum(frequencies) / len(frequencies)
        return [
            trend
            for trend, freq in trend_frequency.items()
            if avg_frequency * 0.5 <= freq <= avg_frequency * 1.5
        ]
# Usage
import asyncio

trend_analyzer = MarketTrendAnalyzer("your-api-key")
research_docs = [
    "tech_trends_2024.pdf",
    "market_forecast_report.pdf",
    "industry_analysis_q3.pdf"
]
# A bare top-level `await` is a SyntaxError in a regular module, so the
# coroutine is run to completion with asyncio.run instead.
trend_analysis = asyncio.run(trend_analyzer.analyze_market_trends(research_docs))
# Rank by mention count so the five shown really are the most mentioned,
# not just the first five keys that happened to be inserted.
top_trends = sorted(
    trend_analysis['trend_frequency'].items(),
    key=lambda item: item[1],
    reverse=True,
)[:5]
print(f"Most mentioned trends: {dict(top_trends)}")
print(f"Emerging trends: {trend_analysis['emerging_trends']}")
class SurveyAnalyzer:
    """Pull metadata, response statistics and findings out of survey documents.

    Documents are parsed with an ``AsyncLexa`` client; the analysis itself
    is regular-expression and keyword matching on each document's text.
    """

    def __init__(self, api_key: str):
        """Create the async parsing client.

        Args:
            api_key: API key passed to ``AsyncLexa``.
        """
        self.client = AsyncLexa(api_key=api_key)

    async def analyze_survey_data(self, survey_documents: List[str]) -> Dict:
        """Analyze market survey documents.

        Args:
            survey_documents: Paths of the survey documents to parse.

        Returns:
            A dict with 'survey_metadata' (one entry per document),
            'response_patterns' and 'satisfaction_metrics' (keyed by file
            name), and 'key_findings' (combined across documents).
            'demographic_insights' is returned empty; this class does not
            compute it.
        """
        async with self.client:
            documents = await self.client.parse(survey_documents)

            survey_analysis = {
                'survey_metadata': [],
                'response_patterns': {},
                'demographic_insights': {},
                'satisfaction_metrics': {},
                'key_findings': []
            }

            for doc in documents:
                # Extract survey metadata
                survey_analysis['survey_metadata'].append(
                    self._extract_survey_metadata(doc)
                )

                # Analyze response patterns
                survey_analysis['response_patterns'][doc.filename] = (
                    self._analyze_response_patterns(doc)
                )

                # Extract satisfaction metrics
                survey_analysis['satisfaction_metrics'][doc.filename] = (
                    self._extract_satisfaction_metrics(doc)
                )

                # Extract key findings
                survey_analysis['key_findings'].extend(
                    self._extract_survey_findings(doc)
                )

            return survey_analysis

    def _extract_survey_metadata(self, document) -> Dict:
        """Extract survey metadata and basic info.

        Returns:
            A dict with the document name, page count, sample size (int or
            None), survey period (string or None) and table count.
        """
        content = document.content

        # Numbers may carry thousands separators ("1,500"). For the
        # "respondents" pattern no other digits may sit between the number
        # and the word, so the nearest number is taken rather than the first
        # one on the line (which is often a year).
        sample_patterns = [
            r'sample size.*?(\d[\d,]*)',
            r'(\d[\d,]*)[^\d\n]*?respondents',
            r'\bn\s*=\s*(\d[\d,]*)'  # \b stops "mean = 5" from matching
        ]
        sample_size = None
        for pattern in sample_patterns:
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                sample_size = int(match.group(1).replace(',', ''))
                break

        # Extract survey period
        date_patterns = [
            r'conducted.*?(\d{4})',
            r'survey period.*?(\d{1,2}/\d{4})',
            r'fieldwork.*?(\d{4})'
        ]
        survey_period = None
        for pattern in date_patterns:
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                survey_period = match.group(1)
                break

        return {
            'document': document.filename,
            'pages': document.total_pages,
            'sample_size': sample_size,
            'survey_period': survey_period,
            'tables_count': len(document.tables)
        }

    def _analyze_response_patterns(self, document) -> Dict:
        """Summarise percentage figures and ratings found in the text.

        Returns:
            A dict with 'percentage_responses' (count, average and
            high/medium/low distribution of values up to 100%) and
            'rating_responses' (count, average and number of ratings of 7
            or more).
        """
        content = document.content

        # Extract percentage responses, ignoring values above 100%
        percentages = [
            value
            for value in map(float, re.findall(r'(\d+\.?\d*)%', content))
            if value <= 100
        ]

        # One combined pattern scanned in a single pass: matches cannot
        # overlap, so "rated 8/10" is counted once. The trailing \b keeps
        # "5/100" from being read as a rating out of 10.
        rating_pattern = re.compile(
            r'(\d+)/10\b|(\d+) out of 10\b|rated (\d+)',
            re.IGNORECASE,
        )
        ratings = [
            int(match.group(match.lastindex))
            for match in rating_pattern.finditer(content)
        ]

        return {
            'percentage_responses': {
                'count': len(percentages),
                'average': sum(percentages) / len(percentages) if percentages else 0,
                'distribution': self._categorize_percentages(percentages)
            },
            'rating_responses': {
                'count': len(ratings),
                'average': sum(ratings) / len(ratings) if ratings else 0,
                'high_ratings': sum(1 for r in ratings if r >= 7)
            }
        }

    def _categorize_percentages(self, percentages: List[float]) -> Dict:
        """Count percentage values in the high, medium and low bands."""
        return {
            'high (70-100%)': sum(1 for p in percentages if p >= 70),
            'medium (30-69%)': sum(1 for p in percentages if 30 <= p < 70),
            'low (0-29%)': sum(1 for p in percentages if p < 30)
        }

    def _extract_satisfaction_metrics(self, document) -> Dict:
        """Count satisfaction phrases per category, without double counting.

        Phrases are matched as whole words, so "satisfied" is not counted
        inside "dissatisfied" nor "happy" inside "unhappy". An occurrence
        that belongs to a longer listed phrase (for example "very
        satisfied") is credited to that phrase's category only.

        Returns:
            A dict mapping each satisfaction category to its phrase count.
        """
        content = document.content.lower()

        satisfaction_keywords = {
            'very_satisfied': ['very satisfied', 'extremely satisfied', 'highly satisfied'],
            'satisfied': ['satisfied', 'pleased', 'happy'],
            'neutral': ['neutral', 'neither', 'average'],
            'dissatisfied': ['dissatisfied', 'unhappy', 'disappointed'],
            'very_dissatisfied': ['very dissatisfied', 'extremely dissatisfied']
        }

        all_keywords = [
            keyword
            for keywords in satisfaction_keywords.values()
            for keyword in keywords
        ]
        # Whole-word occurrence count of every listed phrase.
        whole_word_counts = {
            keyword: len(re.findall(rf'\b{re.escape(keyword)}\b', content))
            for keyword in all_keywords
        }

        satisfaction_scores = {}
        for category, keywords in satisfaction_keywords.items():
            score = 0
            for keyword in keywords:
                # Occurrences already claimed by a longer listed phrase
                # ending in this keyword (e.g. "very " + "satisfied").
                claimed = sum(
                    whole_word_counts[longer]
                    for longer in all_keywords
                    if longer.endswith(' ' + keyword)
                )
                score += whole_word_counts[keyword] - claimed
            satisfaction_scores[category] = score

        return satisfaction_scores

    def _extract_survey_findings(self, document) -> List[str]:
        """Collect up to five ``search_content`` hits for finding-style phrases."""
        finding_indicators = [
            'key finding', 'main finding', 'important finding',
            'conclusion', 'result shows', 'data reveals'
        ]

        findings = []
        for indicator in finding_indicators:
            findings.extend(document.search_content(indicator))

        return findings[:5]  # Top 5 findings per document
# Usage
import asyncio

survey_analyzer = SurveyAnalyzer("your-api-key")
survey_docs = [
    "customer_satisfaction_2024.pdf",
    "market_research_survey.pdf",
    "brand_perception_study.pdf"
]
# A bare top-level `await` is a SyntaxError in a regular module, so the
# coroutine is run to completion with asyncio.run instead.
survey_results = asyncio.run(survey_analyzer.analyze_survey_data(survey_docs))
print(f"Surveys analyzed: {len(survey_results['survey_metadata'])}")
print(f"Key findings: {len(survey_results['key_findings'])}")
async def analyze_industry_reports(report_paths: List[str]):
    """Run the per-document industry extractors over a set of reports.

    Args:
        report_paths: Paths of the reports to parse.

    Returns:
        A dict whose 'market_size_trends', 'competitive_landscape' and
        'technology_trends' entries are keyed by file name. The
        'regulatory_changes' and 'investment_patterns' entries are returned
        empty; nothing in this function fills them.
    """
    # NOTE(review): analyze_competitive_mentions and extract_technology_trends
    # are not defined in this section of the file — confirm they exist elsewhere.
    section_extractors = (
        ('market_size_trends', extract_market_sizing),
        ('competitive_landscape', analyze_competitive_mentions),
        ('technology_trends', extract_technology_trends),
    )

    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(report_paths)

        industry_analysis = {
            'market_size_trends': {},
            'competitive_landscape': {},
            'technology_trends': {},
            'regulatory_changes': {},
            'investment_patterns': {}
        }

        for report in documents:
            # Same extractor order per document: sizing, competition, technology.
            for section, extractor in section_extractors:
                industry_analysis[section][report.filename] = extractor(report)

        return industry_analysis
def extract_market_sizing(document):
    """Collect market-size figures mentioned in a document's text.

    Args:
        document: Object exposing ``content`` (text) and ``tables``.

    Returns:
        A dict with 'market_sizes' (one entry per regex match, holding the
        raw value, the unit and a "$<value> <unit>" string), 'count' (the
        number of entries) and 'tables_with_data' (the number of tables in
        the document).
    """
    text = document.content

    # Phrases that introduce a figure followed by billion/million/trillion.
    size_regexes = (
        r'market.*?worth.*?\$?([\d,\.]+)\s*(billion|million|trillion)',
        r'industry.*?valued.*?\$?([\d,\.]+)\s*(billion|million|trillion)',
        r'revenue.*?\$?([\d,\.]+)\s*(billion|million|trillion)',
    )

    sizes = [
        {'value': amount, 'unit': unit, 'full_value': f"${amount} {unit}"}
        for regex in size_regexes
        for amount, unit in re.findall(regex, text, re.IGNORECASE)
    ]

    return {
        'market_sizes': sizes,
        'count': len(sizes),
        'tables_with_data': len(document.tables)
    }
class CompetitiveDashboard:
    """Build a per-competitor profile from each competitor's documents."""

    def __init__(self, api_key: str):
        """Create the async parsing client and an empty competitor store."""
        self.client = AsyncLexa(api_key=api_key)
        # No method shown in this class writes to this dict.
        self.competitor_data = {}

    async def build_competitive_dashboard(self, competitor_docs: Dict[str, List[str]]):
        """Parse every competitor's documents and assemble their profiles.

        Args:
            competitor_docs: Maps a competitor name to its document paths.

        Returns:
            A dict whose 'competitors' entry maps each name to its profile.
            The 'market_positioning', 'performance_metrics' and
            'strategic_initiatives' entries are returned empty.
        """
        # NOTE(review): _extract_product_data, _assess_market_position and
        # _extract_strategic_initiatives are not defined in the class as shown
        # here — confirm they are provided elsewhere before calling this.
        dashboard_data = {
            'competitors': {},
            'market_positioning': {},
            'performance_metrics': {},
            'strategic_initiatives': {}
        }

        async with self.client:
            for name, paths in competitor_docs.items():
                parsed = await self.client.parse(paths)

                profile = {'documents_analyzed': len(parsed)}
                profile['total_pages'] = sum(item.total_pages for item in parsed)
                profile['revenue_data'] = self._extract_financial_data(parsed)
                profile['product_portfolio'] = self._extract_product_data(parsed)
                profile['market_position'] = self._assess_market_position(parsed)
                profile['recent_initiatives'] = self._extract_strategic_initiatives(parsed)

                dashboard_data['competitors'][name] = profile

        return dashboard_data

    def _extract_financial_data(self, documents):
        """Gather revenue mention counts, growth figures and market-share figures.

        Returns:
            A dict with 'revenue_mentions' (total ``search_content`` hits
            for "revenue|sales"), 'growth_rates' and 'market_share' (the
            matched numbers as strings, without a percent sign) and
            'profit_margins' (always empty; nothing here fills it).
        """
        growth_regex = re.compile(r'growth.*?(\d+\.?\d*)%', re.IGNORECASE)
        share_regex = re.compile(r'market share.*?(\d+\.?\d*)%', re.IGNORECASE)

        revenue_hits = 0
        growth_figures = []
        share_figures = []
        for item in documents:
            text = item.content
            revenue_hits += len(item.search_content("revenue|sales"))
            growth_figures += growth_regex.findall(text)
            share_figures += share_regex.findall(text)

        return {
            'revenue_mentions': revenue_hits,
            'growth_rates': growth_figures,
            'profit_margins': [],
            'market_share': share_figures
        }
# Usage
import asyncio

dashboard = CompetitiveDashboard("your-api-key")
competitor_documents = {
    'Company_A': ['company_a_annual_report.pdf', 'company_a_strategy.pdf'],
    'Company_B': ['company_b_earnings.pdf', 'company_b_presentation.pdf'],
    'Company_C': ['company_c_research.pdf', 'company_c_analysis.pdf']
}
# A bare top-level `await` is a SyntaxError in a regular module, so the
# coroutine is run to completion with asyncio.run instead.
competitive_dashboard = asyncio.run(
    dashboard.build_competitive_dashboard(competitor_documents)
)
import pandas as pd
from cerevox import AsyncLexa
async def export_for_bi_tools(research_documents: List[str],
                              export_format: str = 'excel'):
    """Export market research data for BI tools.

    Parses the documents, builds one summary row per document and writes
    the table to ``market_research_analysis.<ext>`` in the working
    directory.

    Args:
        research_documents: Paths of the documents to parse.
        export_format: 'excel', 'csv' or 'json'. Any other value writes no
            file and emits a ``UserWarning``.

    Returns:
        The pandas DataFrame holding one row per document.
    """
    import warnings

    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(research_documents)

        # Computed once so every row carries the same analysis date.
        analysis_date = datetime.now().strftime('%Y-%m-%d')

        # Structure data for BI consumption
        market_data = [
            {
                'document_name': doc.filename,
                'analysis_date': analysis_date,
                'pages': doc.total_pages,
                'data_tables': len(doc.tables),
                'market_mentions': len(doc.search_content("market")),
                'growth_mentions': len(doc.search_content("growth")),
                'competitor_mentions': len(doc.search_content("competitor|competition")),
                'trend_mentions': len(doc.search_content("trend|trending"))
            }
            for doc in documents
        ]

        df = pd.DataFrame(market_data)

        # Export in requested format
        if export_format == 'excel':
            df.to_excel('market_research_analysis.xlsx', index=False)
        elif export_format == 'csv':
            df.to_csv('market_research_analysis.csv', index=False)
        elif export_format == 'json':
            df.to_json('market_research_analysis.json', orient='records')
        else:
            # Previously an unknown format was skipped without any signal.
            # The DataFrame is still returned so existing callers keep working.
            warnings.warn(
                f"Unsupported export_format {export_format!r}; no file was written "
                "(expected 'excel', 'csv' or 'json')."
            )

        return df
# Usage
import asyncio

# A bare top-level `await` is a SyntaxError in a regular module, so the
# coroutine is run to completion with asyncio.run instead.
df = asyncio.run(export_for_bi_tools([
    "q3_market_report.pdf",
    "competitor_analysis.pdf",
    "industry_trends.pdf"
], export_format='excel'))
print(f"Exported {len(df)} reports to Excel for BI analysis")