Cloud Integrations
Parse documents directly from cloud storage services with Lexa
Supported Cloud Services

- Amazon S3: Parse documents from S3 buckets with IAM integration
- Microsoft SharePoint: Access SharePoint sites and document libraries
- Box: Parse files from Box folders and enterprise content
- Dropbox: Process documents from Dropbox folders
Amazon S3 Integration
Basic S3 Operations
from cerevox import Lexa

client = Lexa(api_key="your-api-key")

# List all available S3 buckets
buckets = client.list_s3_buckets()
print(f"Found {len(buckets.buckets)} buckets:")
for bucket in buckets.buckets:
    print(f"  📦 {bucket.name} (Created: {bucket.creation_date})")
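Once you know the bucket, parsing a prefix is a single call. A minimal sketch with placeholder bucket and folder names, assuming the optional mode and timeout arguments can be left at their defaults:

from cerevox import Lexa

client = Lexa(api_key="your-api-key")

# Parse every supported document under a prefix in one call
documents = client.parse_s3_folder(
    bucket="my-document-bucket",   # placeholder bucket name
    folder_path="invoices/2024/"   # placeholder prefix
)
print(f"Parsed {len(documents)} documents")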
Advanced S3 Patterns
from cerevox import Lexa

def s3_progress_callback(status):
    print(f"📊 S3 Processing: {status.status}")
    if hasattr(status, 'progress') and status.progress:
        print(f"  Progress: {status.progress}%")
    if hasattr(status, 'files_processed'):
        processed = getattr(status, 'files_processed', 0)
        total = getattr(status, 'total_files', 0)
        print(f"  Files: {processed}/{total}")

client = Lexa(api_key="your-api-key")

# Parse with detailed progress monitoring
documents = client.parse_s3_folder(
    bucket="large-document-bucket",
    folder_path="annual-reports/",
    progress_callback=s3_progress_callback,
    timeout=600.0,  # 10 minutes for large batch
    poll_interval=5.0
)

print(f"✅ Completed: {len(documents)} documents")
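The returned objects are the same Document type no matter which cloud source they came from, so post-processing is uniform. A small sketch that continues from the `documents` returned above, keeps only documents containing tables, and splits them into chunks for downstream search (the 500-character target size is an arbitrary choice):

# Continues from the `documents` returned by parse_s3_folder above
tabular_docs = [doc for doc in documents if doc.tables]
print(f"{len(tabular_docs)} of {len(documents)} documents contain tables")

for doc in tabular_docs:
    chunks = doc.get_text_chunks(target_size=500)  # chunk size is an arbitrary choice
    print(f"  {doc.source_file}: {len(doc.tables)} tables, {len(chunks)} chunks")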
Microsoft SharePoint Integration
SharePoint Operations
from cerevox import Lexa

client = Lexa(api_key="your-api-key")

# List all available SharePoint sites
sites = client.list_sharepoint_sites()
print(f"Found {len(sites.sites)} SharePoint sites:")
for site in sites.sites:
    print(f"  🏢 {site.name}")
    print(f"     ID: {site.id}")
    print(f"     URL: {site.web_url}")
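Each site exposes one or more document libraries (drives), and the drive ID is what the parsing calls below expect. A short sketch that enumerates them, using a placeholder site ID:

from cerevox import Lexa

client = Lexa(api_key="your-api-key")

# List the document libraries (drives) within a site
site_id = "your-sharepoint-site-id"  # placeholder
drives = client.list_sharepoint_drives(site_id)
print(f"Found {len(drives.drives)} drives:")
for drive in drives.drives:
    print(f"  📁 {drive.name} (ID: {drive.id})")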
SharePoint Automation
from cerevox import Lexa, ProcessingMode
import json
import os
from datetime import datetime

def process_sharepoint_site(site_id, output_dir="sharepoint_results"):
    """Complete SharePoint site processing workflow"""
    client = Lexa(api_key="your-api-key")

    # Step 1: Get all drives in the site
    print("🔍 Discovering SharePoint structure...")
    drives = client.list_sharepoint_drives(site_id)

    all_results = []

    for drive in drives.drives:
        print(f"\n📁 Processing drive: {drive.name}")

        try:
            # Parse all documents in the drive
            documents = client.parse_sharepoint_folder(
                site_id=site_id,
                drive_id=drive.id,
                folder_path="",  # Root folder
                mode=ProcessingMode.DEFAULT,
                timeout=600.0
            )

            # Process each document
            for doc in documents:
                result = {
                    'drive_name': drive.name,
                    'drive_id': drive.id,
                    'document_title': doc.title,
                    'source_file': doc.source_file,
                    'content_length': len(doc.content),
                    'page_count': doc.page_count,
                    'tables_count': len(doc.tables),
                    'images_count': len(doc.images),
                    'processed_at': datetime.now().isoformat(),
                    'preview': doc.content[:300] if doc.content else ""
                }

                # Extract table summaries
                if doc.tables:
                    result['table_summary'] = [
                        {
                            'rows': table.rows,
                            'columns': table.columns,
                            'page': table.page_number
                        }
                        for table in doc.tables
                    ]

                # Get text chunks for analysis
                chunks = doc.get_text_chunks(target_size=400)
                result['chunks_count'] = len(chunks)

                all_results.append(result)

            print(f"✅ Processed {len(documents)} documents from {drive.name}")

        except Exception as e:
            print(f"❌ Failed to process drive {drive.name}: {e}")
            continue

    # Save results (create the output directory if it doesn't already exist)
    os.makedirs(output_dir, exist_ok=True)
    output_file = f"{output_dir}/sharepoint_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(output_file, 'w') as f:
        json.dump(all_results, f, indent=2)

    print("\n📊 Analysis complete:")
    print(f"  Total documents: {len(all_results)}")
    print(f"  Results saved to: {output_file}")

    return all_results

# Usage
results = process_sharepoint_site("your-sharepoint-site-id")
Box Integration
Box Operations
from cerevox import Lexa

client = Lexa(api_key="your-api-key")

# List root folders
folders = client.list_box_folders()
print("Root folders in Box:")
for folder in folders.folders:
    print(f"  📁 {folder.name} (ID: {folder.id})")
    print(f"     Created: {folder.created_at}")
    print(f"     Modified: {folder.modified_at}")

# List specific folder contents
subfolder_id = "123456789"
subfolders = client.list_box_folders(subfolder_id)
print(f"\nSubfolders in {subfolder_id}:")
for folder in subfolders.folders:
    print(f"  📁 {folder.name}")
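Parsing works the same way once you have a folder ID. A minimal sketch with a placeholder folder ID; the mode and timeout values mirror the multi-cloud example further down:

from cerevox import Lexa, ProcessingMode

client = Lexa(api_key="your-api-key")

# Parse every document in a Box folder
documents = client.parse_box_folder(
    folder_id="123456789",  # placeholder folder ID
    mode=ProcessingMode.DEFAULT,
    timeout=300.0
)
print(f"✅ Parsed {len(documents)} documents from Box")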
Dropbox Integration
Dropbox Operations
from cerevox import Lexa

client = Lexa(api_key="your-api-key")

# List root folders
folders = client.list_dropbox_folders()
print("Root folders in Dropbox:")
for folder in folders.folders:
    print(f"  📁 {folder.name}")
    print(f"     Path: {folder.path_display}")

# List specific folder
specific_folders = client.list_dropbox_folders("/Documents/Work")
print("\nContents of /Documents/Work:")
for folder in specific_folders.folders:
    print(f"  📁 {folder.name}")
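And the matching parse call for Dropbox, sketched with a placeholder path and the same mode and timeout settings used in the multi-cloud example below:

from cerevox import Lexa, ProcessingMode

client = Lexa(api_key="your-api-key")

# Parse every document in a Dropbox folder
documents = client.parse_dropbox_folder(
    folder_path="/Documents/Work",  # placeholder path
    mode=ProcessingMode.DEFAULT,
    timeout=300.0
)
print(f"✅ Parsed {len(documents)} documents from Dropbox")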
Multi-Cloud Processing
Unified Cloud Processing
from cerevox import Lexa, ProcessingMode
from datetime import datetime

class MultiCloudProcessor:
    def __init__(self, api_key):
        self.client = Lexa(api_key=api_key)
        self.results = []

    def process_s3_source(self, bucket, folder_path=""):
        """Process documents from S3"""
        print(f"🔄 Processing S3: s3://{bucket}/{folder_path}")

        try:
            documents = self.client.parse_s3_folder(
                bucket=bucket,
                folder_path=folder_path,
                mode=ProcessingMode.DEFAULT,
                timeout=300.0
            )

            for doc in documents:
                self.results.append({
                    'source': 'S3',
                    'location': f"s3://{bucket}/{doc.source_file}",
                    'document': doc,
                    'processed_at': datetime.now().isoformat()
                })

            print(f"✅ S3: Processed {len(documents)} documents")
            return len(documents)

        except Exception as e:
            print(f"❌ S3 processing failed: {e}")
            return 0

    def process_sharepoint_source(self, site_id, drive_id, folder_path=""):
        """Process documents from SharePoint"""
        print(f"🔄 Processing SharePoint: {site_id}/{drive_id}/{folder_path}")

        try:
            documents = self.client.parse_sharepoint_folder(
                site_id=site_id,
                drive_id=drive_id,
                folder_path=folder_path,
                mode=ProcessingMode.DEFAULT,
                timeout=300.0
            )

            for doc in documents:
                self.results.append({
                    'source': 'SharePoint',
                    'location': f"sharepoint://{site_id}/{drive_id}/{doc.source_file}",
                    'document': doc,
                    'processed_at': datetime.now().isoformat()
                })

            print(f"✅ SharePoint: Processed {len(documents)} documents")
            return len(documents)

        except Exception as e:
            print(f"❌ SharePoint processing failed: {e}")
            return 0

    def process_box_source(self, folder_id):
        """Process documents from Box"""
        print(f"🔄 Processing Box: {folder_id}")

        try:
            documents = self.client.parse_box_folder(
                folder_id=folder_id,
                mode=ProcessingMode.DEFAULT,
                timeout=300.0
            )

            for doc in documents:
                self.results.append({
                    'source': 'Box',
                    'location': f"box://{folder_id}/{doc.source_file}",
                    'document': doc,
                    'processed_at': datetime.now().isoformat()
                })

            print(f"✅ Box: Processed {len(documents)} documents")
            return len(documents)

        except Exception as e:
            print(f"❌ Box processing failed: {e}")
            return 0

    def process_dropbox_source(self, folder_path):
        """Process documents from Dropbox"""
        print(f"🔄 Processing Dropbox: {folder_path}")

        try:
            documents = self.client.parse_dropbox_folder(
                folder_path=folder_path,
                mode=ProcessingMode.DEFAULT,
                timeout=300.0
            )

            for doc in documents:
                self.results.append({
                    'source': 'Dropbox',
                    'location': f"dropbox://{folder_path}/{doc.source_file}",
                    'document': doc,
                    'processed_at': datetime.now().isoformat()
                })

            print(f"✅ Dropbox: Processed {len(documents)} documents")
            return len(documents)

        except Exception as e:
            print(f"❌ Dropbox processing failed: {e}")
            return 0

    def get_summary(self):
        """Get processing summary"""
        by_source = {}
        total_docs = len(self.results)
        total_content = 0
        total_tables = 0

        for result in self.results:
            source = result['source']
            doc = result['document']

            if source not in by_source:
                by_source[source] = {
                    'count': 0,
                    'content_chars': 0,
                    'tables': 0
                }

            by_source[source]['count'] += 1
            by_source[source]['content_chars'] += len(doc.content)
            by_source[source]['tables'] += len(doc.tables)

            total_content += len(doc.content)
            total_tables += len(doc.tables)

        return {
            'total_documents': total_docs,
            'total_content_chars': total_content,
            'total_tables': total_tables,
            'by_source': by_source
        }

# Usage
processor = MultiCloudProcessor(api_key="your-api-key")

# Process from multiple cloud sources
processor.process_s3_source("my-s3-bucket", "documents/")
processor.process_sharepoint_source("site-id", "drive-id", "Shared Documents/")
processor.process_box_source("box-folder-id")
processor.process_dropbox_source("/Work Documents")

# Get summary
summary = processor.get_summary()
print("\n📊 Multi-Cloud Processing Summary:")
print(f"  Total documents: {summary['total_documents']}")
print(f"  Total content: {summary['total_content_chars']} characters")
print(f"  Total tables: {summary['total_tables']}")

for source, stats in summary['by_source'].items():
    print(f"\n  {source}:")
    print(f"    Documents: {stats['count']}")
    print(f"    Content: {stats['content_chars']} chars")
    print(f"    Tables: {stats['tables']}")
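The four process_* calls above are independent and blocking, so they run one after another. If you want them in flight at the same time, a thread pool is the simplest option. This is only a sketch and assumes the shared Lexa client tolerates concurrent requests; if you are unsure, give each worker its own MultiCloudProcessor and merge the result lists afterwards.

from concurrent.futures import ThreadPoolExecutor

processor = MultiCloudProcessor(api_key="your-api-key")

# Each job wraps one independent cloud source (all names are placeholders)
jobs = [
    lambda: processor.process_s3_source("my-s3-bucket", "documents/"),
    lambda: processor.process_sharepoint_source("site-id", "drive-id", "Shared Documents/"),
    lambda: processor.process_box_source("box-folder-id"),
    lambda: processor.process_dropbox_source("/Work Documents"),
]

# Fan the jobs out across threads; list.append in the processor is thread-safe in CPython
with ThreadPoolExecutor(max_workers=4) as pool:
    counts = list(pool.map(lambda job: job(), jobs))

print(f"Parsed {sum(counts)} documents across {len(jobs)} sources")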
Production Cloud Patterns
Robust Cloud Processing
from cerevox import Lexa, LexaError, ProcessingMode
import logging
import time
import json

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ProductionCloudProcessor:
    def __init__(self, api_key, config=None):
        self.client = Lexa(
            api_key=api_key,
            timeout=300.0,
            max_retries=3
        )
        self.config = config or {
            'max_retries': 3,
            'retry_delay': 2.0,
            'batch_size': 20,
            'processing_mode': ProcessingMode.DEFAULT
        }
        self.stats = {
            'total_processed': 0,
            'total_failed': 0,
            'total_retries': 0,
            'processing_time': 0
        }

    def process_with_resilience(self, process_func, *args, **kwargs):
        """Execute cloud processing with retry logic"""
        max_retries = self.config['max_retries']
        retry_delay = self.config['retry_delay']

        for attempt in range(max_retries):
            try:
                start_time = time.time()
                result = process_func(*args, **kwargs)
                processing_time = time.time() - start_time

                self.stats['processing_time'] += processing_time
                self.stats['total_processed'] += len(result) if result else 0

                logger.info(f"✅ Processing successful: {len(result) if result else 0} documents")
                return result

            except LexaError as e:
                logger.warning(f"Attempt {attempt + 1} failed: {e.message}")

                if attempt < max_retries - 1:
                    if e.error_code == "RATE_LIMIT_EXCEEDED":
                        wait_time = int(getattr(e, 'retry_after', retry_delay * (2 ** attempt)))
                    else:
                        wait_time = retry_delay * (2 ** attempt)

                    logger.info(f"Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                    self.stats['total_retries'] += 1
                else:
                    logger.error(f"❌ All attempts failed: {e.message}")
                    self.stats['total_failed'] += 1
                    raise e

            except Exception as e:
                logger.error(f"❌ Unexpected error: {e}")
                self.stats['total_failed'] += 1
                if attempt == max_retries - 1:
                    raise e
                time.sleep(retry_delay * (2 ** attempt))
                self.stats['total_retries'] += 1

        return None

    def process_s3_resilient(self, bucket, folder_path=""):
        """Resilient S3 processing"""
        return self.process_with_resilience(
            self.client.parse_s3_folder,
            bucket=bucket,
            folder_path=folder_path,
            mode=self.config['processing_mode'],
            timeout=300.0
        )

    def process_sharepoint_resilient(self, site_id, drive_id, folder_path=""):
        """Resilient SharePoint processing"""
        return self.process_with_resilience(
            self.client.parse_sharepoint_folder,
            site_id=site_id,
            drive_id=drive_id,
            folder_path=folder_path,
            mode=self.config['processing_mode'],
            timeout=300.0
        )

    def save_checkpoint(self, results, checkpoint_file):
        """Save processing checkpoint"""
        checkpoint_data = {
            'results': [
                {
                    'source_file': doc.source_file,
                    'content_length': len(doc.content),
                    'tables_count': len(doc.tables),
                    'images_count': len(doc.images)
                }
                for doc in results
            ],
            'stats': self.stats,
            'timestamp': time.time()
        }

        with open(checkpoint_file, 'w') as f:
            json.dump(checkpoint_data, f, indent=2)

        logger.info(f"💾 Checkpoint saved: {checkpoint_file}")

    def get_processing_report(self):
        """Generate processing report"""
        return {
            'summary': {
                'total_processed': self.stats['total_processed'],
                'total_failed': self.stats['total_failed'],
                'total_retries': self.stats['total_retries'],
                'success_rate': (
                    self.stats['total_processed'] /
                    (self.stats['total_processed'] + self.stats['total_failed'])
                    if (self.stats['total_processed'] + self.stats['total_failed']) > 0
                    else 0
                ) * 100,
                'total_processing_time': self.stats['processing_time'],
                'avg_processing_time': (
                    self.stats['processing_time'] / self.stats['total_processed']
                    if self.stats['total_processed'] > 0
                    else 0
                )
            },
            'recommendations': self._get_recommendations()
        }

    def _get_recommendations(self):
        """Get performance recommendations"""
        recommendations = []

        success_rate = (
            self.stats['total_processed'] /
            (self.stats['total_processed'] + self.stats['total_failed'])
            if (self.stats['total_processed'] + self.stats['total_failed']) > 0
            else 0
        ) * 100

        if success_rate < 90:
            recommendations.append("Consider increasing retry limits or timeout values")

        if self.stats['total_retries'] > self.stats['total_processed'] * 0.5:
            recommendations.append("High retry rate detected - check network connectivity")

        avg_time = (
            self.stats['processing_time'] / self.stats['total_processed']
            if self.stats['total_processed'] > 0
            else 0
        )
        if avg_time > 10:
            recommendations.append("Consider using FAST processing mode for better performance")

        return recommendations

# Usage
processor = ProductionCloudProcessor(
    api_key="your-api-key",
    config={
        'max_retries': 5,
        'retry_delay': 3.0,
        'processing_mode': ProcessingMode.DEFAULT
    }
)

# Process with resilience
try:
    s3_docs = processor.process_s3_resilient("my-bucket", "documents/")
    processor.save_checkpoint(s3_docs, "s3_checkpoint.json")

    sharepoint_docs = processor.process_sharepoint_resilient(
        "site-id", "drive-id", "Shared Documents/"
    )
    processor.save_checkpoint(sharepoint_docs, "sharepoint_checkpoint.json")

except Exception as e:
    logger.error(f"Critical failure: {e}")

# Generate report
report = processor.get_processing_report()
print("\n📊 Processing Report:")
print(f"  Success Rate: {report['summary']['success_rate']:.1f}%")
print(f"  Total Processed: {report['summary']['total_processed']}")
print(f"  Total Failed: {report['summary']['total_failed']}")
print(f"  Average Time: {report['summary']['avg_processing_time']:.1f}s per document")

if report['recommendations']:
    print("\n💡 Recommendations:")
    for rec in report['recommendations']:
        print(f"  • {rec}")
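The checkpoints written above also make reruns cheaper: before reprocessing a source you can load the last checkpoint and skip files that already succeeded. A sketch under the assumption that the file uses exactly the format produced by save_checkpoint; the helper name is hypothetical:

import json

def load_processed_files(checkpoint_file):
    """Return the set of source files recorded in an earlier checkpoint
    (hypothetical helper; expects the format written by save_checkpoint)."""
    try:
        with open(checkpoint_file) as f:
            checkpoint = json.load(f)
        return {entry['source_file'] for entry in checkpoint['results']}
    except FileNotFoundError:
        return set()  # no checkpoint yet - nothing to skip

already_done = load_processed_files("s3_checkpoint.json")
print(f"Skipping {len(already_done)} previously processed files on this run")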
Next Steps
Explore advanced patterns for sophisticated document processing workflows.