Supported Cloud Services

Amazon S3

Parse documents from S3 buckets with IAM integration

Microsoft SharePoint

Access SharePoint sites and document libraries

Box

Parse files from Box folders and enterprise content

Dropbox

Process documents from Dropbox folders

Amazon S3 Integration

Basic S3 Operations

from cerevox import Lexa

client = Lexa(api_key="your-api-key")

# List all available S3 buckets
buckets = client.list_s3_buckets()

print(f"Found {len(buckets.buckets)} buckets:")
for bucket in buckets.buckets:
    print(f"  📦 {bucket.name} (Created: {bucket.creation_date})")

Advanced S3 Patterns

from cerevox import Lexa

def s3_progress_callback(status):
    print(f"📊 S3 Processing: {status.status}")
    if hasattr(status, 'progress') and status.progress:
        print(f"   Progress: {status.progress}%")
    if hasattr(status, 'files_processed'):
        processed = getattr(status, 'files_processed', 0)
        total = getattr(status, 'total_files', 0)
        print(f"   Files: {processed}/{total}")

client = Lexa(api_key="your-api-key")

# Parse with detailed progress monitoring
documents = client.parse_s3_folder(
    bucket="large-document-bucket",
    folder_path="annual-reports/",
    progress_callback=s3_progress_callback,
    timeout=600.0,  # 10 minutes for large batch
    poll_interval=5.0
)

print(f"✅ Completed: {len(documents)} documents")

Microsoft SharePoint Integration

SharePoint Operations

from cerevox import Lexa

client = Lexa(api_key="your-api-key")

# List all available SharePoint sites
sites = client.list_sharepoint_sites()

print(f"Found {len(sites.sites)} SharePoint sites:")
for site in sites.sites:
    print(f"  🏢 {site.name}")
    print(f"      ID: {site.id}")
    print(f"      URL: {site.web_url}")

SharePoint Automation

from cerevox import Lexa, ProcessingMode
import json
import os
from datetime import datetime

def process_sharepoint_site(site_id, output_dir="sharepoint_results"):
    """Complete SharePoint site processing workflow"""
    
    client = Lexa(api_key="your-api-key")
    
    # Step 1: Get all drives in the site
    print("🔍 Discovering SharePoint structure...")
    drives = client.list_sharepoint_drives(site_id)
    
    all_results = []
    
    for drive in drives.drives:
        print(f"\n📁 Processing drive: {drive.name}")
        
        try:
            # Parse all documents in the drive
            documents = client.parse_sharepoint_folder(
                site_id=site_id,
                drive_id=drive.id,
                folder_path="",  # Root folder
                mode=ProcessingMode.DEFAULT,
                timeout=600.0
            )
            
            # Process each document
            for doc in documents:
                result = {
                    'drive_name': drive.name,
                    'drive_id': drive.id,
                    'document_title': doc.title,
                    'source_file': doc.source_file,
                    'content_length': len(doc.content),
                    'page_count': doc.page_count,
                    'tables_count': len(doc.tables),
                    'images_count': len(doc.images),
                    'processed_at': datetime.now().isoformat(),
                    'preview': doc.content[:300] if doc.content else ""
                }
                
                # Extract table summaries
                if doc.tables:
                    result['table_summary'] = [
                        {
                            'rows': table.rows,
                            'columns': table.columns,
                            'page': table.page_number
                        }
                        for table in doc.tables
                    ]
                
                # Get text chunks for analysis
                chunks = doc.get_text_chunks(target_size=400)
                result['chunks_count'] = len(chunks)
                
                all_results.append(result)
            
            print(f"✅ Processed {len(documents)} documents from {drive.name}")
            
        except Exception as e:
            print(f"❌ Failed to process drive {drive.name}: {e}")
            continue
    
    # Save results (create the output directory if it does not exist)
    os.makedirs(output_dir, exist_ok=True)
    output_file = f"{output_dir}/sharepoint_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(output_file, 'w') as f:
        json.dump(all_results, f, indent=2)
    
    print(f"\n📊 Analysis complete:")
    print(f"   Total documents: {len(all_results)}")
    print(f"   Results saved to: {output_file}")
    
    return all_results

# Usage
results = process_sharepoint_site("your-sharepoint-site-id")

Box Integration

Box Operations

from cerevox import Lexa

client = Lexa(api_key="your-api-key")

# List root folders
folders = client.list_box_folders()

print(f"Root folders in Box:")
for folder in folders.folders:
    print(f"  📁 {folder.name} (ID: {folder.id})")
    print(f"      Created: {folder.created_at}")
    print(f"      Modified: {folder.modified_at}")

# List specific folder contents
subfolder_id = "123456789"
subfolders = client.list_box_folders(subfolder_id)

print(f"\nSubfolders in {subfolder_id}:")
for folder in subfolders.folders:
    print(f"  📁 {folder.name}")

Dropbox Integration

Dropbox Operations

from cerevox import Lexa

client = Lexa(api_key="your-api-key")

# List root folders
folders = client.list_dropbox_folders()

print("Root folders in Dropbox:")
for folder in folders.folders:
    print(f"  📁 {folder.name}")
    print(f"      Path: {folder.path_display}")

# List specific folder
specific_folders = client.list_dropbox_folders("/Documents/Work")

print(f"\nContents of /Documents/Work:")
for folder in specific_folders.folders:
    print(f"  📁 {folder.name}")

Multi-Cloud Processing

Unified Cloud Processing

from cerevox import Lexa, ProcessingMode
from datetime import datetime

class MultiCloudProcessor:
    def __init__(self, api_key):
        self.client = Lexa(api_key=api_key)
        self.results = []
    
    def process_s3_source(self, bucket, folder_path=""):
        """Process documents from S3"""
        print(f"🔄 Processing S3: s3://{bucket}/{folder_path}")
        
        try:
            documents = self.client.parse_s3_folder(
                bucket=bucket,
                folder_path=folder_path,
                mode=ProcessingMode.DEFAULT,
                timeout=300.0
            )
            
            for doc in documents:
                self.results.append({
                    'source': 'S3',
                    'location': f"s3://{bucket}/{doc.source_file}",
                    'document': doc,
                    'processed_at': datetime.now().isoformat()
                })
            
            print(f"✅ S3: Processed {len(documents)} documents")
            return len(documents)
            
        except Exception as e:
            print(f"❌ S3 processing failed: {e}")
            return 0
    
    def process_sharepoint_source(self, site_id, drive_id, folder_path=""):
        """Process documents from SharePoint"""
        print(f"🔄 Processing SharePoint: {site_id}/{drive_id}/{folder_path}")
        
        try:
            documents = self.client.parse_sharepoint_folder(
                site_id=site_id,
                drive_id=drive_id,
                folder_path=folder_path,
                mode=ProcessingMode.DEFAULT,
                timeout=300.0
            )
            
            for doc in documents:
                self.results.append({
                    'source': 'SharePoint',
                    'location': f"sharepoint://{site_id}/{drive_id}/{doc.source_file}",
                    'document': doc,
                    'processed_at': datetime.now().isoformat()
                })
            
            print(f"✅ SharePoint: Processed {len(documents)} documents")
            return len(documents)
            
        except Exception as e:
            print(f"❌ SharePoint processing failed: {e}")
            return 0
    
    def process_box_source(self, folder_id):
        """Process documents from Box"""
        print(f"🔄 Processing Box: {folder_id}")
        
        try:
            documents = self.client.parse_box_folder(
                folder_id=folder_id,
                mode=ProcessingMode.DEFAULT,
                timeout=300.0
            )
            
            for doc in documents:
                self.results.append({
                    'source': 'Box',
                    'location': f"box://{folder_id}/{doc.source_file}",
                    'document': doc,
                    'processed_at': datetime.now().isoformat()
                })
            
            print(f"✅ Box: Processed {len(documents)} documents")
            return len(documents)
            
        except Exception as e:
            print(f"❌ Box processing failed: {e}")
            return 0
    
    def process_dropbox_source(self, folder_path):
        """Process documents from Dropbox"""
        print(f"🔄 Processing Dropbox: {folder_path}")
        
        try:
            documents = self.client.parse_dropbox_folder(
                folder_path=folder_path,
                mode=ProcessingMode.DEFAULT,
                timeout=300.0
            )
            
            for doc in documents:
                self.results.append({
                    'source': 'Dropbox',
                    'location': f"dropbox://{folder_path}/{doc.source_file}",
                    'document': doc,
                    'processed_at': datetime.now().isoformat()
                })
            
            print(f"✅ Dropbox: Processed {len(documents)} documents")
            return len(documents)
            
        except Exception as e:
            print(f"❌ Dropbox processing failed: {e}")
            return 0
    
    def get_summary(self):
        """Get processing summary"""
        by_source = {}
        total_docs = len(self.results)
        total_content = 0
        total_tables = 0
        
        for result in self.results:
            source = result['source']
            doc = result['document']
            
            if source not in by_source:
                by_source[source] = {
                    'count': 0,
                    'content_chars': 0,
                    'tables': 0
                }
            
            by_source[source]['count'] += 1
            by_source[source]['content_chars'] += len(doc.content)
            by_source[source]['tables'] += len(doc.tables)
            
            total_content += len(doc.content)
            total_tables += len(doc.tables)
        
        return {
            'total_documents': total_docs,
            'total_content_chars': total_content,
            'total_tables': total_tables,
            'by_source': by_source
        }

# Usage
processor = MultiCloudProcessor(api_key="your-api-key")

# Process from multiple cloud sources
processor.process_s3_source("my-s3-bucket", "documents/")
processor.process_sharepoint_source("site-id", "drive-id", "Shared Documents/")
processor.process_box_source("box-folder-id")
processor.process_dropbox_source("/Work Documents")

# Get summary
summary = processor.get_summary()
print(f"\n📊 Multi-Cloud Processing Summary:")
print(f"   Total documents: {summary['total_documents']}")
print(f"   Total content: {summary['total_content_chars']} characters")
print(f"   Total tables: {summary['total_tables']}")

for source, stats in summary['by_source'].items():
    print(f"\n   {source}:")
    print(f"     Documents: {stats['count']}")
    print(f"     Content: {stats['content_chars']} chars")
    print(f"     Tables: {stats['tables']}")

Production Cloud Patterns

Robust Cloud Processing

from cerevox import Lexa, LexaError, ProcessingMode
import json
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ProductionCloudProcessor:
    def __init__(self, api_key, config=None):
        self.client = Lexa(
            api_key=api_key,
            timeout=300.0,
            max_retries=3
        )
        
        self.config = config or {
            'max_retries': 3,
            'retry_delay': 2.0,
            'batch_size': 20,
            'processing_mode': ProcessingMode.DEFAULT
        }
        
        self.stats = {
            'total_processed': 0,
            'total_failed': 0,
            'total_retries': 0,
            'processing_time': 0
        }
    
    def process_with_resilience(self, process_func, *args, **kwargs):
        """Execute cloud processing with retry logic"""
        max_retries = self.config['max_retries']
        retry_delay = self.config['retry_delay']
        
        for attempt in range(max_retries):
            try:
                start_time = time.time()
                result = process_func(*args, **kwargs)
                
                processing_time = time.time() - start_time
                self.stats['processing_time'] += processing_time
                self.stats['total_processed'] += len(result) if result else 0
                
                logger.info(f"✅ Processing successful: {len(result) if result else 0} documents")
                return result
                
            except LexaError as e:
                logger.warning(f"Attempt {attempt + 1} failed: {e.message}")
                
                if attempt < max_retries - 1:
                    if e.error_code == "RATE_LIMIT_EXCEEDED":
                        # Honor a server-provided retry delay when it is present
                        wait_time = getattr(e, 'retry_after', None) or retry_delay * (2 ** attempt)
                    else:
                        wait_time = retry_delay * (2 ** attempt)
                    
                    logger.info(f"Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                    self.stats['total_retries'] += 1
                else:
                    logger.error(f"❌ All attempts failed: {e.message}")
                    self.stats['total_failed'] += 1
                    raise e
            
            except Exception as e:
                logger.error(f"❌ Unexpected error: {e}")
                # Only count the failure once all retry attempts are exhausted
                if attempt == max_retries - 1:
                    self.stats['total_failed'] += 1
                    raise e
                time.sleep(retry_delay * (2 ** attempt))
                self.stats['total_retries'] += 1
        
        return None
    
    def process_s3_resilient(self, bucket, folder_path=""):
        """Resilient S3 processing"""
        return self.process_with_resilience(
            self.client.parse_s3_folder,
            bucket=bucket,
            folder_path=folder_path,
            mode=self.config['processing_mode'],
            timeout=300.0
        )
    
    def process_sharepoint_resilient(self, site_id, drive_id, folder_path=""):
        """Resilient SharePoint processing"""
        return self.process_with_resilience(
            self.client.parse_sharepoint_folder,
            site_id=site_id,
            drive_id=drive_id,
            folder_path=folder_path,
            mode=self.config['processing_mode'],
            timeout=300.0
        )
    
    def save_checkpoint(self, results, checkpoint_file):
        """Save processing checkpoint"""
        checkpoint_data = {
            'results': [
                {
                    'source_file': doc.source_file,
                    'content_length': len(doc.content),
                    'tables_count': len(doc.tables),
                    'images_count': len(doc.images)
                }
                for doc in results
            ],
            'stats': self.stats,
            'timestamp': time.time()
        }
        
        with open(checkpoint_file, 'w') as f:
            json.dump(checkpoint_data, f, indent=2)
        
        logger.info(f"💾 Checkpoint saved: {checkpoint_file}")
    
    def get_processing_report(self):
        """Generate processing report"""
        return {
            'summary': {
                'total_processed': self.stats['total_processed'],
                'total_failed': self.stats['total_failed'],
                'total_retries': self.stats['total_retries'],
                'success_rate': (
                    self.stats['total_processed'] / 
                    (self.stats['total_processed'] + self.stats['total_failed'])
                    if (self.stats['total_processed'] + self.stats['total_failed']) > 0 
                    else 0
                ) * 100,
                'total_processing_time': self.stats['processing_time'],
                'avg_processing_time': (
                    self.stats['processing_time'] / self.stats['total_processed']
                    if self.stats['total_processed'] > 0 
                    else 0
                )
            },
            'recommendations': self._get_recommendations()
        }
    
    def _get_recommendations(self):
        """Get performance recommendations"""
        recommendations = []
        
        success_rate = (
            self.stats['total_processed'] / 
            (self.stats['total_processed'] + self.stats['total_failed'])
            if (self.stats['total_processed'] + self.stats['total_failed']) > 0 
            else 0
        ) * 100
        
        if success_rate < 90:
            recommendations.append("Consider increasing retry limits or timeout values")
        
        if self.stats['total_retries'] > self.stats['total_processed'] * 0.5:
            recommendations.append("High retry rate detected - check network connectivity")
        
        avg_time = (
            self.stats['processing_time'] / self.stats['total_processed']
            if self.stats['total_processed'] > 0 
            else 0
        )
        
        if avg_time > 10:
            recommendations.append("Consider using FAST processing mode for better performance")
        
        return recommendations

# Usage
processor = ProductionCloudProcessor(
    api_key="your-api-key",
    config={
        'max_retries': 5,
        'retry_delay': 3.0,
        'processing_mode': ProcessingMode.DEFAULT
    }
)

# Process with resilience
try:
    s3_docs = processor.process_s3_resilient("my-bucket", "documents/")
    processor.save_checkpoint(s3_docs, "s3_checkpoint.json")
    
    sharepoint_docs = processor.process_sharepoint_resilient(
        "site-id", "drive-id", "Shared Documents/"
    )
    processor.save_checkpoint(sharepoint_docs, "sharepoint_checkpoint.json")
    
except Exception as e:
    logger.error(f"Critical failure: {e}")

# Generate report
report = processor.get_processing_report()
print(f"\n📊 Processing Report:")
print(f"   Success Rate: {report['summary']['success_rate']:.1f}%")
print(f"   Total Processed: {report['summary']['total_processed']}")
print(f"   Total Failed: {report['summary']['total_failed']}")
print(f"   Average Time: {report['summary']['avg_processing_time']:.1f}s per document")

if report['recommendations']:
    print(f"\n💡 Recommendations:")
    for rec in report['recommendations']:
        print(f"   • {rec}")

Next Steps

Explore advanced patterns for sophisticated document processing workflows.