Exception Hierarchy
Copy
from cerevox import (
LexaError, # Base exception
LexaAuthError, # Authentication/authorization issues
LexaRateLimitError, # Rate limiting
LexaTimeoutError, # Request timeouts
LexaJobFailedError, # Document processing failures
LexaUnsupportedFileError, # Unsupported file types
LexaValidationError, # Request validation errors
LexaQuotaExceededError, # Usage quota exceeded
LexaServerError # Server-side errors
)
Error Types
LexaError (Base Exception)
All Lexa-specific exceptions inherit fromLexaError
.
Copy
from cerevox import Lexa, LexaError
client = Lexa(api_key="your-api-key")
try:
documents = client.parse(["document.pdf"])
except LexaError as e:
print(f"Lexa error: {e.message}")
print(f"Status code: {e.status_code}")
print(f"Request ID: {e.request_id}")
print(f"Retry suggested: {e.retry_suggested}")
LexaAuthError
Raised when API key authentication or authorization fails.Copy
# Invalid API key
client = Lexa(api_key="invalid-key")
# Missing API key
client = Lexa() # No CEREVOX_API_KEY environment variable
# Expired or revoked API key
# Access forbidden due to permissions
401
- Invalid or expired API key403
- Access forbidden (insufficient permissions)
LexaRateLimitError
Raised when API rate limits are exceeded.Copy
from cerevox import Lexa, LexaRateLimitError
import time
def parse_with_retry(client, files, max_retries=3):
for attempt in range(max_retries):
try:
return client.parse(files)
except LexaRateLimitError as e:
if attempt == max_retries - 1:
raise e
# Use recommended retry delay
wait_time = e.get_retry_delay()
print(f"Rate limited. Waiting {wait_time} seconds...")
time.sleep(wait_time)
raise LexaRateLimitError("Max retries exceeded")
429
LexaValidationError
Raised when request parameters are invalid.Copy
from cerevox import Lexa, LexaValidationError
client = Lexa(api_key="your-api-key")
try:
# Invalid file path or parameters
documents = client.parse(["/nonexistent/file.pdf"])
except LexaValidationError as e:
print(f"Validation error: {e.message}")
print(f"Validation errors: {e.validation_errors}")
print(f"Retry suggested: {e.retry_suggested}") # Always False
400
LexaJobFailedError
Raised when document processing fails on the server side.Copy
from cerevox import Lexa, LexaJobFailedError
try:
documents = client.parse(["corrupted-document.pdf"])
except LexaJobFailedError as e:
print(f"Processing failed: {e.message}")
print(f"Job ID: {e.job_id}")
print(f"Failure reason: {e.failure_reason}")
print(f"Retry suggested: {e.retry_suggested}")
invalid_file_format
- Unsupported file format (not retryable)file_corrupted
- File is corrupted (not retryable)file_too_large
- File exceeds size limits (not retryable)unsupported_format
- Format not supported (not retryable)temporary_server_error
- Temporary issue (retryable)
LexaUnsupportedFileError
Raised when attempting to process unsupported file formats.Copy
from cerevox import Lexa, LexaUnsupportedFileError
try:
documents = client.parse(["document.xyz"])
except LexaUnsupportedFileError as e:
print(f"Unsupported file type: {e.message}")
print(f"File type: {e.file_type}")
print(f"Supported types: {e.supported_types}")
print(f"Retry suggested: {e.retry_suggested}") # Always False
415
LexaTimeoutError
Raised when operations exceed timeout limits.Copy
from cerevox import Lexa, LexaTimeoutError
import time
def parse_with_custom_timeout(client, files, base_timeout=60):
"""Parse with dynamic timeout based on file size"""
# Calculate timeout based on file sizes
total_size = sum(os.path.getsize(f) for f in files if isinstance(f, str))
timeout = base_timeout + (total_size // (1024 * 1024)) # +1s per MB
try:
return client.parse(files, timeout=timeout)
except LexaTimeoutError as e:
print(f"Operation timed out after {e.timeout_duration} seconds")
print(f"Retry suggested: {e.retry_suggested}") # True
print("Try with longer timeout or split the files")
raise
408
LexaQuotaExceededError
Raised when usage quotas are exceeded.Copy
from cerevox import Lexa, LexaQuotaExceededError
try:
documents = client.parse(["document.pdf"])
except LexaQuotaExceededError as e:
print(f"Quota exceeded: {e.message}")
print(f"Quota type: {e.quota_type}")
print(f"Reset time: {e.reset_time}")
print(f"Retry suggested: {e.retry_suggested}") # True if reset_time exists
if e.reset_time:
print(f"Quota resets at: {e.reset_time}")
else:
print("Contact support to increase quota")
402
LexaServerError
Raised for server-side errors (5xx status codes).Copy
from cerevox import Lexa, LexaServerError
import time
def robust_parse(client, files, max_retries=5):
"""Parse with server error recovery"""
for attempt in range(max_retries):
try:
return client.parse(files)
except LexaServerError as e:
if attempt == max_retries - 1:
raise e
# Exponential backoff for server errors
wait_time = 2 ** attempt
print(f"Server error on attempt {attempt + 1}: {e.message}")
print(f"Retrying in {wait_time} seconds...")
time.sleep(wait_time)
raise LexaServerError("Max server error retries exceeded")
500
, 502
, 503
, 504
Intelligent Error Classification
The SDK provides intelligent error classification through thecreate_error_from_response
function.
Copy
from cerevox.exceptions import create_error_from_response
# Automatically classifies errors based on:
# - HTTP status code
# - Response error_type field
# - Error message content
# - Additional response metadata
def handle_api_response(status_code, response_data, request_id=None):
"""Example of automatic error classification"""
if status_code >= 400:
error = create_error_from_response(status_code, response_data, request_id)
# Error is automatically classified to the correct type
if isinstance(error, LexaRateLimitError):
print(f"Rate limited. Retry after {error.get_retry_delay()}s")
elif isinstance(error, LexaAuthError):
print("Authentication failed. Check API key.")
elif isinstance(error, LexaJobFailedError):
print(f"Job {error.job_id} failed: {error.failure_reason}")
raise error
Retry Strategies
The SDK provides intelligent retry guidance through theget_retry_strategy
function.
Copy
from cerevox.exceptions import get_retry_strategy
import time
def smart_retry_parse(client, files):
"""Parse with intelligent retry strategy"""
max_attempts = 3
for attempt in range(max_attempts):
try:
return client.parse(files)
except LexaError as e:
strategy = get_retry_strategy(e)
if not strategy["should_retry"] or attempt == max_attempts - 1:
print(f"Not retrying: {strategy['reason']}")
raise e
delay = strategy["delay"]
backoff = strategy["backoff"]
# Apply backoff strategy
if backoff == "exponential":
delay = delay * (2 ** attempt)
elif backoff == "linear":
delay = delay * (attempt + 1)
# "fixed" backoff uses delay as-is
print(f"Retrying in {delay}s ({strategy['reason']})")
time.sleep(delay)
raise RuntimeError("Max retries exceeded")
Best Practices
Comprehensive Error Handling
Copy
from cerevox import Lexa, LexaError, LexaAuthError, LexaRateLimitError
from cerevox.exceptions import get_retry_strategy
import logging
import time
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RobustLexaClient:
def __init__(self, api_key, max_retries=3):
self.client = Lexa(api_key=api_key)
self.max_retries = max_retries
def parse_documents(self, files, mode=None):
"""Parse documents with comprehensive error handling"""
for attempt in range(self.max_retries):
try:
return self.client.parse(files, mode=mode)
except LexaError as e:
strategy = get_retry_strategy(e)
# Log the error
logger.error(
f"Parsing failed (attempt {attempt + 1}): {e.message}",
extra={
'error_type': type(e).__name__,
'status_code': e.status_code,
'request_id': e.request_id,
'retry_suggested': e.retry_suggested
}
)
# Don't retry certain error types
if isinstance(e, (LexaAuthError, LexaValidationError, LexaUnsupportedFileError)):
logger.error(f"Non-retryable error: {e.message}")
raise e
# Check if we should retry
if not strategy["should_retry"] or attempt == self.max_retries - 1:
logger.error(f"Not retrying: {strategy['reason']}")
raise e
# Calculate delay with backoff
delay = strategy["delay"]
if strategy["backoff"] == "exponential":
delay = delay * (2 ** attempt)
elif strategy["backoff"] == "linear":
delay = delay * (attempt + 1)
logger.warning(f"Retrying in {delay}s ({strategy['reason']})")
time.sleep(delay)
raise RuntimeError("Max retries exceeded")
# Usage
client = RobustLexaClient(api_key="your-api-key")
try:
documents = client.parse_documents(["document.pdf"])
print(f"Successfully parsed {len(documents)} documents")
except LexaError as e:
print(f"Parsing failed: {e.message}")
Error Monitoring and Logging
Copy
import logging
from cerevox import Lexa, LexaError
# Custom error handler
class LexaErrorHandler(logging.Handler):
def emit(self, record):
if record.name == 'cerevox' and record.levelno >= logging.ERROR:
# Send to monitoring service
self.send_to_monitoring(record)
def send_to_monitoring(self, record):
# Implementation depends on your monitoring solution
pass
# Configure logging
logger = logging.getLogger('cerevox')
logger.addHandler(LexaErrorHandler())
def monitored_parse(client, files):
"""Parse with error monitoring"""
try:
return client.parse(files)
except LexaError as e:
logger.error(
f"Parsing failed",
extra={
'error_type': type(e).__name__,
'status_code': e.status_code,
'request_id': e.request_id,
'files': len(files) if isinstance(files, list) else 1,
'message': e.message,
'retry_suggested': e.retry_suggested
}
)
raise
Graceful Degradation
Copy
from cerevox import Lexa, ProcessingMode, LexaError, LexaJobFailedError
def parse_with_fallback(client, files):
"""Parse with fallback to simpler processing modes"""
# Try processing modes in order of preference
modes = [ProcessingMode.ADVANCED, ProcessingMode.DEFAULT]
for mode in modes:
try:
return client.parse(files, mode=mode)
except LexaJobFailedError as e:
if not e.retry_suggested and mode != ProcessingMode.DEFAULT:
print(f"Failed with {mode.value} mode, trying simpler mode...")
continue
raise e
except LexaError as e:
if mode == ProcessingMode.DEFAULT:
raise e # All modes failed
continue
raise RuntimeError("All processing modes failed")
def parse_with_chunking(client, files, chunk_size=10):
"""Parse large batches in smaller chunks"""
all_documents = []
for i in range(0, len(files), chunk_size):
chunk = files[i:i + chunk_size]
try:
documents = client.parse(chunk)
all_documents.extend(documents)
except LexaError as e:
print(f"Chunk {i//chunk_size + 1} failed: {e.message}")
# Continue with remaining chunks if error allows
if not e.retry_suggested:
continue
raise e
return all_documents
Error Recovery Patterns
Circuit Breaker Pattern
Copy
import time
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
def call(self, func, *args, **kwargs):
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
else:
raise RuntimeError("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
self.on_success()
return result
except Exception as e:
self.on_failure()
raise e
def on_success(self):
self.failure_count = 0
self.state = CircuitState.CLOSED
def on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
# Usage
circuit_breaker = CircuitBreaker()
client = Lexa(api_key="your-api-key")
try:
documents = circuit_breaker.call(client.parse, ["document.pdf"])
except RuntimeError as e:
print(f"Circuit breaker active: {e}")
Next Steps
Learn about client configuration for optimal performance and reliability.