AI Model Training & Data Preparation with Lexa

Transform unstructured documents into high-quality training data for machine learning models. Lexa’s precision parsing creates clean, structured datasets from complex documents.

Why Lexa for AI/ML Workflows?

Clean Data Extraction

Extract structured data with 99%+ accuracy for training

Format Standardization

Normalize data across multiple document formats

Batch Processing

Process thousands of documents for large datasets

Feature Engineering

Extract features ready for ML pipelines

ML Data Preparation Use Cases

  • Text Classification Models (document categorization, sentiment analysis)
  • Named Entity Recognition (extract entities from parsed content)
  • Question-Answering Systems (create Q&A datasets from documents)
  • Summarization Models (extract summaries and key points)
  • Information Extraction (structured data from unstructured text)
  • OCR Post-Processing (clean and structure OCR output)
  • Document Similarity (create embeddings from parsed content; a minimal sketch follows this list)
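
As a concrete illustration of the document-similarity item, here is a minimal sketch that reuses the client.parse and get_text_chunks calls introduced in the Quick Start below. TF-IDF stands in for whatever embedding model you actually use, and the filenames are placeholders:

from cerevox import Lexa
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

client = Lexa(api_key="your-api-key")

# Parse and chunk two documents, then compare their chunks with TF-IDF vectors
documents = client.parse(["report_a.pdf", "report_b.pdf"])
chunks = [chunk for doc in documents for chunk in doc.get_text_chunks(target_size=512)]

vectorizer = TfidfVectorizer(stop_words='english')
embeddings = vectorizer.fit_transform(chunks)

# Pairwise chunk similarity (swap in a sentence-embedding model for production use)
similarity_matrix = cosine_similarity(embeddings)
print(similarity_matrix.shape)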

Quick Start: ML Dataset Creation

Transform documents into ML-ready datasets:

from cerevox import Lexa
import pandas as pd
from sklearn.model_selection import train_test_split

# Initialize client
client = Lexa(api_key="your-api-key")

# Process labeled documents for classification
def create_classification_dataset(document_categories):
    """Create text classification dataset"""
    
    dataset = []
    
    for category, doc_paths in document_categories.items():
        documents = client.parse(doc_paths)
        
        for doc in documents:
            # Extract clean text chunks
            chunks = doc.get_text_chunks(target_size=512)
            
            for chunk in chunks:
                # Clean and prepare text
                cleaned_text = clean_text_for_ml(chunk)
                
                dataset.append({
                    'text': cleaned_text,
                    'label': category,
                    'source': doc.filename,
                    'chunk_length': len(cleaned_text),
                    'word_count': len(cleaned_text.split())
                })
    
    # Convert to DataFrame
    df = pd.DataFrame(dataset)
    
    # Split into train/val/test
    train_df, temp_df = train_test_split(df, test_size=0.3, stratify=df['label'], random_state=42)
    val_df, test_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df['label'], random_state=42)
    
    return {
        'train': train_df,
        'validation': val_df,
        'test': test_df,
        'stats': df.groupby('label').size().to_dict()
    }

def clean_text_for_ml(text):
    """Clean text for ML training"""
    import re
    
    # Remove extra whitespace
    text = re.sub(r'\s+', ' ', text)
    
    # Remove special characters but keep punctuation
    text = re.sub(r'[^\w\s\.,!?;:-]', '', text)
    
    # Strip and return
    return text.strip()

# Usage
doc_categories = {
    'financial': ['financial_report1.pdf', 'financial_report2.pdf'],
    'legal': ['contract1.pdf', 'legal_doc1.pdf'],
    'technical': ['manual1.pdf', 'spec1.pdf']
}

dataset = create_classification_dataset(doc_categories)
print(f"Training samples: {len(dataset['train'])}")
print(f"Class distribution: {dataset['stats']}")

Advanced ML Data Workflows

Feature Engineering Pipeline

Extract ML-ready features from complex documents:

from cerevox import AsyncLexa
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from typing import List, Dict

class DocumentFeatureExtractor:
    def __init__(self, api_key: str):
        self.client = AsyncLexa(api_key=api_key)
        self.vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')
    
    async def extract_comprehensive_features(self, documents: List[str]) -> pd.DataFrame:
        """Extract comprehensive features for ML models"""
        
        async with self.client:
            docs = await self.client.parse(documents)
            
            features = []
            all_texts = []
            
            for doc in docs:
                # Document-level features
                doc_features = {
                    'filename': doc.filename,
                    'total_pages': doc.total_pages,
                    'total_elements': doc.total_elements,
                    'table_count': len(doc.tables),
                    'image_count': len(doc.images) if hasattr(doc, 'images') else 0,
                    'file_size_kb': getattr(doc, 'file_size', 0) / 1024,
                }
                
                # Content-based features
                full_text = doc.content or ""
                all_texts.append(full_text)
                
                # Text statistics
                doc_features.update({
                    'char_count': len(full_text),
                    'word_count': len(full_text.split()),
                    'sentence_count': len(full_text.split('.')),
                    'paragraph_count': len(full_text.split('\n\n')),
                    'avg_word_length': np.mean([len(word) for word in full_text.split()]) if full_text.split() else 0.0,
                    'punctuation_ratio': sum(1 for c in full_text if c in '.,!?;:') / max(len(full_text), 1),
                    'uppercase_ratio': sum(1 for c in full_text if c.isupper()) / max(len(full_text), 1),
                    'digit_ratio': sum(1 for c in full_text if c.isdigit()) / max(len(full_text), 1)
                })
                
                # Complexity features
                doc_features.update({
                    'lexical_diversity': self._calculate_lexical_diversity(full_text),
                    'readability_score': self._calculate_readability(full_text),
                    'formality_score': self._calculate_formality(full_text)
                })
                
                # Domain-specific features
                doc_features.update(self._extract_domain_features(full_text))
                
                features.append(doc_features)
            
            # Create DataFrame
            df = pd.DataFrame(features)
            
            # Add TF-IDF features
            tfidf_matrix = self.vectorizer.fit_transform(all_texts)
            tfidf_df = pd.DataFrame(
                tfidf_matrix.toarray(),
                columns=[f'tfidf_{i}' for i in range(tfidf_matrix.shape[1])]
            )
            
            # Combine all features
            final_df = pd.concat([df, tfidf_df], axis=1)
            
            return final_df
    
    def _calculate_lexical_diversity(self, text: str) -> float:
        """Calculate lexical diversity (unique words / total words)"""
        words = text.lower().split()
        if not words:
            return 0.0
        return len(set(words)) / len(words)
    
    def _calculate_readability(self, text: str) -> float:
        """Calculate simplified readability score"""
        sentences = text.split('.')
        words = text.split()
        
        if not sentences or not words:
            return 0.0
        
        avg_sentence_length = len(words) / len(sentences)
        avg_word_length = np.mean([len(word) for word in words])
        
        # Simplified readability formula
        return max(0, 100 - (avg_sentence_length * 0.5) - (avg_word_length * 2))
    
    def _calculate_formality(self, text: str) -> float:
        """Calculate formality score based on linguistic features"""
        formal_indicators = [
            'therefore', 'furthermore', 'moreover', 'consequently',
            'accordingly', 'nevertheless', 'notwithstanding'
        ]
        
        informal_indicators = [
            'gonna', 'wanna', 'kinda', 'sorta', 'yeah', 'okay'
        ]
        
        text_lower = text.lower()
        formal_count = sum(1 for indicator in formal_indicators if indicator in text_lower)
        informal_count = sum(1 for indicator in informal_indicators if indicator in text_lower)
        
        total_indicators = formal_count + informal_count
        if total_indicators == 0:
            return 0.5  # Neutral
        
        return formal_count / total_indicators
    
    def _extract_domain_features(self, text: str) -> Dict:
        """Extract domain-specific features"""
        text_lower = text.lower()
        
        return {
            'financial_terms': sum(1 for term in ['revenue', 'profit', 'loss', 'investment', 'roi'] 
                                 if term in text_lower),
            'legal_terms': sum(1 for term in ['contract', 'agreement', 'clause', 'liability', 'party'] 
                             if term in text_lower),
            'technical_terms': sum(1 for term in ['system', 'process', 'method', 'algorithm', 'data'] 
                                 if term in text_lower),
            'medical_terms': sum(1 for term in ['patient', 'treatment', 'diagnosis', 'medical', 'health'] 
                               if term in text_lower)
        }

# Usage
extractor = DocumentFeatureExtractor("your-api-key")

# Extract features from document collection
document_paths = [
    "financial_report.pdf",
    "legal_contract.pdf", 
    "technical_manual.pdf"
]

feature_df = await extractor.extract_comprehensive_features(document_paths)

print(f"Feature matrix shape: {feature_df.shape}")
print(f"Available features: {list(feature_df.columns)}")

# Use features for ML model training
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Assuming you have labels
X = feature_df.drop(['filename'], axis=1).fillna(0)
y = ['financial', 'legal', 'technical']  # Your labels

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
model = RandomForestClassifier()
model.fit(X_train, y_train)
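
After fitting, a short sketch for checking which extracted features carried the signal and saving the feature matrix so parsing does not have to be repeated (the output path is a placeholder):

# Rank features by importance and inspect the top 20
importances = pd.Series(model.feature_importances_, index=X.columns)
print(importances.sort_values(ascending=False).head(20))

# Persist the full feature matrix for reuse in later experiments
feature_df.to_csv("document_features.csv", index=False)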

Training Data Quality Pipeline

Ensure high-quality training data:

class TrainingDataQualityPipeline:
    def __init__(self, api_key: str):
        self.client = AsyncLexa(api_key=api_key)
    
    async def validate_training_data(self, document_paths: List[str], 
                                   labels: List[str] = None) -> Dict:
        """Validate and analyze training data quality"""
        
        async with self.client:
            documents = await self.client.parse(document_paths)
            
            quality_report = {
                'total_documents': len(documents),
                'parsing_success_rate': len(documents) / len(document_paths),
                'content_analysis': {},
                'recommendations': []
            }
            
            # Analyze content quality
            text_lengths = []
            error_documents = []
            
            for i, doc in enumerate(documents):
                if not doc.content or len(doc.content.strip()) < 50:
                    error_documents.append({
                        'filename': doc.filename,
                        'issue': 'insufficient_content',
                        'content_length': len(doc.content) if doc.content else 0
                    })
                else:
                    text_lengths.append(len(doc.content))
            
            # Content statistics
            if text_lengths:
                quality_report['content_analysis'] = {
                    'avg_content_length': np.mean(text_lengths),
                    'min_content_length': min(text_lengths),
                    'max_content_length': max(text_lengths),
                    'std_content_length': np.std(text_lengths),
                    'documents_with_errors': len(error_documents)
                }
            
            # Generate recommendations
            if error_documents:
                quality_report['recommendations'].append({
                    'type': 'content_quality',
                    'message': f'{len(error_documents)} documents have insufficient content',
                    'action': 'Review and exclude documents with less than 50 characters'
                })
            
            if text_lengths:
                length_std = np.std(text_lengths)
                length_mean = np.mean(text_lengths)
                
                if length_std > length_mean * 0.5:
                    quality_report['recommendations'].append({
                        'type': 'length_variance',
                        'message': 'High variance in document lengths detected',
                        'action': 'Consider chunking strategy or separate models for different document types'
                    })
            
            # Label balance analysis (if labels provided)
            if labels:
                label_counts = pd.Series(labels).value_counts()
                min_class_size = label_counts.min()
                max_class_size = label_counts.max()
                
                if max_class_size > min_class_size * 3:
                    quality_report['recommendations'].append({
                        'type': 'class_imbalance',
                        'message': 'Significant class imbalance detected',
                        'action': 'Consider data augmentation or stratified sampling',
                        'label_distribution': label_counts.to_dict()
                    })
            
            return quality_report
    
    def suggest_chunking_strategy(self, documents, target_model: str = 'bert') -> Dict:
        """Suggest optimal chunking strategy for ML model"""
        
        # Model-specific recommendations
        chunking_recommendations = {
            'bert': {'target_size': 512, 'overlap': 50, 'tolerance': 0.1},
            'roberta': {'target_size': 512, 'overlap': 50, 'tolerance': 0.1},
            'longformer': {'target_size': 2048, 'overlap': 100, 'tolerance': 0.15},
            'bigbird': {'target_size': 2048, 'overlap': 100, 'tolerance': 0.15},
            'gpt': {'target_size': 1024, 'overlap': 0, 'tolerance': 0.1}
        }
        
        base_config = chunking_recommendations.get(target_model.lower(), {
            'target_size': 512, 'overlap': 50, 'tolerance': 0.1
        })
        
        # Analyze document characteristics
        doc_lengths = [len(doc.content) for doc in documents if doc.content]
        avg_length = np.mean(doc_lengths) if doc_lengths else 0
        
        # Adjust recommendations based on document characteristics
        if avg_length < 1000:
            # Short documents - use smaller chunks
            base_config['target_size'] = min(base_config['target_size'], 256)
        elif avg_length > 10000:
            # Long documents - consider larger chunks if model supports
            if target_model.lower() in ['longformer', 'bigbird']:
                base_config['target_size'] = 4096
        
        return {
            'recommended_config': base_config,
            'document_stats': {
                'avg_length': avg_length,
                'total_documents': len(documents),
                'min_length': min(doc_lengths) if doc_lengths else 0,
                'max_length': max(doc_lengths) if doc_lengths else 0
            },
            'rationale': f"Optimized for {target_model} with average document length {avg_length:.0f} characters"
        }

# Usage
quality_pipeline = TrainingDataQualityPipeline("your-api-key")

# Validate training data
document_paths = ["doc1.pdf", "doc2.pdf", "doc3.pdf"]
labels = ["class_a", "class_b", "class_a"]

quality_report = await quality_pipeline.validate_training_data(document_paths, labels)
print("Quality Report:", quality_report)

# Get chunking recommendations (parse inside the async context manager, as above)
async with AsyncLexa(api_key="your-api-key") as client:
    docs = await client.parse(document_paths)

chunking_strategy = quality_pipeline.suggest_chunking_strategy(docs, target_model='bert')
print("Chunking Strategy:", chunking_strategy)

Real-World ML Use Cases

Document Classification Model Training

async def train_document_classifier(training_docs: Dict[str, List[str]]):
    """Complete pipeline for training document classifier"""
    
    # 1. Data preparation (MLDataPreparationPipeline is assumed to be defined elsewhere and to
    #    return a DataFrame with 'text', 'label', and 'source_filename' columns)
    pipeline = MLDataPreparationPipeline("your-api-key")
    df = await pipeline.prepare_training_data(training_docs, task_type='classification')
    
    # 2. Feature extraction
    extractor = DocumentFeatureExtractor("your-api-key")
    all_docs = [doc for docs in training_docs.values() for doc in docs]
    features_df = await extractor.extract_comprehensive_features(all_docs)
    
    # 3. Combine data and features
    combined_df = df.merge(features_df, left_on='source_filename', right_on='filename')
    
    # 4. Train model
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import classification_report
    
    feature_columns = [col for col in combined_df.columns 
                      if col not in ['text', 'label', 'source_filename', 'filename']]
    
    X = combined_df[feature_columns].fillna(0)
    y = combined_df['label']
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y)
    
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)
    
    # 5. Evaluate
    y_pred = model.predict(X_test)
    report = classification_report(y_test, y_pred)
    
    return {
        'model': model,
        'test_accuracy': model.score(X_test, y_test),
        'classification_report': report,
        'feature_importance': dict(zip(feature_columns, model.feature_importances_))
    }

Named Entity Recognition Dataset

async def create_ner_dataset(document_paths: List[str], 
                           entity_types: List[str] = None):
    """Create NER training dataset from documents"""
    
    if entity_types is None:
        entity_types = ['PERSON', 'ORG', 'GPE', 'DATE', 'MONEY']
    
    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(document_paths)
        
        ner_dataset = []
        
        for doc in documents:
            # Get sentences for NER training
            sentences = doc.content.split('.')
            
            for sentence in sentences:
                sentence = sentence.strip()
                if len(sentence) < 10:  # Skip very short sentences
                    continue
                
                # Extract entities (simplified - in production use spaCy/NER model)
                entities = extract_entities_for_ner(sentence, entity_types)
                
                if entities:  # Only include sentences with entities
                    ner_dataset.append({
                        'text': sentence,
                        'entities': entities,
                        'source': doc.filename,
                        'length': len(sentence)
                    })
        
        return ner_dataset

def extract_entities_for_ner(text: str, entity_types: List[str]) -> List[Dict]:
    """Extract entities in NER format (simplified)"""
    import re
    
    entities = []
    
    # Date patterns
    date_pattern = r'\b\d{1,2}/\d{1,2}/\d{4}\b|\b\d{4}-\d{2}-\d{2}\b'
    for match in re.finditer(date_pattern, text):
        entities.append({
            'start': match.start(),
            'end': match.end(),
            'label': 'DATE',
            'text': match.group()
        })
    
    # Money patterns
    money_pattern = r'\$[\d,]+\.?\d*'
    for match in re.finditer(money_pattern, text):
        entities.append({
            'start': match.start(),
            'end': match.end(),
            'label': 'MONEY',
            'text': match.group()
        })
    
    # Simple person names (Title + Name pattern)
    person_pattern = r'\b(?:Mr|Ms|Mrs|Dr)\.?\s+[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*'
    for match in re.finditer(person_pattern, text):
        entities.append({
            'start': match.start(),
            'end': match.end(),
            'label': 'PERSON',
            'text': match.group()
        })
    
    return entities
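
To train a token-classification model on these span annotations, they typically need to be converted to token-level BIO tags first. A minimal, library-agnostic sketch using whitespace tokenization (swap in your model's tokenizer for real training):

def to_bio_tags(example):
    """Convert a {'text', 'entities'} record into whitespace tokens with BIO tags"""
    text = example['text']
    tokens, tags = [], []
    cursor = 0
    
    for token in text.split():
        # Locate the token's character span, scanning left to right
        start = text.index(token, cursor)
        end = start + len(token)
        cursor = end
        
        tag = 'O'
        for entity in example['entities']:
            if start >= entity['start'] and end <= entity['end']:
                prefix = 'B-' if start == entity['start'] else 'I-'
                tag = prefix + entity['label']
                break
        
        tokens.append(token)
        tags.append(tag)
    
    return {'tokens': tokens, 'tags': tags}

# Usage: bio_example = to_bio_tags(ner_dataset[0])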

Performance for ML Workflows

Data Processing

500+ docs/hour for training data preparation

Feature Extraction

50+ features extracted per document automatically

Quality Assurance

99.5% clean data rate for model training

Integration with ML Frameworks

Hugging Face Integration

from transformers import AutoTokenizer, AutoModelForSequenceClassification
from cerevox import AsyncLexa

async def prepare_for_huggingface(documents, model_name="bert-base-uncased"):
    """Prepare Lexa output for Hugging Face models"""
    
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    
    async with AsyncLexa(api_key="your-api-key") as client:
        docs = await client.parse(documents)
        
        # Get chunks sized for the model
        max_length = tokenizer.model_max_length - 2  # Account for special tokens
        
        chunks = docs.get_all_text_chunks(
            target_size=max_length * 4,  # Approximate character to token ratio
            tolerance=0.1
        )
        
        # Tokenize and prepare
        tokenized_data = []
        for chunk in chunks:
            tokens = tokenizer(
                chunk,
                truncation=True,
                padding='max_length',
                max_length=max_length,
                return_tensors='pt'
            )
            
            tokenized_data.append({
                'input_ids': tokens['input_ids'],
                'attention_mask': tokens['attention_mask'],
                'text': chunk
            })
        
        return tokenized_data
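
The tokenized chunks can then be wrapped in a standard PyTorch Dataset for use with a DataLoader or the Hugging Face Trainer. A minimal sketch, assuming any labels are supplied as a separate list aligned with the chunks:

import torch
from torch.utils.data import Dataset

class LexaChunkDataset(Dataset):
    """Wraps Lexa-derived tokenized chunks for PyTorch training"""
    
    def __init__(self, tokenized_data, labels=None):
        self.tokenized_data = tokenized_data
        self.labels = labels
    
    def __len__(self):
        return len(self.tokenized_data)
    
    def __getitem__(self, idx):
        item = {
            # return_tensors='pt' above yields shape (1, max_length), so drop the batch dimension
            'input_ids': self.tokenized_data[idx]['input_ids'].squeeze(0),
            'attention_mask': self.tokenized_data[idx]['attention_mask'].squeeze(0),
        }
        if self.labels is not None:
            item['labels'] = torch.tensor(self.labels[idx])
        return item

# Usage: train_dataset = LexaChunkDataset(tokenized_data, labels=my_labels)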

Next Steps
