AI Model Training & Data Preparation with Lexa
Transform unstructured documents into high-quality training data for machine learning models. Lexa's precision parsing creates clean, structured datasets from complex documents.
Why Lexa for AI/ML Workflows?
- Clean Data Extraction: Extract structured data with 99%+ accuracy for training
- Format Standardization: Normalize data across multiple document formats
- Batch Processing: Process thousands of documents for large datasets (see the sketch after this list)
- Feature Engineering: Extract features ready for ML pipelines
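Batch processing here simply means passing a whole list of file paths to a single parse call. Below is a minimal sketch using the AsyncLexa client that appears later on this page; the file paths are placeholders.

```python
import asyncio
from cerevox import AsyncLexa

async def parse_corpus(paths):
    """Parse a batch of documents in one call and collect their text."""
    async with AsyncLexa(api_key="your-api-key") as client:
        docs = await client.parse(paths)  # accepts a list of file paths
        return {doc.filename: doc.content for doc in docs}

# Placeholder paths; in practice this could be thousands of files
corpus = asyncio.run(parse_corpus(["report1.pdf", "report2.pdf", "manual1.pdf"]))
print(f"Parsed {len(corpus)} documents")
```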
ML Data Preparation Use Cases
- Text Classification Models (document categorization, sentiment analysis)
- Named Entity Recognition (extract entities from parsed content)
- Question-Answering Systems (create Q&A datasets from documents)
- Summarization Models (extract summaries and key points)
- Information Extraction (structured data from unstructured text)
- OCR Post-Processing (clean and structure OCR output)
- Document Similarity (create embeddings from parsed content; see the sketch after this list)
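As one concrete example of the last item, the sketch below builds chunk embeddings from parsed content. It assumes the third-party sentence-transformers and scikit-learn packages, which are not part of Lexa, and uses placeholder file paths.

```python
from cerevox import Lexa
from sentence_transformers import SentenceTransformer  # assumed third-party dependency
from sklearn.metrics.pairwise import cosine_similarity

client = Lexa(api_key="your-api-key")

# Parse documents and collect their text chunks (placeholder file paths)
docs = client.parse(["report_a.pdf", "report_b.pdf"])
chunks = [chunk for doc in docs for chunk in doc.get_text_chunks(target_size=512)]

# Embed chunks and compute a pairwise similarity matrix
model = SentenceTransformer("all-MiniLM-L6-v2")
embeddings = model.encode(chunks)
similarity = cosine_similarity(embeddings)

print(f"Similarity matrix shape: {similarity.shape}")
```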
Quick Start: ML Dataset Creation
Transform documents into ML-ready datasets:
```python
from cerevox import Lexa
import pandas as pd
from sklearn.model_selection import train_test_split

# Initialize client
client = Lexa(api_key="your-api-key")

# Process labeled documents for classification
def create_classification_dataset(document_categories):
    """Create a text classification dataset"""
    dataset = []

    for category, doc_paths in document_categories.items():
        documents = client.parse(doc_paths)

        for doc in documents:
            # Extract clean text chunks
            chunks = doc.get_text_chunks(target_size=512)

            for chunk in chunks:
                # Clean and prepare text
                cleaned_text = clean_text_for_ml(chunk)

                dataset.append({
                    'text': cleaned_text,
                    'label': category,
                    'source': doc.filename,
                    'chunk_length': len(cleaned_text),
                    'word_count': len(cleaned_text.split())
                })

    # Convert to DataFrame
    df = pd.DataFrame(dataset)

    # Split into train/val/test
    train_df, temp_df = train_test_split(df, test_size=0.3, stratify=df['label'])
    val_df, test_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df['label'])

    return {
        'train': train_df,
        'validation': val_df,
        'test': test_df,
        'stats': df.groupby('label').size().to_dict()
    }

def clean_text_for_ml(text):
    """Clean text for ML training"""
    import re

    # Collapse extra whitespace
    text = re.sub(r'\s+', ' ', text)

    # Remove special characters but keep basic punctuation
    text = re.sub(r'[^\w\s\.,!?;:-]', '', text)

    return text.strip()

# Usage
doc_categories = {
    'financial': ['financial_report1.pdf', 'financial_report2.pdf'],
    'legal': ['contract1.pdf', 'legal_doc1.pdf'],
    'technical': ['manual1.pdf', 'spec1.pdf']
}

dataset = create_classification_dataset(doc_categories)
print(f"Training samples: {len(dataset['train'])}")
print(f"Class distribution: {dataset['stats']}")
```
Advanced ML Data Workflows
Feature Engineering Pipeline
Extract ML-ready features from complex documents:
```python
from cerevox import AsyncLexa
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from typing import List, Dict

class DocumentFeatureExtractor:
    def __init__(self, api_key: str):
        self.client = AsyncLexa(api_key=api_key)
        self.vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')

    async def extract_comprehensive_features(self, documents: List[str]) -> pd.DataFrame:
        """Extract comprehensive features for ML models"""
        async with self.client:
            docs = await self.client.parse(documents)

            features = []
            all_texts = []

            for doc in docs:
                # Document-level features
                doc_features = {
                    'filename': doc.filename,
                    'total_pages': doc.total_pages,
                    'total_elements': doc.total_elements,
                    'table_count': len(doc.tables),
                    'image_count': len(doc.images) if hasattr(doc, 'images') else 0,
                    'file_size_kb': getattr(doc, 'file_size', 0) / 1024,
                }

                # Content-based features
                full_text = doc.content
                all_texts.append(full_text)

                # Text statistics
                doc_features.update({
                    'char_count': len(full_text),
                    'word_count': len(full_text.split()),
                    'sentence_count': len(full_text.split('.')),
                    'paragraph_count': len(full_text.split('\n\n')),
                    'avg_word_length': np.mean([len(word) for word in full_text.split()]),
                    'punctuation_ratio': sum(1 for c in full_text if c in '.,!?;:') / len(full_text),
                    'uppercase_ratio': sum(1 for c in full_text if c.isupper()) / len(full_text),
                    'digit_ratio': sum(1 for c in full_text if c.isdigit()) / len(full_text)
                })

                # Complexity features
                doc_features.update({
                    'lexical_diversity': self._calculate_lexical_diversity(full_text),
                    'readability_score': self._calculate_readability(full_text),
                    'formality_score': self._calculate_formality(full_text)
                })

                # Domain-specific features
                doc_features.update(self._extract_domain_features(full_text))

                features.append(doc_features)

            # Create DataFrame
            df = pd.DataFrame(features)

            # Add TF-IDF features
            tfidf_matrix = self.vectorizer.fit_transform(all_texts)
            tfidf_df = pd.DataFrame(
                tfidf_matrix.toarray(),
                columns=[f'tfidf_{i}' for i in range(tfidf_matrix.shape[1])]
            )

            # Combine all features
            final_df = pd.concat([df, tfidf_df], axis=1)

            return final_df

    def _calculate_lexical_diversity(self, text: str) -> float:
        """Calculate lexical diversity (unique words / total words)"""
        words = text.lower().split()
        if not words:
            return 0.0
        return len(set(words)) / len(words)

    def _calculate_readability(self, text: str) -> float:
        """Calculate a simplified readability score"""
        sentences = text.split('.')
        words = text.split()

        if not sentences or not words:
            return 0.0

        avg_sentence_length = len(words) / len(sentences)
        avg_word_length = np.mean([len(word) for word in words])

        # Simplified readability formula
        return max(0, 100 - (avg_sentence_length * 0.5) - (avg_word_length * 2))

    def _calculate_formality(self, text: str) -> float:
        """Calculate a formality score based on linguistic markers"""
        formal_indicators = [
            'therefore', 'furthermore', 'moreover', 'consequently',
            'accordingly', 'nevertheless', 'notwithstanding'
        ]
        informal_indicators = [
            'gonna', 'wanna', 'kinda', 'sorta', 'yeah', 'okay'
        ]

        text_lower = text.lower()
        formal_count = sum(1 for indicator in formal_indicators if indicator in text_lower)
        informal_count = sum(1 for indicator in informal_indicators if indicator in text_lower)

        total_indicators = formal_count + informal_count
        if total_indicators == 0:
            return 0.5  # Neutral

        return formal_count / total_indicators

    def _extract_domain_features(self, text: str) -> Dict:
        """Extract domain-specific keyword counts"""
        text_lower = text.lower()

        return {
            'financial_terms': sum(1 for term in ['revenue', 'profit', 'loss', 'investment', 'roi']
                                   if term in text_lower),
            'legal_terms': sum(1 for term in ['contract', 'agreement', 'clause', 'liability', 'party']
                               if term in text_lower),
            'technical_terms': sum(1 for term in ['system', 'process', 'method', 'algorithm', 'data']
                                   if term in text_lower),
            'medical_terms': sum(1 for term in ['patient', 'treatment', 'diagnosis', 'medical', 'health']
                                 if term in text_lower)
        }

# Usage
extractor = DocumentFeatureExtractor("your-api-key")

# Extract features from a document collection
document_paths = [
    "financial_report.pdf",
    "legal_contract.pdf",
    "technical_manual.pdf"
]

feature_df = await extractor.extract_comprehensive_features(document_paths)
print(f"Feature matrix shape: {feature_df.shape}")
print(f"Available features: {list(feature_df.columns)}")

# Use features for ML model training
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Assuming you have labels
X = feature_df.drop(['filename'], axis=1).fillna(0)
y = ['financial', 'legal', 'technical']  # Your labels

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
model = RandomForestClassifier()
model.fit(X_train, y_train)
```
Training Data Quality Pipeline
Ensure high-quality training data:
```python
class TrainingDataQualityPipeline:
    def __init__(self, api_key: str):
        self.client = AsyncLexa(api_key=api_key)

    async def validate_training_data(self, document_paths: List[str],
                                     labels: List[str] = None) -> Dict:
        """Validate and analyze training data quality"""
        async with self.client:
            documents = await self.client.parse(document_paths)

            quality_report = {
                'total_documents': len(documents),
                'parsing_success_rate': len(documents) / len(document_paths),
                'content_analysis': {},
                'recommendations': []
            }

            # Analyze content quality
            text_lengths = []
            error_documents = []

            for doc in documents:
                if not doc.content or len(doc.content.strip()) < 50:
                    error_documents.append({
                        'filename': doc.filename,
                        'issue': 'insufficient_content',
                        'content_length': len(doc.content) if doc.content else 0
                    })
                else:
                    text_lengths.append(len(doc.content))

            # Content statistics
            if text_lengths:
                quality_report['content_analysis'] = {
                    'avg_content_length': np.mean(text_lengths),
                    'min_content_length': min(text_lengths),
                    'max_content_length': max(text_lengths),
                    'std_content_length': np.std(text_lengths),
                    'documents_with_errors': len(error_documents)
                }

            # Generate recommendations
            if error_documents:
                quality_report['recommendations'].append({
                    'type': 'content_quality',
                    'message': f'{len(error_documents)} documents have insufficient content',
                    'action': 'Review and exclude documents with less than 50 characters'
                })

            if text_lengths:
                length_std = np.std(text_lengths)
                length_mean = np.mean(text_lengths)

                if length_std > length_mean * 0.5:
                    quality_report['recommendations'].append({
                        'type': 'length_variance',
                        'message': 'High variance in document lengths detected',
                        'action': 'Consider chunking strategy or separate models for different document types'
                    })

            # Label balance analysis (if labels provided)
            if labels:
                label_counts = pd.Series(labels).value_counts()
                min_class_size = label_counts.min()
                max_class_size = label_counts.max()

                if max_class_size > min_class_size * 3:
                    quality_report['recommendations'].append({
                        'type': 'class_imbalance',
                        'message': 'Significant class imbalance detected',
                        'action': 'Consider data augmentation or stratified sampling',
                        'label_distribution': label_counts.to_dict()
                    })

            return quality_report

    def suggest_chunking_strategy(self, documents, target_model: str = 'bert') -> Dict:
        """Suggest an optimal chunking strategy for the target ML model"""
        # Model-specific recommendations
        chunking_recommendations = {
            'bert': {'target_size': 512, 'overlap': 50, 'tolerance': 0.1},
            'roberta': {'target_size': 512, 'overlap': 50, 'tolerance': 0.1},
            'longformer': {'target_size': 2048, 'overlap': 100, 'tolerance': 0.15},
            'bigbird': {'target_size': 2048, 'overlap': 100, 'tolerance': 0.15},
            'gpt': {'target_size': 1024, 'overlap': 0, 'tolerance': 0.1}
        }

        base_config = chunking_recommendations.get(target_model.lower(), {
            'target_size': 512, 'overlap': 50, 'tolerance': 0.1
        })

        # Analyze document characteristics
        doc_lengths = [len(doc.content) for doc in documents if doc.content]
        avg_length = np.mean(doc_lengths)

        # Adjust recommendations based on document characteristics
        if avg_length < 1000:
            # Short documents - use smaller chunks
            base_config['target_size'] = min(base_config['target_size'], 256)
        elif avg_length > 10000:
            # Long documents - consider larger chunks if the model supports them
            if target_model.lower() in ['longformer', 'bigbird']:
                base_config['target_size'] = 4096

        return {
            'recommended_config': base_config,
            'document_stats': {
                'avg_length': avg_length,
                'total_documents': len(documents),
                'min_length': min(doc_lengths) if doc_lengths else 0,
                'max_length': max(doc_lengths) if doc_lengths else 0
            },
            'rationale': f"Optimized for {target_model} with average document length {avg_length:.0f} characters"
        }
# Usage
quality_pipeline = TrainingDataQualityPipeline("your-api-key")

# Validate training data
document_paths = ["doc1.pdf", "doc2.pdf", "doc3.pdf"]
labels = ["class_a", "class_b", "class_a"]

quality_report = await quality_pipeline.validate_training_data(document_paths, labels)
print("Quality Report:", quality_report)

# Get chunking recommendations
async with AsyncLexa(api_key="your-api-key") as client:
    docs = await client.parse(document_paths)

chunking_strategy = quality_pipeline.suggest_chunking_strategy(docs, target_model='bert')
print("Chunking Strategy:", chunking_strategy)
```
Real-World ML Use Cases
Document Classification Model Training
```python
async def train_document_classifier(training_docs: Dict[str, List[str]]):
    """Complete pipeline for training a document classifier"""

    # 1. Data preparation
    pipeline = MLDataPreparationPipeline("your-api-key")
    df = await pipeline.prepare_training_data(training_docs, task_type='classification')

    # 2. Feature extraction
    extractor = DocumentFeatureExtractor("your-api-key")
    all_docs = [doc for docs in training_docs.values() for doc in docs]
    features_df = await extractor.extract_comprehensive_features(all_docs)

    # 3. Combine data and features
    combined_df = df.merge(features_df, left_on='source_filename', right_on='filename')

    # 4. Train model
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import classification_report

    feature_columns = [col for col in combined_df.columns
                       if col not in ['text', 'label', 'source_filename', 'filename']]

    X = combined_df[feature_columns].fillna(0)
    y = combined_df['label']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y)

    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # 5. Evaluate
    y_pred = model.predict(X_test)
    report = classification_report(y_test, y_pred)

    return {
        'model': model,
        'test_accuracy': model.score(X_test, y_test),
        'classification_report': report,
        'feature_importance': dict(zip(feature_columns, model.feature_importances_))
    }
```
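A minimal invocation sketch; note that MLDataPreparationPipeline is referenced above but not defined in this section, so this assumes it is available from your data-preparation setup, and the file paths are placeholders.

```python
# Hypothetical usage sketch; assumes MLDataPreparationPipeline is defined elsewhere
training_docs = {
    'financial': ['financial_report1.pdf', 'financial_report2.pdf'],
    'legal': ['contract1.pdf', 'legal_doc1.pdf']
}

# Run inside an async context (e.g. a notebook cell with top-level await)
results = await train_document_classifier(training_docs)
print(f"Test accuracy: {results['test_accuracy']:.3f}")
print(results['classification_report'])
```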
Named Entity Recognition Dataset
```python
async def create_ner_dataset(document_paths: List[str],
                             entity_types: List[str] = None):
    """Create a NER training dataset from documents"""
    if entity_types is None:
        entity_types = ['PERSON', 'ORG', 'GPE', 'DATE', 'MONEY']

    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(document_paths)

        ner_dataset = []

        for doc in documents:
            # Get sentences for NER training
            sentences = doc.content.split('.')

            for sentence in sentences:
                sentence = sentence.strip()
                if len(sentence) < 10:  # Skip very short sentences
                    continue

                # Extract entities (simplified - in production use spaCy or a trained NER model)
                entities = extract_entities_for_ner(sentence, entity_types)

                if entities:  # Only include sentences with entities
                    ner_dataset.append({
                        'text': sentence,
                        'entities': entities,
                        'source': doc.filename,
                        'length': len(sentence)
                    })

        return ner_dataset

def extract_entities_for_ner(text: str, entity_types: List[str]) -> List[Dict]:
    """Extract entities in NER format (simplified, pattern-based)"""
    import re

    entities = []

    # Date patterns
    date_pattern = r'\b\d{1,2}/\d{1,2}/\d{4}\b|\b\d{4}-\d{2}-\d{2}\b'
    for match in re.finditer(date_pattern, text):
        entities.append({
            'start': match.start(),
            'end': match.end(),
            'label': 'DATE',
            'text': match.group()
        })

    # Money patterns
    money_pattern = r'\$[\d,]+\.?\d*'
    for match in re.finditer(money_pattern, text):
        entities.append({
            'start': match.start(),
            'end': match.end(),
            'label': 'MONEY',
            'text': match.group()
        })

    # Simple person names (Title + Name pattern)
    person_pattern = r'\b(?:Mr|Ms|Mrs|Dr)\.?\s+[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*'
    for match in re.finditer(person_pattern, text):
        entities.append({
            'start': match.start(),
            'end': match.end(),
            'label': 'PERSON',
            'text': match.group()
        })

    return entities
```
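A short usage sketch with placeholder file paths, just to show the shape of the resulting records:

```python
# Run inside an async context (e.g. a notebook cell with top-level await)
ner_data = await create_ner_dataset(["contract1.pdf", "invoice1.pdf"])

print(f"Sentences with entities: {len(ner_data)}")
for example in ner_data[:3]:
    print(example['text'])
    print([(e['label'], e['text']) for e in example['entities']])
```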
Performance for ML Workflows
- Data Processing: 500+ docs/hour for training data preparation
- Feature Extraction: 50+ features extracted per document automatically
- Quality Assurance: 99.5% clean data rate for model training
Integration with ML Frameworks
Hugging Face Integration
```python
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from cerevox import AsyncLexa

async def prepare_for_huggingface(documents, model_name="bert-base-uncased"):
    """Prepare Lexa output for Hugging Face models"""
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    async with AsyncLexa(api_key="your-api-key") as client:
        docs = await client.parse(documents)

        # Get chunks sized for the model
        max_length = tokenizer.model_max_length - 2  # Account for special tokens

        chunks = docs.get_all_text_chunks(
            target_size=max_length * 4,  # Approximate character-to-token ratio
            tolerance=0.1
        )

        # Tokenize and prepare
        tokenized_data = []
        for chunk in chunks:
            tokens = tokenizer(
                chunk,
                truncation=True,
                padding='max_length',
                max_length=max_length,
                return_tensors='pt'
            )

            tokenized_data.append({
                'input_ids': tokens['input_ids'],
                'attention_mask': tokens['attention_mask'],
                'text': chunk
            })

        return tokenized_data
```
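A minimal follow-on sketch, assuming you want to feed the prepared chunks into a PyTorch DataLoader; the labels here are placeholders you would supply from your own annotations.

```python
import torch
from torch.utils.data import Dataset, DataLoader

class ChunkDataset(Dataset):
    """Wraps the tokenized chunks returned by prepare_for_huggingface."""
    def __init__(self, tokenized_data, labels):
        self.data = tokenized_data
        self.labels = labels  # placeholder: one integer label per chunk

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        return {
            'input_ids': item['input_ids'].squeeze(0),
            'attention_mask': item['attention_mask'].squeeze(0),
            'labels': torch.tensor(self.labels[idx])
        }

# Run inside an async context (e.g. a notebook cell with top-level await)
tokenized = await prepare_for_huggingface(["financial_report.pdf", "legal_contract.pdf"])
labels = [0] * len(tokenized)  # placeholder labels
loader = DataLoader(ChunkDataset(tokenized, labels), batch_size=8, shuffle=True)
```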