Exception Hierarchy

from cerevox import (
    LexaError,                   # Base exception
    LexaAuthError,               # Authentication/authorization issues
    LexaRateLimitError,          # Rate limiting
    LexaTimeoutError,            # Request timeouts
    LexaJobFailedError,          # Document processing failures
    LexaUnsupportedFileError,    # Unsupported file types
    LexaValidationError,         # Request validation errors
    LexaQuotaExceededError,      # Usage quota exceeded
    LexaServerError              # Server-side errors
)

Error Types

LexaError (Base Exception)

All Lexa-specific exceptions inherit from LexaError, so a single `except LexaError` clause catches any error raised by the SDK:
from cerevox import Lexa, LexaError

client = Lexa(api_key="your-api-key")

try:
    documents = client.parse(["document.pdf"])
except LexaError as err:
    # LexaError is the base class, so this one handler covers every
    # Lexa-specific failure. The fields printed below are read directly
    # from the caught exception.
    print(f"Lexa error: {err.message}")
    print(f"Status code: {err.status_code}")
    print(f"Request ID: {err.request_id}")
    print(f"Retry suggested: {err.retry_suggested}")

LexaAuthError

Raised when API key authentication or authorization fails.
# Case 1: an invalid API key is passed explicitly.
client = Lexa(api_key="invalid-key")

# Case 2: no key is passed and the CEREVOX_API_KEY environment variable
# is not set.
client = Lexa()  # No CEREVOX_API_KEY environment variable

# Further causes that need no dedicated code sample:
#   - Expired or revoked API key
#   - Access forbidden due to permissions
# NOTE(review): whether LexaAuthError surfaces at construction time or on
# the first request is not shown here -- confirm against the SDK.
Status Codes:
  • 401 - Invalid or expired API key
  • 403 - Access forbidden (insufficient permissions)

LexaRateLimitError

Raised when API rate limits are exceeded.
from cerevox import Lexa, LexaRateLimitError
import time

def parse_with_retry(client, files, max_retries=3):
    """Parse ``files``, pausing and retrying whenever the API rate-limits us.

    Args:
        client: Object exposing ``parse(files)``.
        files: Passed to ``client.parse`` unchanged on every attempt.
        max_retries: Total number of attempts before giving up.

    Returns:
        Whatever ``client.parse(files)`` returns on the first success.

    Raises:
        LexaRateLimitError: The error from the last attempt, or a fresh one
            if the loop finishes without making any attempt.
    """
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            return client.parse(files)
        except LexaRateLimitError as rate_err:
            if is_last_attempt:
                raise rate_err

            # The exception itself reports how long to wait.
            pause = rate_err.get_retry_delay()
            print(f"Rate limited. Waiting {pause} seconds...")
            time.sleep(pause)

    raise LexaRateLimitError("Max retries exceeded")
Status Code: 429

LexaValidationError

Raised when request parameters are invalid.
from cerevox import Lexa, LexaValidationError

client = Lexa(api_key="your-api-key")

try:
    # Invalid file path or parameters
    documents = client.parse(["/nonexistent/file.pdf"])
except LexaValidationError as validation_err:
    # Report the summary message first, then the detailed error list.
    print(f"Validation error: {validation_err.message}")
    print(f"Validation errors: {validation_err.validation_errors}")
    print(f"Retry suggested: {validation_err.retry_suggested}")  # Always False
Status Code: 400

LexaJobFailedError

Raised when document processing fails on the server side.
from cerevox import Lexa, LexaJobFailedError

try:
    documents = client.parse(["corrupted-document.pdf"])
except LexaJobFailedError as job_err:
    # The exception carries the job ID and failure reason alongside the
    # message, and says whether a retry is suggested.
    print(f"Processing failed: {job_err.message}")
    print(f"Job ID: {job_err.job_id}")
    print(f"Failure reason: {job_err.failure_reason}")
    print(f"Retry suggested: {job_err.retry_suggested}")
Common Failure Reasons:
  • invalid_file_format - Unsupported file format (not retryable)
  • file_corrupted - File is corrupted (not retryable)
  • file_too_large - File exceeds size limits (not retryable)
  • unsupported_format - Format not supported (not retryable)
  • temporary_server_error - Temporary issue (retryable)

LexaUnsupportedFileError

Raised when attempting to process unsupported file formats.
from cerevox import Lexa, LexaUnsupportedFileError

try:
    documents = client.parse(["document.xyz"])
except LexaUnsupportedFileError as unsupported_err:
    # Show the rejected type next to the list of supported types.
    print(f"Unsupported file type: {unsupported_err.message}")
    print(f"File type: {unsupported_err.file_type}")
    print(f"Supported types: {unsupported_err.supported_types}")
    print(f"Retry suggested: {unsupported_err.retry_suggested}")  # Always False
Status Code: 415

LexaTimeoutError

Raised when operations exceed timeout limits.
from cerevox import Lexa, LexaTimeoutError
import time

def parse_with_custom_timeout(client, files, base_timeout=60):
    """Parse with dynamic timeout based on file size

    The timeout is ``base_timeout`` plus one second for every whole
    megabyte of on-disk input.

    Args:
        client: Object exposing ``parse(files, timeout=...)``.
        files: Iterable of inputs. Only ``str`` entries are treated as paths
            and measured; other entries add nothing to the size estimate.
        base_timeout: Timeout in seconds applied before the size allowance.

    Returns:
        Whatever ``client.parse`` returns.

    Raises:
        LexaTimeoutError: Re-raised unchanged after printing diagnostics.
        OSError: From ``os.path.getsize`` if a string path cannot be read.
    """
    # Local import: the surrounding example never imports ``os``, so without
    # this the size calculation fails with NameError on the first path.
    import os

    # Calculate timeout based on file sizes
    total_size = sum(os.path.getsize(f) for f in files if isinstance(f, str))
    timeout = base_timeout + (total_size // (1024 * 1024))  # +1s per MB

    try:
        return client.parse(files, timeout=timeout)
    except LexaTimeoutError as e:
        print(f"Operation timed out after {e.timeout_duration} seconds")
        print(f"Retry suggested: {e.retry_suggested}")  # True
        print("Try with longer timeout or split the files")
        raise
Status Code: 408

LexaQuotaExceededError

Raised when usage quotas are exceeded.
from cerevox import Lexa, LexaQuotaExceededError

try:
    documents = client.parse(["document.pdf"])
except LexaQuotaExceededError as quota_err:
    print(f"Quota exceeded: {quota_err.message}")
    print(f"Quota type: {quota_err.quota_type}")
    print(f"Reset time: {quota_err.reset_time}")
    print(f"Retry suggested: {quota_err.retry_suggested}")  # True if reset_time exists

    # With no reset time there is nothing to wait for, so point the user
    # to support; otherwise tell them when the quota resets.
    if not quota_err.reset_time:
        print("Contact support to increase quota")
    else:
        print(f"Quota resets at: {quota_err.reset_time}")
Status Code: 402

LexaServerError

Raised for server-side errors (5xx status codes).
from cerevox import Lexa, LexaServerError
import time

def robust_parse(client, files, max_retries=5):
    """Parse ``files``, retrying server-side failures with exponential backoff.

    Args:
        client: Object exposing ``parse(files)``.
        files: Passed to ``client.parse`` unchanged on every attempt.
        max_retries: Total number of attempts before giving up.

    Returns:
        Whatever ``client.parse(files)`` returns on the first success.

    Raises:
        LexaServerError: The error from the last attempt, or a fresh one if
            the loop finishes without making any attempt.
    """
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            return client.parse(files)
        except LexaServerError as server_err:
            if is_last_attempt:
                raise server_err

            # Double the wait on each attempt: 1s, 2s, 4s, ...
            pause = 2 ** attempt
            print(f"Server error on attempt {attempt + 1}: {server_err.message}")
            print(f"Retrying in {pause} seconds...")
            time.sleep(pause)

    raise LexaServerError("Max server error retries exceeded")
Status Codes: 500, 502, 503, 504

Intelligent Error Classification

The SDK provides intelligent error classification through the create_error_from_response function.
from cerevox.exceptions import create_error_from_response

# Automatically classifies errors based on:
# - HTTP status code
# - Response error_type field
# - Error message content
# - Additional response metadata

def handle_api_response(status_code, response_data, request_id=None):
    """Turn an error response into the matching Lexa exception and raise it.

    Args:
        status_code: HTTP status code of the response.
        response_data: Response payload handed to ``create_error_from_response``.
        request_id: Optional request identifier forwarded to the classifier.

    Returns:
        None when ``status_code`` is below 400.

    Raises:
        The exception built by ``create_error_from_response`` for any status
        code of 400 or above.
    """
    # Nothing to do for non-error responses.
    if not status_code >= 400:
        return None

    classified = create_error_from_response(status_code, response_data, request_id)

    # Print a hint for the error types that carry extra information.
    if isinstance(classified, LexaRateLimitError):
        print(f"Rate limited. Retry after {classified.get_retry_delay()}s")
    elif isinstance(classified, LexaAuthError):
        print("Authentication failed. Check API key.")
    elif isinstance(classified, LexaJobFailedError):
        print(f"Job {classified.job_id} failed: {classified.failure_reason}")

    raise classified

Retry Strategies

The SDK provides intelligent retry guidance through the get_retry_strategy function.
from cerevox.exceptions import get_retry_strategy
import time

def smart_retry_parse(client, files):
    """Parse ``files``, letting ``get_retry_strategy`` decide whether and how to retry.

    Up to three attempts are made. After each ``LexaError`` the strategy's
    ``should_retry``, ``delay``, ``backoff`` and ``reason`` entries decide
    whether to stop and how long to sleep before the next attempt.

    Args:
        client: Object exposing ``parse(files)``.
        files: Passed to ``client.parse`` unchanged on every attempt.

    Returns:
        Whatever ``client.parse(files)`` returns on the first success.

    Raises:
        LexaError: When the strategy says not to retry, or on the final attempt.
    """
    max_attempts = 3

    for attempt in range(max_attempts):
        try:
            return client.parse(files)
        except LexaError as err:
            plan = get_retry_strategy(err)

            if not plan["should_retry"] or attempt == max_attempts - 1:
                print(f"Not retrying: {plan['reason']}")
                raise err

            pause = plan["delay"]
            growth = plan["backoff"]

            # Scale the base delay by attempt number; any other backoff
            # value (e.g. "fixed") leaves the delay untouched.
            if growth == "exponential":
                pause = pause * (2 ** attempt)
            elif growth == "linear":
                pause = pause * (attempt + 1)

            print(f"Retrying in {pause}s ({plan['reason']})")
            time.sleep(pause)

    raise RuntimeError("Max retries exceeded")

Best Practices

Comprehensive Error Handling

from cerevox import Lexa, LexaError, LexaAuthError, LexaRateLimitError
from cerevox.exceptions import get_retry_strategy
import logging
import time

# Configure logging: enable INFO and above via basicConfig, and create the
# module-level logger that RobustLexaClient uses for its error and retry
# messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class RobustLexaClient:
    """Wrapper around ``Lexa`` that logs every failure and retries when advised.

    Attributes are set in ``__init__``: ``client`` is the wrapped ``Lexa``
    instance and ``max_retries`` is the total number of parse attempts.
    """

    def __init__(self, api_key, max_retries=3):
        self.client = Lexa(api_key=api_key)
        self.max_retries = max_retries

    def parse_documents(self, files, mode=None):
        """Parse documents with comprehensive error handling

        Args:
            files: Passed to the wrapped client's ``parse`` unchanged.
            mode: Forwarded as the ``mode`` keyword argument.

        Returns:
            Whatever the wrapped client's ``parse`` returns.

        Raises:
            LexaError: For auth, validation and unsupported-file errors, when
                the retry strategy says not to retry, or on the last attempt.
            RuntimeError: If the loop finishes without returning or raising.
        """
        for attempt in range(self.max_retries):
            try:
                return self.client.parse(files, mode=mode)

            except LexaError as err:
                plan = get_retry_strategy(err)

                # Log every failure with structured fields before deciding
                # what to do about it.
                details = {
                    'error_type': type(err).__name__,
                    'status_code': err.status_code,
                    'request_id': err.request_id,
                    'retry_suggested': err.retry_suggested
                }
                logger.error(
                    f"Parsing failed (attempt {attempt + 1}): {err.message}",
                    extra=details
                )

                # These error types are never retried, whatever the strategy says.
                if isinstance(err, (LexaAuthError, LexaValidationError, LexaUnsupportedFileError)):
                    logger.error(f"Non-retryable error: {err.message}")
                    raise err

                if not plan["should_retry"] or attempt == self.max_retries - 1:
                    logger.error(f"Not retrying: {plan['reason']}")
                    raise err

                pause = self._scaled_delay(plan, attempt)
                logger.warning(f"Retrying in {pause}s ({plan['reason']})")
                time.sleep(pause)

        raise RuntimeError("Max retries exceeded")

    @staticmethod
    def _scaled_delay(plan, attempt):
        """Return the strategy's base delay scaled by its backoff kind."""
        base = plan["delay"]
        kind = plan["backoff"]
        if kind == "exponential":
            return base * (2 ** attempt)
        if kind == "linear":
            return base * (attempt + 1)
        # Any other backoff value uses the base delay as-is.
        return base

# Usage: build the wrapper once, then treat LexaError as the single failure
# signal that remains after the wrapper's own retries.
client = RobustLexaClient(api_key="your-api-key")
try:
    documents = client.parse_documents(["document.pdf"])
    print(f"Successfully parsed {len(documents)} documents")
except LexaError as err:
    print(f"Parsing failed: {err.message}")

Error Monitoring and Logging

import logging
from cerevox import Lexa, LexaError

# Custom error handler
class LexaErrorHandler(logging.Handler):
    """Logging handler that forwards SDK error records to a monitoring hook.

    Records are forwarded when they are at ERROR level or above and come
    from the ``cerevox`` logger or one of its child loggers.
    """

    def emit(self, record):
        """Pass qualifying records to ``send_to_monitoring``; ignore the rest."""
        # Child loggers ("cerevox.<something>") propagate their records to a
        # handler attached to "cerevox", so an exact-name check would drop
        # them. The trailing dot keeps unrelated names like "cerevoxish" out.
        from_sdk = record.name == 'cerevox' or record.name.startswith('cerevox.')
        if from_sdk and record.levelno >= logging.ERROR:
            # Send to monitoring service
            self.send_to_monitoring(record)

    def send_to_monitoring(self, record):
        """Deliver ``record`` to the monitoring backend (no-op placeholder)."""
        # Implementation depends on your monitoring solution
        pass

# Configure logging: attach the custom handler to the 'cerevox' logger so
# that ERROR-level records logged there reach send_to_monitoring.
logger = logging.getLogger('cerevox')
logger.addHandler(LexaErrorHandler())

def monitored_parse(client, files):
    """Parse with error monitoring

    Calls ``client.parse(files)`` and returns its result. If a ``LexaError``
    is raised, one ERROR record with structured details is logged on the
    module-level ``logger`` and the same exception is re-raised.

    Args:
        client: Object exposing ``parse(files)``.
        files: Passed to ``client.parse`` unchanged. A list is counted by
            its length in the log record; anything else counts as one file.

    Returns:
        Whatever ``client.parse(files)`` returns.

    Raises:
        LexaError: Re-raised unchanged after being logged.
    """
    try:
        return client.parse(files)
    except LexaError as e:
        logger.error(
            "Parsing failed",
            extra={
                'error_type': type(e).__name__,
                'status_code': e.status_code,
                'request_id': e.request_id,
                'files': len(files) if isinstance(files, list) else 1,
                # 'message' is a reserved LogRecord attribute: passing it in
                # ``extra`` makes logging raise KeyError, which would hide
                # the real error. The text goes under its own key instead.
                'error_message': e.message,
                'retry_suggested': e.retry_suggested
            }
        )
        raise

Graceful Degradation

from cerevox import Lexa, ProcessingMode, LexaError, LexaJobFailedError

def parse_with_fallback(client, files):
    """Parse with fallback to simpler processing modes

    Tries ``ProcessingMode.ADVANCED`` first and ``ProcessingMode.DEFAULT``
    second, moving to the next mode only for failures that allow it.

    Args:
        client: Object exposing ``parse(files, mode=...)``.
        files: Passed to ``client.parse`` unchanged for every mode.

    Returns:
        Whatever ``client.parse`` returns for the first mode that succeeds.

    Raises:
        LexaJobFailedError: When a retry is suggested for the failure, or
            the failing mode is already ``DEFAULT``.
        LexaError: When any other Lexa error occurs in ``DEFAULT`` mode.
        RuntimeError: If the loop finishes without returning or raising.
    """
    # Modes in order of preference.
    for mode in (ProcessingMode.ADVANCED, ProcessingMode.DEFAULT):
        try:
            return client.parse(files, mode=mode)
        except LexaJobFailedError as job_err:
            # Only a non-retryable job failure in a non-default mode falls
            # through to the next mode.
            if job_err.retry_suggested or mode == ProcessingMode.DEFAULT:
                raise job_err
            print(f"Failed with {mode.value} mode, trying simpler mode...")
        except LexaError as err:
            if mode == ProcessingMode.DEFAULT:
                raise err  # All modes failed

    raise RuntimeError("All processing modes failed")

def parse_with_chunking(client, files, chunk_size=10):
    """Parse large batches in smaller chunks

    Args:
        client: Object exposing ``parse(chunk)``.
        files: Sliceable sequence of inputs.
        chunk_size: Maximum number of inputs sent per ``parse`` call.

    Returns:
        A single list with the results of every chunk that parsed, in order.

    Raises:
        LexaError: When a chunk fails with ``retry_suggested`` set. Failures
            without it are printed and the chunk is skipped.
    """
    collected = []

    for start in range(0, len(files), chunk_size):
        batch = files[start:start + chunk_size]
        try:
            collected.extend(client.parse(batch))
        except LexaError as err:
            print(f"Chunk {start//chunk_size + 1} failed: {err.message}")
            # A retry-suggested failure aborts the whole run; any other
            # failure just drops this chunk and moves on.
            if err.retry_suggested:
                raise err

    return collected

Error Recovery Patterns

Circuit Breaker Pattern

import time
from enum import Enum

class CircuitState(Enum):
    """States used by CircuitBreaker to decide whether a call may run."""

    # Starting state, and the state restored after any successful call.
    CLOSED = "closed"
    # Entered once the failure count reaches the threshold; calls are
    # rejected with RuntimeError until the recovery timeout has passed.
    OPEN = "open"
    # Entered from OPEN after the recovery timeout; the next call is let
    # through and its outcome decides the following state.
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """Stop calling a failing function until a cool-down period has passed.

    The breaker counts consecutive failures. Once the count reaches
    ``failure_threshold`` it opens and rejects calls with ``RuntimeError``
    until ``recovery_timeout`` seconds have passed since the last failure;
    the next call is then let through as a trial.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def _cooldown_elapsed(self):
        """Return True once more than ``recovery_timeout`` seconds have passed since the last failure."""
        return time.time() - self.last_failure_time > self.recovery_timeout

    def call(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` through the breaker and return its result.

        Raises:
            RuntimeError: If the breaker is open and still cooling down.
            Exception: Whatever ``func`` raises, after recording the failure.
        """
        if self.state == CircuitState.OPEN:
            if not self._cooldown_elapsed():
                raise RuntimeError("Circuit breaker is OPEN")
            # Cool-down is over: allow one trial call.
            self.state = CircuitState.HALF_OPEN

        try:
            outcome = func(*args, **kwargs)
            self.on_success()
        except Exception as exc:
            self.on_failure()
            raise exc
        return outcome

    def on_success(self):
        """Reset the failure count and close the breaker."""
        self.state = CircuitState.CLOSED
        self.failure_count = 0

    def on_failure(self):
        """Record a failure and open the breaker once the threshold is reached."""
        self.last_failure_time = time.time()
        self.failure_count += 1

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Usage: route the parse call through the breaker rather than calling it
# directly. The handler below catches RuntimeError, which is what the
# breaker raises while it is open.
circuit_breaker = CircuitBreaker()
client = Lexa(api_key="your-api-key")

try:
    documents = circuit_breaker.call(client.parse, ["document.pdf"])
except RuntimeError as breaker_err:
    print(f"Circuit breaker active: {breaker_err}")

Next Steps

Learn about client configuration for optimal performance and reliability.