Transform unstructured documents into ML-ready datasets with precision
Transform unstructured documents into high-quality training data for machine learning models. Lexa’s precision parsing creates clean, structured datasets from complex documents.
Extract structured data with 99%+ accuracy for training
Normalize data across multiple document formats
Process thousands of documents for large datasets
Extract features ready for ML pipelines
Transform documents into ML-ready datasets:
from cerevox import Lexa
import pandas as pd
from sklearn.model_selection import train_test_split
# Initialize client
client = Lexa(api_key="your-api-key")
# Process labeled documents for classification
def create_classification_dataset(document_categories):
    """Create a text classification dataset from labeled documents"""
    dataset = []

    for category, doc_paths in document_categories.items():
        documents = client.parse(doc_paths)

        for doc in documents:
            # Extract clean text chunks
            chunks = doc.get_text_chunks(target_size=512)

            for chunk in chunks:
                # Clean and prepare text
                cleaned_text = clean_text_for_ml(chunk)

                dataset.append({
                    'text': cleaned_text,
                    'label': category,
                    'source': doc.filename,
                    'chunk_length': len(cleaned_text),
                    'word_count': len(cleaned_text.split())
                })

    # Convert to DataFrame
    df = pd.DataFrame(dataset)

    # Split into train/val/test (70/15/15), stratified by label
    train_df, temp_df = train_test_split(df, test_size=0.3, stratify=df['label'])
    val_df, test_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df['label'])

    return {
        'train': train_df,
        'validation': val_df,
        'test': test_df,
        'stats': df.groupby('label').size().to_dict()
    }
def clean_text_for_ml(text):
    """Clean text for ML training"""
    import re

    # Remove extra whitespace
    text = re.sub(r'\s+', ' ', text)

    # Remove special characters but keep punctuation
    text = re.sub(r'[^\w\s\.,!?;:-]', '', text)

    # Strip and return
    return text.strip()
# Usage
doc_categories = {
    'financial': ['financial_report1.pdf', 'financial_report2.pdf'],
    'legal': ['contract1.pdf', 'legal_doc1.pdf'],
    'technical': ['manual1.pdf', 'spec1.pdf']
}
dataset = create_classification_dataset(doc_categories)
print(f"Training samples: {len(dataset['train'])}")
print(f"Class distribution: {dataset['stats']}")
Extract ML-ready features from complex documents:
from cerevox import AsyncLexa
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from typing import List, Dict
class DocumentFeatureExtractor:
    def __init__(self, api_key: str):
        self.client = AsyncLexa(api_key=api_key)
        self.vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')

    async def extract_comprehensive_features(self, documents: List[str]) -> pd.DataFrame:
        """Extract comprehensive features for ML models"""
        async with self.client:
            docs = await self.client.parse(documents)

            features = []
            all_texts = []

            for doc in docs:
                # Document-level features
                doc_features = {
                    'filename': doc.filename,
                    'total_pages': doc.total_pages,
                    'total_elements': doc.total_elements,
                    'table_count': len(doc.tables),
                    'image_count': len(doc.images) if hasattr(doc, 'images') else 0,
                    'file_size_kb': getattr(doc, 'file_size', 0) / 1024,
                }

                # Content-based features
                full_text = doc.content
                all_texts.append(full_text)

                # Text statistics (guard against empty content to avoid division by zero)
                words = full_text.split()
                char_count = max(len(full_text), 1)
                doc_features.update({
                    'char_count': len(full_text),
                    'word_count': len(words),
                    'sentence_count': len(full_text.split('.')),
                    'paragraph_count': len(full_text.split('\n\n')),
                    'avg_word_length': np.mean([len(word) for word in words]) if words else 0.0,
                    'punctuation_ratio': sum(1 for c in full_text if c in '.,!?;:') / char_count,
                    'uppercase_ratio': sum(1 for c in full_text if c.isupper()) / char_count,
                    'digit_ratio': sum(1 for c in full_text if c.isdigit()) / char_count
                })

                # Complexity features
                doc_features.update({
                    'lexical_diversity': self._calculate_lexical_diversity(full_text),
                    'readability_score': self._calculate_readability(full_text),
                    'formality_score': self._calculate_formality(full_text)
                })

                # Domain-specific features
                doc_features.update(self._extract_domain_features(full_text))

                features.append(doc_features)

            # Create DataFrame
            df = pd.DataFrame(features)

            # Add TF-IDF features
            tfidf_matrix = self.vectorizer.fit_transform(all_texts)
            tfidf_df = pd.DataFrame(
                tfidf_matrix.toarray(),
                columns=[f'tfidf_{i}' for i in range(tfidf_matrix.shape[1])]
            )

            # Combine all features
            final_df = pd.concat([df, tfidf_df], axis=1)

            return final_df
    def _calculate_lexical_diversity(self, text: str) -> float:
        """Calculate lexical diversity (unique words / total words)"""
        words = text.lower().split()
        if not words:
            return 0.0
        return len(set(words)) / len(words)

    def _calculate_readability(self, text: str) -> float:
        """Calculate simplified readability score"""
        sentences = text.split('.')
        words = text.split()
        if not sentences or not words:
            return 0.0

        avg_sentence_length = len(words) / len(sentences)
        avg_word_length = np.mean([len(word) for word in words])

        # Simplified readability formula
        return max(0, 100 - (avg_sentence_length * 0.5) - (avg_word_length * 2))

    def _calculate_formality(self, text: str) -> float:
        """Calculate formality score based on linguistic features"""
        formal_indicators = [
            'therefore', 'furthermore', 'moreover', 'consequently',
            'accordingly', 'nevertheless', 'notwithstanding'
        ]
        informal_indicators = [
            'gonna', 'wanna', 'kinda', 'sorta', 'yeah', 'okay'
        ]

        text_lower = text.lower()
        formal_count = sum(1 for indicator in formal_indicators if indicator in text_lower)
        informal_count = sum(1 for indicator in informal_indicators if indicator in text_lower)

        total_indicators = formal_count + informal_count
        if total_indicators == 0:
            return 0.5  # Neutral
        return formal_count / total_indicators

    def _extract_domain_features(self, text: str) -> Dict:
        """Extract domain-specific features"""
        text_lower = text.lower()
        return {
            'financial_terms': sum(1 for term in ['revenue', 'profit', 'loss', 'investment', 'roi']
                                   if term in text_lower),
            'legal_terms': sum(1 for term in ['contract', 'agreement', 'clause', 'liability', 'party']
                               if term in text_lower),
            'technical_terms': sum(1 for term in ['system', 'process', 'method', 'algorithm', 'data']
                                   if term in text_lower),
            'medical_terms': sum(1 for term in ['patient', 'treatment', 'diagnosis', 'medical', 'health']
                                 if term in text_lower)
        }
# Usage
extractor = DocumentFeatureExtractor("your-api-key")

# Extract features from document collection
document_paths = [
    "financial_report.pdf",
    "legal_contract.pdf",
    "technical_manual.pdf"
]

feature_df = await extractor.extract_comprehensive_features(document_paths)
print(f"Feature matrix shape: {feature_df.shape}")
print(f"Available features: {list(feature_df.columns)}")

# Use features for ML model training
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Assuming you have labels
X = feature_df.drop(['filename'], axis=1).fillna(0)
y = ['financial', 'legal', 'technical']  # Your labels

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
model = RandomForestClassifier()
model.fit(X_train, y_train)
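To sanity-check what the classifier learned, rank the fitted features by importance. A quick sketch using the objects defined above:
# Inspect the top features driving the classifier
ranked = sorted(zip(X.columns, model.feature_importances_), key=lambda pair: pair[1], reverse=True)
for name, score in ranked[:10]:
    print(f"{name}: {score:.3f}")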
Ensure high-quality training data:
import numpy as np
import pandas as pd
from typing import Dict, List, Optional

from cerevox import AsyncLexa

class TrainingDataQualityPipeline:
    def __init__(self, api_key: str):
        self.client = AsyncLexa(api_key=api_key)

    async def validate_training_data(self, document_paths: List[str],
                                     labels: Optional[List[str]] = None) -> Dict:
        """Validate and analyze training data quality"""
        async with self.client:
            documents = await self.client.parse(document_paths)

            quality_report = {
                'total_documents': len(documents),
                'parsing_success_rate': len(documents) / len(document_paths),
                'content_analysis': {},
                'recommendations': []
            }

            # Analyze content quality
            text_lengths = []
            error_documents = []

            for doc in documents:
                if not doc.content or len(doc.content.strip()) < 50:
                    error_documents.append({
                        'filename': doc.filename,
                        'issue': 'insufficient_content',
                        'content_length': len(doc.content) if doc.content else 0
                    })
                else:
                    text_lengths.append(len(doc.content))

            # Content statistics
            if text_lengths:
                quality_report['content_analysis'] = {
                    'avg_content_length': np.mean(text_lengths),
                    'min_content_length': min(text_lengths),
                    'max_content_length': max(text_lengths),
                    'std_content_length': np.std(text_lengths),
                    'documents_with_errors': len(error_documents)
                }

            # Generate recommendations
            if error_documents:
                quality_report['recommendations'].append({
                    'type': 'content_quality',
                    'message': f'{len(error_documents)} documents have insufficient content',
                    'action': 'Review and exclude documents with less than 50 characters'
                })

            if text_lengths:
                length_std = np.std(text_lengths)
                length_mean = np.mean(text_lengths)
                if length_std > length_mean * 0.5:
                    quality_report['recommendations'].append({
                        'type': 'length_variance',
                        'message': 'High variance in document lengths detected',
                        'action': 'Consider a chunking strategy or separate models for different document types'
                    })

            # Label balance analysis (if labels provided)
            if labels:
                label_counts = pd.Series(labels).value_counts()
                min_class_size = label_counts.min()
                max_class_size = label_counts.max()

                if max_class_size > min_class_size * 3:
                    quality_report['recommendations'].append({
                        'type': 'class_imbalance',
                        'message': 'Significant class imbalance detected',
                        'action': 'Consider data augmentation or stratified sampling',
                        'label_distribution': label_counts.to_dict()
                    })

            return quality_report

    def suggest_chunking_strategy(self, documents, target_model: str = 'bert') -> Dict:
        """Suggest optimal chunking strategy for ML model"""
        # Model-specific recommendations
        chunking_recommendations = {
            'bert': {'target_size': 512, 'overlap': 50, 'tolerance': 0.1},
            'roberta': {'target_size': 512, 'overlap': 50, 'tolerance': 0.1},
            'longformer': {'target_size': 2048, 'overlap': 100, 'tolerance': 0.15},
            'bigbird': {'target_size': 2048, 'overlap': 100, 'tolerance': 0.15},
            'gpt': {'target_size': 1024, 'overlap': 0, 'tolerance': 0.1}
        }

        base_config = chunking_recommendations.get(target_model.lower(), {
            'target_size': 512, 'overlap': 50, 'tolerance': 0.1
        })

        # Analyze document characteristics (guard against empty input)
        doc_lengths = [len(doc.content) for doc in documents if doc.content]
        avg_length = np.mean(doc_lengths) if doc_lengths else 0

        # Adjust recommendations based on document characteristics
        if avg_length < 1000:
            # Short documents - use smaller chunks
            base_config['target_size'] = min(base_config['target_size'], 256)
        elif avg_length > 10000:
            # Long documents - consider larger chunks if the model supports them
            if target_model.lower() in ['longformer', 'bigbird']:
                base_config['target_size'] = 4096

        return {
            'recommended_config': base_config,
            'document_stats': {
                'avg_length': avg_length,
                'total_documents': len(documents),
                'min_length': min(doc_lengths) if doc_lengths else 0,
                'max_length': max(doc_lengths) if doc_lengths else 0
            },
            'rationale': f"Optimized for {target_model} with average document length {avg_length:.0f} characters"
        }
# Usage
quality_pipeline = TrainingDataQualityPipeline("your-api-key")

# Validate training data
document_paths = ["doc1.pdf", "doc2.pdf", "doc3.pdf"]
labels = ["class_a", "class_b", "class_a"]

quality_report = await quality_pipeline.validate_training_data(document_paths, labels)
print("Quality Report:", quality_report)

# Get chunking recommendations (parse the documents inside a managed client session)
async with AsyncLexa(api_key="your-api-key") as client:
    docs = await client.parse(document_paths)
chunking_strategy = quality_pipeline.suggest_chunking_strategy(docs, target_model='bert')
print("Chunking Strategy:", chunking_strategy)
async def train_document_classifier(training_docs: Dict[str, List[str]]):
    """Complete pipeline for training a document classifier"""
    # 1. Data preparation
    # (MLDataPreparationPipeline is assumed to be defined in your codebase; it should
    # return a DataFrame with 'text', 'label', and 'source_filename' columns)
    pipeline = MLDataPreparationPipeline("your-api-key")
    df = await pipeline.prepare_training_data(training_docs, task_type='classification')

    # 2. Feature extraction
    extractor = DocumentFeatureExtractor("your-api-key")
    all_docs = [doc for docs in training_docs.values() for doc in docs]
    features_df = await extractor.extract_comprehensive_features(all_docs)

    # 3. Combine data and features
    combined_df = df.merge(features_df, left_on='source_filename', right_on='filename')

    # 4. Train model
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import classification_report

    feature_columns = [col for col in combined_df.columns
                       if col not in ['text', 'label', 'source_filename', 'filename']]
    X = combined_df[feature_columns].fillna(0)
    y = combined_df['label']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y)

    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # 5. Evaluate
    y_pred = model.predict(X_test)
    report = classification_report(y_test, y_pred)

    return {
        'model': model,
        'test_accuracy': model.score(X_test, y_test),
        'classification_report': report,
        'feature_importance': dict(zip(feature_columns, model.feature_importances_))
    }
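A usage sketch, assuming MLDataPreparationPipeline is available and using placeholder file names:
results = await train_document_classifier({
    'financial': ['financial_report1.pdf', 'financial_report2.pdf'],
    'legal': ['contract1.pdf', 'legal_doc1.pdf']
})
print(f"Test accuracy: {results['test_accuracy']:.2%}")
print(results['classification_report'])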
async def create_ner_dataset(document_paths: List[str],
                             entity_types: List[str] = None):
    """Create NER training dataset from documents"""
    if entity_types is None:
        entity_types = ['PERSON', 'ORG', 'GPE', 'DATE', 'MONEY']

    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(document_paths)

        ner_dataset = []
        for doc in documents:
            # Get sentences for NER training
            sentences = doc.content.split('.')

            for sentence in sentences:
                sentence = sentence.strip()
                if len(sentence) < 10:  # Skip very short sentences
                    continue

                # Extract entities (simplified - in production use spaCy/NER model)
                entities = extract_entities_for_ner(sentence, entity_types)

                if entities:  # Only include sentences with entities
                    ner_dataset.append({
                        'text': sentence,
                        'entities': entities,
                        'source': doc.filename,
                        'length': len(sentence)
                    })

        return ner_dataset
def extract_entities_for_ner(text: str, entity_types: List[str]) -> List[Dict]:
    """Extract entities in NER format (simplified regex-based approach)"""
    import re

    entities = []

    # Date patterns
    date_pattern = r'\b\d{1,2}/\d{1,2}/\d{4}\b|\b\d{4}-\d{2}-\d{2}\b'
    for match in re.finditer(date_pattern, text):
        entities.append({
            'start': match.start(),
            'end': match.end(),
            'label': 'DATE',
            'text': match.group()
        })

    # Money patterns
    money_pattern = r'\$[\d,]+\.?\d*'
    for match in re.finditer(money_pattern, text):
        entities.append({
            'start': match.start(),
            'end': match.end(),
            'label': 'MONEY',
            'text': match.group()
        })

    # Simple person names (Title + Name pattern)
    person_pattern = r'\b(?:Mr|Ms|Mrs|Dr)\.?\s+[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*'
    for match in re.finditer(person_pattern, text):
        entities.append({
            'start': match.start(),
            'end': match.end(),
            'label': 'PERSON',
            'text': match.group()
        })

    # Keep only the entity types the caller asked for
    return [entity for entity in entities if entity['label'] in entity_types]
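If you train with spaCy, the offset-based records above map directly onto its binary training format. A minimal sketch, assuming spaCy v3 is installed, ner_dataset comes from create_ner_dataset, and the output path is a placeholder:
import spacy
from spacy.tokens import DocBin

nlp = spacy.blank("en")
db = DocBin()
for example in ner_dataset:
    doc = nlp.make_doc(example['text'])
    spans = [doc.char_span(e['start'], e['end'], label=e['label'])
             for e in example['entities']]
    # char_span returns None when offsets don't align with token boundaries
    doc.ents = [span for span in spans if span is not None]
    db.add(doc)
db.to_disk("ner_train.spacy")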
500+ docs/hour for training data preparation
50+ features extracted per document automatically
99.5% clean data rate for model training
from transformers import AutoTokenizer
from cerevox import AsyncLexa

async def prepare_for_huggingface(documents, model_name="bert-base-uncased"):
    """Prepare Lexa output for Hugging Face models"""
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    async with AsyncLexa(api_key="your-api-key") as client:
        docs = await client.parse(documents)

        # Get chunks sized for the model
        max_length = tokenizer.model_max_length - 2  # Account for special tokens
        chunks = docs.get_all_text_chunks(
            target_size=max_length * 4,  # Approximate character-to-token ratio
            tolerance=0.1
        )

        # Tokenize and prepare
        tokenized_data = []
        for chunk in chunks:
            tokens = tokenizer(
                chunk,
                truncation=True,
                padding='max_length',
                max_length=max_length,
                return_tensors='pt'
            )

            tokenized_data.append({
                'input_ids': tokens['input_ids'],
                'attention_mask': tokens['attention_mask'],
                'text': chunk
            })

        return tokenized_data
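A usage sketch (the file names are placeholder assumptions):
tokenized = await prepare_for_huggingface(["report.pdf", "manual.pdf"])
print(f"Prepared {len(tokenized)} tokenized chunks for fine-tuning")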
Start preparing ML datasets in minutes
Learn embedding and RAG workflows
Optimize for production ML pipelines
Scale your ML data preparation