Parse documents directly from cloud storage services with Lexa. The examples below cover Amazon S3, Microsoft SharePoint, Box, and Dropbox, plus multi-cloud and production-grade error handling patterns.
from cerevox import Lexa

client = Lexa(api_key="your-api-key")

# List all available S3 buckets
buckets = client.list_s3_buckets()

print(f"Found {len(buckets.buckets)} buckets:")
for bucket in buckets.buckets:
    print(f"  📦 {bucket.name} (Created: {bucket.creation_date})")
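Once the buckets are listed, any bucket name can be handed straight to parse_s3_folder (covered in detail next). A minimal sketch, assuming the mode and timeout arguments are optional and using a placeholder folder path:

# Minimal sketch: parse a folder from the first bucket listed above.
# "reports/" is a placeholder path; mode/timeout are assumed optional here.
if buckets.buckets:
    documents = client.parse_s3_folder(
        bucket=buckets.buckets[0].name,
        folder_path="reports/"
    )
    print(f"Parsed {len(documents)} documents")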
from cerevox import Lexa

def s3_progress_callback(status):
    print(f"📊 S3 Processing: {status.status}")
    if hasattr(status, 'progress') and status.progress:
        print(f"   Progress: {status.progress}%")
    if hasattr(status, 'files_processed'):
        processed = getattr(status, 'files_processed', 0)
        total = getattr(status, 'total_files', 0)
        print(f"   Files: {processed}/{total}")

client = Lexa(api_key="your-api-key")

# Parse with detailed progress monitoring
documents = client.parse_s3_folder(
    bucket="large-document-bucket",
    folder_path="annual-reports/",
    progress_callback=s3_progress_callback,
    timeout=600.0,  # 10 minutes for large batch
    poll_interval=5.0
)

print(f"✅ Completed: {len(documents)} documents")
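The Document objects returned here are the same ones used throughout the rest of this page, so post-processing such as chunking works identically regardless of the cloud source. A short sketch that reuses get_text_chunks (demonstrated in the SharePoint workflow below) on the S3 results, with an arbitrary 500-character target:

# Sketch: split each parsed S3 document into chunks for search or RAG.
# target_size=500 is an arbitrary example value.
all_chunks = []
for doc in documents:
    chunks = doc.get_text_chunks(target_size=500)
    all_chunks.extend(chunks)
    print(f"{doc.source_file}: {len(chunks)} chunks")

print(f"Total chunks: {len(all_chunks)}")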
from cerevox import Lexa

client = Lexa(api_key="your-api-key")

# List all available SharePoint sites
sites = client.list_sharepoint_sites()

print(f"Found {len(sites.sites)} SharePoint sites:")
for site in sites.sites:
    print(f"  🏢 {site.name}")
    print(f"     ID: {site.id}")
    print(f"     URL: {site.web_url}")
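Each SharePoint site exposes one or more drives (document libraries). They can be enumerated with list_sharepoint_drives, the same call the full workflow below relies on; a short sketch with a placeholder site ID:

# Sketch: list the drives (document libraries) of one site.
# "your-site-id" is a placeholder; use an ID from the site listing above.
drives = client.list_sharepoint_drives("your-site-id")
for drive in drives.drives:
    print(f"  💾 {drive.name} (ID: {drive.id})")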
from cerevox import Lexa, ProcessingMode
import json
import os
from datetime import datetime

def process_sharepoint_site(site_id, output_dir="sharepoint_results"):
    """Complete SharePoint site processing workflow"""
    client = Lexa(api_key="your-api-key")

    # Step 1: Get all drives in the site
    print("🔍 Discovering SharePoint structure...")
    drives = client.list_sharepoint_drives(site_id)

    all_results = []

    for drive in drives.drives:
        print(f"\n📁 Processing drive: {drive.name}")

        try:
            # Parse all documents in the drive
            documents = client.parse_sharepoint_folder(
                site_id=site_id,
                drive_id=drive.id,
                folder_path="",  # Root folder
                mode=ProcessingMode.DEFAULT,
                timeout=600.0
            )

            # Process each document
            for doc in documents:
                result = {
                    'drive_name': drive.name,
                    'drive_id': drive.id,
                    'document_title': doc.title,
                    'source_file': doc.source_file,
                    'content_length': len(doc.content),
                    'page_count': doc.page_count,
                    'tables_count': len(doc.tables),
                    'images_count': len(doc.images),
                    'processed_at': datetime.now().isoformat(),
                    'preview': doc.content[:300] if doc.content else ""
                }

                # Extract table summaries
                if doc.tables:
                    result['table_summary'] = [
                        {
                            'rows': table.rows,
                            'columns': table.columns,
                            'page': table.page_number
                        }
                        for table in doc.tables
                    ]

                # Get text chunks for analysis
                chunks = doc.get_text_chunks(target_size=400)
                result['chunks_count'] = len(chunks)

                all_results.append(result)

            print(f"✅ Processed {len(documents)} documents from {drive.name}")

        except Exception as e:
            print(f"❌ Failed to process drive {drive.name}: {e}")
            continue

    # Save results (create the output directory if it does not exist yet)
    os.makedirs(output_dir, exist_ok=True)
    output_file = f"{output_dir}/sharepoint_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(output_file, 'w') as f:
        json.dump(all_results, f, indent=2)

    print("\n📊 Analysis complete:")
    print(f"   Total documents: {len(all_results)}")
    print(f"   Results saved to: {output_file}")

    return all_results

# Usage
results = process_sharepoint_site("your-sharepoint-site-id")
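Because the workflow writes plain JSON, a saved analysis can be revisited later without re-parsing anything. A small sketch that reloads one of the generated files and counts documents containing tables (the filename here is a placeholder; use whatever process_sharepoint_site printed):

import json

# Sketch: reload a saved analysis file and summarize it.
# The path below is a placeholder for the file printed by process_sharepoint_site.
with open("sharepoint_results/sharepoint_analysis_20240101_120000.json") as f:
    saved = json.load(f)

docs_with_tables = [r for r in saved if r.get('tables_count', 0) > 0]
print(f"{len(docs_with_tables)} of {len(saved)} documents contain tables")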
from cerevox import Lexa

client = Lexa(api_key="your-api-key")

# List root folders
folders = client.list_box_folders()

print("Root folders in Box:")
for folder in folders.folders:
    print(f"  📁 {folder.name} (ID: {folder.id})")
    print(f"     Created: {folder.created_at}")
    print(f"     Modified: {folder.modified_at}")

# List contents of a specific folder
subfolder_id = "123456789"
subfolders = client.list_box_folders(subfolder_id)

print(f"\nSubfolders in {subfolder_id}:")
for folder in subfolders.folders:
    print(f"  📁 {folder.name}")
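The IDs returned above are exactly what parse_box_folder expects (the multi-cloud example further down uses the same method). A minimal sketch, assuming the mode and timeout arguments can be omitted and using a placeholder folder ID:

# Sketch: parse every document in one Box folder.
# "123456789" is a placeholder ID from the listing above;
# mode/timeout are assumed optional here.
documents = client.parse_box_folder(folder_id="123456789")
print(f"Parsed {len(documents)} documents from Box")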
from cerevox import Lexa

client = Lexa(api_key="your-api-key")

# List root folders
folders = client.list_dropbox_folders()

print("Root folders in Dropbox:")
for folder in folders.folders:
    print(f"  📁 {folder.name}")
    print(f"     Path: {folder.path_display}")

# List a specific folder
specific_folders = client.list_dropbox_folders("/Documents/Work")

print("\nContents of /Documents/Work:")
for folder in specific_folders.folders:
    print(f"  📁 {folder.name}")
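Folder paths from the listing can be passed to parse_dropbox_folder, the same method used in the multi-cloud example below. A minimal sketch, assuming mode and timeout can be omitted:

# Sketch: parse every document under one Dropbox path.
# "/Documents/Work" mirrors the listing above; mode/timeout assumed optional.
documents = client.parse_dropbox_folder(folder_path="/Documents/Work")
print(f"Parsed {len(documents)} documents from Dropbox")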
from cerevox import Lexa, ProcessingMode
from datetime import datetime

class MultiCloudProcessor:
    def __init__(self, api_key):
        self.client = Lexa(api_key=api_key)
        self.results = []
    def process_s3_source(self, bucket, folder_path=""):
        """Process documents from S3"""
        print(f"🔄 Processing S3: s3://{bucket}/{folder_path}")

        try:
            documents = self.client.parse_s3_folder(
                bucket=bucket,
                folder_path=folder_path,
                mode=ProcessingMode.DEFAULT,
                timeout=300.0
            )

            for doc in documents:
                self.results.append({
                    'source': 'S3',
                    'location': f"s3://{bucket}/{doc.source_file}",
                    'document': doc,
                    'processed_at': datetime.now().isoformat()
                })

            print(f"✅ S3: Processed {len(documents)} documents")
            return len(documents)

        except Exception as e:
            print(f"❌ S3 processing failed: {e}")
            return 0
    def process_sharepoint_source(self, site_id, drive_id, folder_path=""):
        """Process documents from SharePoint"""
        print(f"🔄 Processing SharePoint: {site_id}/{drive_id}/{folder_path}")

        try:
            documents = self.client.parse_sharepoint_folder(
                site_id=site_id,
                drive_id=drive_id,
                folder_path=folder_path,
                mode=ProcessingMode.DEFAULT,
                timeout=300.0
            )

            for doc in documents:
                self.results.append({
                    'source': 'SharePoint',
                    'location': f"sharepoint://{site_id}/{drive_id}/{doc.source_file}",
                    'document': doc,
                    'processed_at': datetime.now().isoformat()
                })

            print(f"✅ SharePoint: Processed {len(documents)} documents")
            return len(documents)

        except Exception as e:
            print(f"❌ SharePoint processing failed: {e}")
            return 0
    def process_box_source(self, folder_id):
        """Process documents from Box"""
        print(f"🔄 Processing Box: {folder_id}")

        try:
            documents = self.client.parse_box_folder(
                folder_id=folder_id,
                mode=ProcessingMode.DEFAULT,
                timeout=300.0
            )

            for doc in documents:
                self.results.append({
                    'source': 'Box',
                    'location': f"box://{folder_id}/{doc.source_file}",
                    'document': doc,
                    'processed_at': datetime.now().isoformat()
                })

            print(f"✅ Box: Processed {len(documents)} documents")
            return len(documents)

        except Exception as e:
            print(f"❌ Box processing failed: {e}")
            return 0
    def process_dropbox_source(self, folder_path):
        """Process documents from Dropbox"""
        print(f"🔄 Processing Dropbox: {folder_path}")

        try:
            documents = self.client.parse_dropbox_folder(
                folder_path=folder_path,
                mode=ProcessingMode.DEFAULT,
                timeout=300.0
            )

            for doc in documents:
                self.results.append({
                    'source': 'Dropbox',
                    'location': f"dropbox://{folder_path}/{doc.source_file}",
                    'document': doc,
                    'processed_at': datetime.now().isoformat()
                })

            print(f"✅ Dropbox: Processed {len(documents)} documents")
            return len(documents)

        except Exception as e:
            print(f"❌ Dropbox processing failed: {e}")
            return 0
    def get_summary(self):
        """Get processing summary"""
        by_source = {}
        total_docs = len(self.results)
        total_content = 0
        total_tables = 0

        for result in self.results:
            source = result['source']
            doc = result['document']

            if source not in by_source:
                by_source[source] = {
                    'count': 0,
                    'content_chars': 0,
                    'tables': 0
                }

            by_source[source]['count'] += 1
            by_source[source]['content_chars'] += len(doc.content)
            by_source[source]['tables'] += len(doc.tables)

            total_content += len(doc.content)
            total_tables += len(doc.tables)

        return {
            'total_documents': total_docs,
            'total_content_chars': total_content,
            'total_tables': total_tables,
            'by_source': by_source
        }
# Usage
processor = MultiCloudProcessor(api_key="your-api-key")

# Process from multiple cloud sources
processor.process_s3_source("my-s3-bucket", "documents/")
processor.process_sharepoint_source("site-id", "drive-id", "Shared Documents/")
processor.process_box_source("box-folder-id")
processor.process_dropbox_source("/Work Documents")

# Get summary
summary = processor.get_summary()

print("\n📊 Multi-Cloud Processing Summary:")
print(f"   Total documents: {summary['total_documents']}")
print(f"   Total content: {summary['total_content_chars']} characters")
print(f"   Total tables: {summary['total_tables']}")

for source, stats in summary['by_source'].items():
    print(f"\n  {source}:")
    print(f"    Documents: {stats['count']}")
    print(f"    Content: {stats['content_chars']} chars")
    print(f"    Tables: {stats['tables']}")
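Each entry in processor.results keeps the full Document object, so the aggregated output can feed straight into downstream steps. A brief sketch that chunks everything collected across sources, reusing get_text_chunks with an arbitrary 400-character target:

# Sketch: chunk every document gathered from all cloud sources.
for entry in processor.results:
    doc = entry['document']
    chunks = doc.get_text_chunks(target_size=400)
    print(f"{entry['source']}: {doc.source_file} -> {len(chunks)} chunks")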
from cerevox import Lexa, LexaError, ProcessingMode
import logging
import time
import json

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ProductionCloudProcessor:
    def __init__(self, api_key, config=None):
        self.client = Lexa(
            api_key=api_key,
            timeout=300.0,
            max_retries=3
        )

        self.config = config or {
            'max_retries': 3,
            'retry_delay': 2.0,
            'batch_size': 20,
            'processing_mode': ProcessingMode.DEFAULT
        }

        self.stats = {
            'total_processed': 0,
            'total_failed': 0,
            'total_retries': 0,
            'processing_time': 0
        }
    def process_with_resilience(self, process_func, *args, **kwargs):
        """Execute cloud processing with retry logic"""
        max_retries = self.config['max_retries']
        retry_delay = self.config['retry_delay']

        for attempt in range(max_retries):
            try:
                start_time = time.time()
                result = process_func(*args, **kwargs)
                processing_time = time.time() - start_time

                self.stats['processing_time'] += processing_time
                self.stats['total_processed'] += len(result) if result else 0

                logger.info(f"✅ Processing successful: {len(result) if result else 0} documents")
                return result

            except LexaError as e:
                logger.warning(f"Attempt {attempt + 1} failed: {e.message}")

                if attempt < max_retries - 1:
                    if e.error_code == "RATE_LIMIT_EXCEEDED":
                        wait_time = int(getattr(e, 'retry_after', retry_delay * (2 ** attempt)))
                    else:
                        wait_time = retry_delay * (2 ** attempt)

                    logger.info(f"Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                    self.stats['total_retries'] += 1
                else:
                    logger.error(f"❌ All attempts failed: {e.message}")
                    self.stats['total_failed'] += 1
                    raise e

            except Exception as e:
                logger.error(f"❌ Unexpected error: {e}")
                self.stats['total_failed'] += 1

                if attempt == max_retries - 1:
                    raise e

                time.sleep(retry_delay * (2 ** attempt))
                self.stats['total_retries'] += 1

        return None
    def process_s3_resilient(self, bucket, folder_path=""):
        """Resilient S3 processing"""
        return self.process_with_resilience(
            self.client.parse_s3_folder,
            bucket=bucket,
            folder_path=folder_path,
            mode=self.config['processing_mode'],
            timeout=300.0
        )

    def process_sharepoint_resilient(self, site_id, drive_id, folder_path=""):
        """Resilient SharePoint processing"""
        return self.process_with_resilience(
            self.client.parse_sharepoint_folder,
            site_id=site_id,
            drive_id=drive_id,
            folder_path=folder_path,
            mode=self.config['processing_mode'],
            timeout=300.0
        )
    def save_checkpoint(self, results, checkpoint_file):
        """Save processing checkpoint"""
        checkpoint_data = {
            'results': [
                {
                    'source_file': doc.source_file,
                    'content_length': len(doc.content),
                    'tables_count': len(doc.tables),
                    'images_count': len(doc.images)
                }
                for doc in results
            ],
            'stats': self.stats,
            'timestamp': time.time()
        }

        with open(checkpoint_file, 'w') as f:
            json.dump(checkpoint_data, f, indent=2)

        logger.info(f"💾 Checkpoint saved: {checkpoint_file}")
    def get_processing_report(self):
        """Generate processing report"""
        return {
            'summary': {
                'total_processed': self.stats['total_processed'],
                'total_failed': self.stats['total_failed'],
                'total_retries': self.stats['total_retries'],
                'success_rate': (
                    self.stats['total_processed'] /
                    (self.stats['total_processed'] + self.stats['total_failed'])
                    if (self.stats['total_processed'] + self.stats['total_failed']) > 0
                    else 0
                ) * 100,
                'total_processing_time': self.stats['processing_time'],
                'avg_processing_time': (
                    self.stats['processing_time'] / self.stats['total_processed']
                    if self.stats['total_processed'] > 0
                    else 0
                )
            },
            'recommendations': self._get_recommendations()
        }
    def _get_recommendations(self):
        """Get performance recommendations"""
        recommendations = []

        success_rate = (
            self.stats['total_processed'] /
            (self.stats['total_processed'] + self.stats['total_failed'])
            if (self.stats['total_processed'] + self.stats['total_failed']) > 0
            else 0
        ) * 100

        if success_rate < 90:
            recommendations.append("Consider increasing retry limits or timeout values")

        if self.stats['total_retries'] > self.stats['total_processed'] * 0.5:
            recommendations.append("High retry rate detected - check network connectivity")

        avg_time = (
            self.stats['processing_time'] / self.stats['total_processed']
            if self.stats['total_processed'] > 0
            else 0
        )

        if avg_time > 10:
            recommendations.append("Consider using FAST processing mode for better performance")

        return recommendations
# Usage
processor = ProductionCloudProcessor(
    api_key="your-api-key",
    config={
        'max_retries': 5,
        'retry_delay': 3.0,
        'processing_mode': ProcessingMode.DEFAULT
    }
)

# Process with resilience
try:
    s3_docs = processor.process_s3_resilient("my-bucket", "documents/")
    processor.save_checkpoint(s3_docs, "s3_checkpoint.json")

    sharepoint_docs = processor.process_sharepoint_resilient(
        "site-id", "drive-id", "Shared Documents/"
    )
    processor.save_checkpoint(sharepoint_docs, "sharepoint_checkpoint.json")

except Exception as e:
    logger.error(f"Critical failure: {e}")

# Generate report
report = processor.get_processing_report()

print("\n📊 Processing Report:")
print(f"   Success Rate: {report['summary']['success_rate']:.1f}%")
print(f"   Total Processed: {report['summary']['total_processed']}")
print(f"   Total Failed: {report['summary']['total_failed']}")
print(f"   Average Time: {report['summary']['avg_processing_time']:.1f}s per document")

if report['recommendations']:
    print("\n💡 Recommendations:")
    for rec in report['recommendations']:
        print(f"   • {rec}")