Market Research with Lexa

Transform market research documents into actionable business intelligence. Lexa’s advanced parsing extracts trends, metrics, and insights from complex market reports with precision.

Why Lexa for Market Research?

Data Table Extraction

Parse complex market data tables with 99.8% accuracy

Trend Identification

Extract key metrics and growth trends automatically

Multi-Source Analysis

Aggregate insights across multiple research sources

Competitive Intelligence

Extract competitor data and market positioning

Market Research Applications

  • Industry Reports (Gartner, McKinsey, Forrester research)
  • Market Surveys (consumer insights, market sizing)
  • Competitive Analysis (competitor reports, product comparisons)
  • Financial Research (analyst reports, market forecasts)
  • Customer Research (satisfaction surveys, feedback analysis)
  • Trend Analysis (technology trends, market dynamics)
  • Regulatory Research (compliance reports, policy analysis)

Quick Start: Market Analysis

Extract key insights from market research documents:

from cerevox import Lexa
import re
from typing import Dict, List

# Initialize client
client = Lexa(api_key="your-api-key")

def analyze_market_report(report_path: str):
    """Extract key market insights from research report"""
    
    documents = client.parse(report_path)
    report = documents[0]
    
    # Extract market data
    market_insights = {
        'report_name': report.filename,
        'total_pages': report.total_pages,
        'market_size': extract_market_size(report.content),
        'growth_rates': extract_growth_rates(report.content),
        'key_players': extract_companies(report.content),
        'market_trends': extract_trends(report.content),
        'forecasts': extract_forecasts(report.content),
        'data_tables': len(report.tables)
    }
    
    return market_insights

def extract_market_size(content: str) -> List[str]:
    """Extract market size mentions"""
    patterns = [
        r'market size.*?\$?([\d,\.]+)\s*(billion|million|trillion)',
        r'valued at.*?\$?([\d,\.]+)\s*(billion|million|trillion)',
        r'market worth.*?\$?([\d,\.]+)\s*(billion|million|trillion)'
    ]
    
    market_sizes = []
    for pattern in patterns:
        matches = re.findall(pattern, content, re.IGNORECASE)
        market_sizes.extend([f"${match[0]} {match[1]}" for match in matches])
    
    return list(set(market_sizes))

def extract_growth_rates(content: str) -> List[str]:
    """Extract growth rate mentions"""
    growth_pattern = r'(?:growth|CAGR|increase).*?(\d+\.?\d*)%'
    matches = re.findall(growth_pattern, content, re.IGNORECASE)
    return [f"{match}%" for match in matches]

def extract_companies(content: str) -> List[str]:
    """Extract company mentions (simplified)"""
    # Common company patterns
    company_patterns = [
        r'[A-Z][a-z]+ (?:Inc|Corp|LLC|Ltd|Co)\.?',
        r'[A-Z][A-Za-z]+ [A-Z][a-z]+(?:\s+[A-Z][a-z]+)?'
    ]
    
    companies = []
    for pattern in company_patterns:
        matches = re.findall(pattern, content)
        companies.extend(matches)
    
    # Deduplicate and drop very short matches (crude false-positive filter)
    companies = list(set(companies))
    return [c for c in companies if len(c) > 3][:10]  # Top 10

def extract_trends(content: str) -> List[str]:
    """Extract market trends"""
    trend_keywords = [
        'artificial intelligence', 'ai', 'machine learning',
        'cloud computing', 'digital transformation',
        'remote work', 'sustainability', 'automation'
    ]
    
    found_trends = []
    content_lower = content.lower()
    
    for trend in trend_keywords:
        if trend in content_lower:
            found_trends.append(trend)
    
    return found_trends

def extract_forecasts(content: str) -> List[str]:
    """Extract forecast information"""
    forecast_patterns = [
        r'forecast.*?(\d{4})',
        r'projected.*?(\d{4})',
        r'expected.*?(\d{4})',
        r'by (\d{4})'
    ]
    
    forecasts = []
    for pattern in forecast_patterns:
        matches = re.findall(pattern, content, re.IGNORECASE)
        forecasts.extend(matches)
    
    return list(set(forecasts))

# Usage
insights = analyze_market_report("market_research_2024.pdf")
print(f"Market sizes found: {insights['market_size']}")
print(f"Growth rates: {insights['growth_rates']}")
print(f"Key players: {insights['key_players'][:5]}")

Advanced Market Research Analytics

Market Trend Analysis

Track and analyze market trends across multiple reports:

from cerevox import AsyncLexa
from collections import defaultdict
from typing import Dict, List, Optional
import re

class MarketTrendAnalyzer:
    def __init__(self, api_key: str):
        self.client = AsyncLexa(api_key=api_key)
        self.trend_database = defaultdict(list)
    
    async def analyze_market_trends(self,
                                    research_documents: List[str],
                                    trend_categories: Optional[List[str]] = None) -> Dict:
        """Analyze market trends across multiple research documents"""
        
        if trend_categories is None:
            trend_categories = [
                'artificial intelligence', 'cloud computing', 'sustainability',
                'remote work', 'digital transformation', 'automation',
                'cybersecurity', 'e-commerce', 'mobile technology'
            ]
        
        async with self.client:
            documents = await self.client.parse(research_documents)
            
            trend_analysis = {
                'trends_by_document': {},
                'trend_frequency': defaultdict(int),
                'trend_growth_indicators': {},
                'emerging_trends': [],
                'declining_trends': []  # not populated in this simplified example
            }
            
            for doc in documents:
                doc_trends = self._extract_document_trends(doc, trend_categories)
                trend_analysis['trends_by_document'][doc.filename] = doc_trends
                
                # Update frequency counts
                for trend, data in doc_trends.items():
                    trend_analysis['trend_frequency'][trend] += data['mentions']
            
            # Analyze trend growth indicators
            for trend in trend_categories:
                growth_data = self._analyze_trend_growth(documents, trend)
                trend_analysis['trend_growth_indicators'][trend] = growth_data
            
            # Identify emerging and declining trends
            trend_analysis['emerging_trends'] = self._identify_emerging_trends(
                trend_analysis['trend_frequency']
            )
            
            return trend_analysis
    
    def _extract_document_trends(self, document, trend_categories: List[str]) -> Dict:
        """Extract trend mentions from a single document"""
        doc_trends = {}
        content_lower = document.content.lower()
        
        for trend in trend_categories:
            mentions = content_lower.count(trend.lower())
            
            if mentions > 0:
                # Look for growth indicators
                growth_indicators = self._find_growth_indicators(document.content, trend)
                
                # Extract trend context
                trend_context = self._extract_trend_context(document.content, trend)
                
                doc_trends[trend] = {
                    'mentions': mentions,
                    'growth_indicators': growth_indicators,
                    'context': trend_context[:3],  # Top 3 contexts
                    'sentiment': self._assess_trend_sentiment(trend_context)
                }
        
        return doc_trends
    
    def _find_growth_indicators(self, content: str, trend: str) -> List[str]:
        """Find growth indicators for a specific trend"""
        growth_patterns = [
            rf'{re.escape(trend)}.*?growing.*?(\d+\.?\d*)%',
            rf'{re.escape(trend)}.*?increased.*?(\d+\.?\d*)%',
            rf'{re.escape(trend)}.*?growth.*?(\d+\.?\d*)%',
            rf'(\d+\.?\d*)%.*?growth.*?{re.escape(trend)}'
        ]
        
        indicators = []
        for pattern in growth_patterns:
            matches = re.findall(pattern, content, re.IGNORECASE)
            indicators.extend([f"{match}%" for match in matches])
        
        return list(set(indicators))
    
    def _extract_trend_context(self, content: str, trend: str) -> List[str]:
        """Extract contextual information about a trend"""
        
        # Find sentences containing the trend
        sentences = content.split('.')
        trend_contexts = []
        
        for sentence in sentences:
            if trend.lower() in sentence.lower():
                # Clean and add context
                cleaned_sentence = sentence.strip()
                if len(cleaned_sentence) > 20:  # Skip very short sentences
                    trend_contexts.append(cleaned_sentence)
        
        return trend_contexts
    
    def _assess_trend_sentiment(self, contexts: List[str]) -> str:
        """Assess sentiment around a trend (simplified)"""
        positive_words = ['growth', 'increase', 'opportunity', 'potential', 'strong', 'rising']
        negative_words = ['decline', 'decrease', 'challenge', 'weak', 'falling', 'struggle']
        
        positive_score = 0
        negative_score = 0
        
        for context in contexts:
            context_lower = context.lower()
            positive_score += sum(1 for word in positive_words if word in context_lower)
            negative_score += sum(1 for word in negative_words if word in context_lower)
        
        if positive_score > negative_score:
            return 'positive'
        elif negative_score > positive_score:
            return 'negative'
        else:
            return 'neutral'
    
    def _analyze_trend_growth(self, documents, trend: str) -> Dict:
        """Analyze growth patterns for a specific trend"""
        growth_data = {
            'total_mentions': 0,
            'documents_mentioning': 0,
            'growth_rates': [],
            'forecast_years': []
        }
        
        for doc in documents:
            content = doc.content.lower()
            
            if trend.lower() in content:
                growth_data['documents_mentioning'] += 1
                growth_data['total_mentions'] += content.count(trend.lower())
                
                # Extract growth rates and forecasts
                growth_indicators = self._find_growth_indicators(doc.content, trend)
                growth_data['growth_rates'].extend(growth_indicators)
                
                # Extract forecast years
                forecast_pattern = rf'{re.escape(trend)}.*?(\d{{4}})'
                forecast_matches = re.findall(forecast_pattern, doc.content, re.IGNORECASE)
                growth_data['forecast_years'].extend(forecast_matches)
        
        return growth_data
    
    def _identify_emerging_trends(self, trend_frequency: Dict) -> List[str]:
        """Identify emerging trends based on frequency"""
        # Simple heuristic: trends mentioned frequently but not overwhelmingly
        frequencies = list(trend_frequency.values())
        if not frequencies:
            return []
        
        avg_frequency = sum(frequencies) / len(frequencies)
        
        emerging = []
        for trend, freq in trend_frequency.items():
            if avg_frequency * 0.5 <= freq <= avg_frequency * 1.5:
                emerging.append(trend)
        
        return emerging

# Usage
trend_analyzer = MarketTrendAnalyzer("your-api-key")

research_docs = [
    "tech_trends_2024.pdf",
    "market_forecast_report.pdf",
    "industry_analysis_q3.pdf"
]

trend_analysis = await trend_analyzer.analyze_market_trends(research_docs)

print(f"Most mentioned trends: {dict(list(trend_analysis['trend_frequency'].items())[:5])}")
print(f"Emerging trends: {trend_analysis['emerging_trends']}")

Survey Data Analysis

Process market survey documents and extract insights:

class SurveyAnalyzer:
    def __init__(self, api_key: str):
        self.client = AsyncLexa(api_key=api_key)
    
    async def analyze_survey_data(self, survey_documents: List[str]) -> Dict:
        """Analyze market survey documents"""
        
        async with self.client:
            documents = await self.client.parse(survey_documents)
            
            survey_analysis = {
                'survey_metadata': [],
                'response_patterns': {},
                'demographic_insights': {},
                'satisfaction_metrics': {},
                'key_findings': []
            }
            
            for doc in documents:
                # Extract survey metadata
                metadata = self._extract_survey_metadata(doc)
                survey_analysis['survey_metadata'].append(metadata)
                
                # Analyze response patterns
                patterns = self._analyze_response_patterns(doc)
                survey_analysis['response_patterns'][doc.filename] = patterns
                
                # Extract satisfaction metrics
                satisfaction = self._extract_satisfaction_metrics(doc)
                survey_analysis['satisfaction_metrics'][doc.filename] = satisfaction
                
                # Extract key findings
                findings = self._extract_survey_findings(doc)
                survey_analysis['key_findings'].extend(findings)
            
            return survey_analysis
    
    def _extract_survey_metadata(self, document) -> Dict:
        """Extract survey metadata and basic info"""
        content = document.content
        
        # Extract sample size
        sample_patterns = [
            r'sample size.*?(\d+)',
            r'(\d+).*?respondents',
            r'n\s*=\s*(\d+)'
        ]
        
        sample_size = None
        for pattern in sample_patterns:
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                sample_size = int(match.group(1))
                break
        
        # Extract survey period
        date_patterns = [
            r'conducted.*?(\d{4})',
            r'survey period.*?(\d{1,2}/\d{4})',
            r'fieldwork.*?(\d{4})'
        ]
        
        survey_period = None
        for pattern in date_patterns:
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                survey_period = match.group(1)
                break
        
        return {
            'document': document.filename,
            'pages': document.total_pages,
            'sample_size': sample_size,
            'survey_period': survey_period,
            'tables_count': len(document.tables)
        }
    
    def _analyze_response_patterns(self, document) -> Dict:
        """Analyze survey response patterns"""
        content = document.content
        
        # Extract percentage responses
        percentage_pattern = r'(\d+\.?\d*)%'
        percentages = re.findall(percentage_pattern, content)
        percentages = [float(p) for p in percentages if float(p) <= 100]
        
        # Extract rating scales
        rating_patterns = [
            r'(\d+)/10',
            r'(\d+) out of 10',
            r'rated (\d+)'
        ]
        
        ratings = []
        for pattern in rating_patterns:
            matches = re.findall(pattern, content, re.IGNORECASE)
            ratings.extend([int(m) for m in matches])
        
        return {
            'percentage_responses': {
                'count': len(percentages),
                'average': sum(percentages) / len(percentages) if percentages else 0,
                'distribution': self._categorize_percentages(percentages)
            },
            'rating_responses': {
                'count': len(ratings),
                'average': sum(ratings) / len(ratings) if ratings else 0,
                'high_ratings': sum(1 for r in ratings if r >= 7)
            }
        }
    
    def _categorize_percentages(self, percentages: List[float]) -> Dict:
        """Categorize percentage responses"""
        categories = {
            'high (70-100%)': sum(1 for p in percentages if p >= 70),
            'medium (30-69%)': sum(1 for p in percentages if 30 <= p < 70),
            'low (0-29%)': sum(1 for p in percentages if p < 30)
        }
        return categories
    
    def _extract_satisfaction_metrics(self, document) -> Dict:
        """Extract customer satisfaction metrics"""
        content = document.content.lower()
        
        satisfaction_keywords = {
            'very_satisfied': ['very satisfied', 'extremely satisfied', 'highly satisfied'],
            'satisfied': ['satisfied', 'pleased', 'happy'],
            'neutral': ['neutral', 'neither', 'average'],
            'dissatisfied': ['dissatisfied', 'unhappy', 'disappointed'],
            'very_dissatisfied': ['very dissatisfied', 'extremely dissatisfied']
        }
        
        satisfaction_scores = {}
        for category, keywords in satisfaction_keywords.items():
            score = sum(content.count(keyword) for keyword in keywords)
            satisfaction_scores[category] = score
        
        return satisfaction_scores
    
    def _extract_survey_findings(self, document) -> List[str]:
        """Extract key survey findings"""
        finding_indicators = [
            'key finding', 'main finding', 'important finding',
            'conclusion', 'result shows', 'data reveals'
        ]
        
        findings = []
        for indicator in finding_indicators:
            matches = document.search_content(indicator)
            findings.extend(matches)
        
        return findings[:5]  # Top 5 findings per document

# Usage
survey_analyzer = SurveyAnalyzer("your-api-key")

survey_docs = [
    "customer_satisfaction_2024.pdf",
    "market_research_survey.pdf",
    "brand_perception_study.pdf"
]

survey_results = await survey_analyzer.analyze_survey_data(survey_docs)
print(f"Surveys analyzed: {len(survey_results['survey_metadata'])}")
print(f"Key findings: {len(survey_results['key_findings'])}")

Real-World Market Research Use Cases

Industry Report Analysis

async def analyze_industry_reports(report_paths: List[str]):
    """Comprehensive industry report analysis"""
    
    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(report_paths)
        
        industry_analysis = {
            'market_size_trends': {},
            'competitive_landscape': {},
            'technology_trends': {},
            'regulatory_changes': {},
            'investment_patterns': {}
        }
        
        for doc in documents:
            # Extract market sizing information
            market_data = extract_market_sizing(doc)
            industry_analysis['market_size_trends'][doc.filename] = market_data
            
            # Analyze competitive mentions
            competitive_data = analyze_competitive_mentions(doc)
            industry_analysis['competitive_landscape'][doc.filename] = competitive_data
            
            # Extract technology trends
            tech_trends = extract_technology_trends(doc)
            industry_analysis['technology_trends'][doc.filename] = tech_trends
        
        return industry_analysis

def extract_market_sizing(document):
    """Extract market sizing data from document"""
    content = document.content
    
    # Market size patterns
    size_patterns = [
        r'market.*?worth.*?\$?([\d,\.]+)\s*(billion|million|trillion)',
        r'industry.*?valued.*?\$?([\d,\.]+)\s*(billion|million|trillion)',
        r'revenue.*?\$?([\d,\.]+)\s*(billion|million|trillion)'
    ]
    
    market_sizes = []
    for pattern in size_patterns:
        matches = re.findall(pattern, content, re.IGNORECASE)
        for match in matches:
            market_sizes.append({
                'value': match[0],
                'unit': match[1],
                'full_value': f"${match[0]} {match[1]}"
            })
    
    return {
        'market_sizes': market_sizes,
        'count': len(market_sizes),
        'tables_with_data': len(document.tables)
    }
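
analyze_industry_reports above also calls analyze_competitive_mentions and extract_technology_trends, which are not defined in this guide. A minimal sketch of what they might look like, reusing the keyword-counting approach from the earlier examples (the keyword lists here are illustrative, not part of the Lexa API):

def analyze_competitive_mentions(document):
    """Sketch: count competitor-related language (illustrative keywords)"""
    keywords = ['competitor', 'competition', 'market leader', 'market share', 'rival']
    content = document.content.lower()
    return {keyword: content.count(keyword) for keyword in keywords}

def extract_technology_trends(document):
    """Sketch: flag technology trend mentions (illustrative list)"""
    tech_terms = [
        'artificial intelligence', 'machine learning', 'cloud computing',
        'automation', 'cybersecurity'
    ]
    content = document.content.lower()
    return [term for term in tech_terms if term in content]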

Competitive Intelligence Dashboard

class CompetitiveDashboard:
    def __init__(self, api_key: str):
        self.client = AsyncLexa(api_key=api_key)
        self.competitor_data = {}
    
    async def build_competitive_dashboard(self, competitor_docs: Dict[str, List[str]]):
        """Build comprehensive competitive intelligence dashboard"""
        
        dashboard_data = {
            'competitors': {},
            'market_positioning': {},
            'performance_metrics': {},
            'strategic_initiatives': {}
        }
        
        async with self.client:
            for competitor, doc_paths in competitor_docs.items():
                documents = await self.client.parse(doc_paths)
                
                competitor_profile = {
                    'documents_analyzed': len(documents),
                    'total_pages': sum(doc.total_pages for doc in documents),
                    'revenue_data': self._extract_financial_data(documents),
                    'product_portfolio': self._extract_product_data(documents),
                    'market_position': self._assess_market_position(documents),
                    'recent_initiatives': self._extract_strategic_initiatives(documents)
                }
                
                dashboard_data['competitors'][competitor] = competitor_profile
        
        return dashboard_data
    
    def _extract_financial_data(self, documents):
        """Extract financial performance data"""
        financial_data = {
            'revenue_mentions': 0,
            'growth_rates': [],
            'profit_margins': [],  # not populated in this simplified example
            'market_share': []
        }
        
        for doc in documents:
            content = doc.content
            
            # Count revenue mentions
            financial_data['revenue_mentions'] += len(doc.search_content("revenue|sales"))
            
            # Extract growth rates
            growth_pattern = r'growth.*?(\d+\.?\d*)%'
            growth_matches = re.findall(growth_pattern, content, re.IGNORECASE)
            financial_data['growth_rates'].extend(growth_matches)
            
            # Extract market share
            share_pattern = r'market share.*?(\d+\.?\d*)%'
            share_matches = re.findall(share_pattern, content, re.IGNORECASE)
            financial_data['market_share'].extend(share_matches)
        
        return financial_data
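    
    # The three helper methods below are referenced in build_competitive_dashboard
    # but were not shown above. These are minimal illustrative sketches, not part
    # of the Lexa API; adapt the keywords to your own competitive landscape.
    def _extract_product_data(self, documents):
        """Sketch: collect sentences mentioning products (simplified)"""
        product_sentences = []
        for doc in documents:
            for sentence in doc.content.split('.'):
                if 'product' in sentence.lower():
                    product_sentences.append(sentence.strip())
        return product_sentences[:10]
    
    def _assess_market_position(self, documents):
        """Sketch: crude positioning signal from keyword counts"""
        content = ' '.join(doc.content.lower() for doc in documents)
        leader_score = content.count('market leader') + content.count('leading')
        challenger_score = content.count('challenger') + content.count('emerging')
        return 'leader' if leader_score > challenger_score else 'challenger'
    
    def _extract_strategic_initiatives(self, documents):
        """Sketch: pull sentences around initiative-related keywords"""
        keywords = ['acquisition', 'partnership', 'launch', 'expansion', 'investment']
        initiatives = []
        for doc in documents:
            for sentence in doc.content.split('.'):
                if any(keyword in sentence.lower() for keyword in keywords):
                    initiatives.append(sentence.strip())
        return initiatives[:10]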

# Usage
dashboard = CompetitiveDashboard("your-api-key")

competitor_documents = {
    'Company_A': ['company_a_annual_report.pdf', 'company_a_strategy.pdf'],
    'Company_B': ['company_b_earnings.pdf', 'company_b_presentation.pdf'],
    'Company_C': ['company_c_research.pdf', 'company_c_analysis.pdf']
}

competitive_dashboard = await dashboard.build_competitive_dashboard(competitor_documents)

Performance for Market Research

Report Processing

15 seconds for a 200+ page market research report

Data Extraction

95% accuracy in trend and metric extraction

Multi-Source Analysis

100+ reports/hour for competitive intelligence

Export and Visualization

Export to Business Intelligence Tools

import pandas as pd
from cerevox import AsyncLexa
from datetime import datetime
from typing import List

async def export_for_bi_tools(research_documents: List[str], 
                             export_format: str = 'excel'):
    """Export market research data for BI tools"""
    
    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(research_documents)
        
        # Structure data for BI consumption
        market_data = []
        
        for doc in documents:
            # Extract structured insights
            insights = {
                'document_name': doc.filename,
                'analysis_date': datetime.now().strftime('%Y-%m-%d'),
                'pages': doc.total_pages,
                'data_tables': len(doc.tables),
                'market_mentions': len(doc.search_content("market")),
                'growth_mentions': len(doc.search_content("growth")),
                'competitor_mentions': len(doc.search_content("competitor|competition")),
                'trend_mentions': len(doc.search_content("trend|trending"))
            }
            
            market_data.append(insights)
        
        # Create DataFrame
        df = pd.DataFrame(market_data)
        
        # Export in requested format
        if export_format == 'excel':
            df.to_excel('market_research_analysis.xlsx', index=False)  # requires openpyxl
        elif export_format == 'csv':
            df.to_csv('market_research_analysis.csv', index=False)
        elif export_format == 'json':
            df.to_json('market_research_analysis.json', orient='records')
        
        return df

# Usage
df = await export_for_bi_tools([
    "q3_market_report.pdf",
    "competitor_analysis.pdf",
    "industry_trends.pdf"
], export_format='excel')

print(f"Exported {len(df)} reports to Excel for BI analysis")

Next Steps
