Client
Basic Configuration
Copy
from cerevox import Lexa
# Minimal configuration
client = Lexa(api_key="your-api-key")
# Full configuration
client = Lexa(
api_key="your-api-key",
base_url="https://data.cerevox.ai",
timeout=120.0,
max_retries=5,
retry_delay=2.0,
poll_interval=3.0,
max_concurrent=10
)
Configuration Parameters
api_key: Your Cerevox API key. Get one at cerevox.ai/lexa
base_url: Base URL for the Cerevox API. Change it for custom endpoints or testing
timeout: Default timeout for API requests, in seconds. Individual operations can override this
max_retries: Maximum number of automatic retry attempts for failed requests
retry_delay: Base delay between retry attempts. Uses exponential backoff
poll_interval: Default interval between job status checks during parsing
max_concurrent: Maximum number of concurrent requests (async client only)
Environment Variables
Standard Environment Variables
Copy
# Required
export CEREVOX_API_KEY="your-api-key"
# Optional Configuration
export CEREVOX_BASE_URL="https://data.cerevox.ai"
export CEREVOX_TIMEOUT="120"
export CEREVOX_MAX_RETRIES="5"
export CEREVOX_RETRY_DELAY="2.0"
export CEREVOX_POLL_INTERVAL="3.0"
export CEREVOX_MAX_CONCURRENT="20"
# Logging Configuration
export CEREVOX_LOG_LEVEL="INFO"
export CEREVOX_LOG_FORMAT="json"
Development vs Production
Copy
# Development settings - more verbose, shorter timeouts
export CEREVOX_API_KEY="dev-api-key"
export CEREVOX_BASE_URL="https://dev.cerevox.ai"
export CEREVOX_TIMEOUT="30"
export CEREVOX_MAX_RETRIES="2"
export CEREVOX_LOG_LEVEL="DEBUG"
Performance Tuning
Timeout Configuration
Copy
import os
from cerevox import Lexa
def get_optimal_timeout(files):
    """Calculate a timeout (in seconds) based on the size of local files.

    The result is a 60-second base plus one second per MiB of file data
    found on disk.

    Args:
        files: A single path or an iterable of inputs. Entries that are
            ``str`` or ``os.PathLike`` are sized on disk; anything else
            (and any path that cannot be stat'ed) contributes nothing.

    Returns:
        The timeout in seconds, as a float.
    """
    # A lone path must be wrapped, otherwise we would iterate its characters
    # (str) or fail outright (Path objects are not iterable).
    if isinstance(files, (str, os.PathLike)):
        files = [files]

    total_size = 0
    for file in files:
        if not isinstance(file, (str, os.PathLike)):
            continue
        # EAFP: a file may vanish between an existence check and the stat,
        # so just try to size it and skip anything that is not reachable.
        try:
            total_size += os.path.getsize(file)
        except (OSError, ValueError):
            continue

    # Base timeout + 1 second per MB
    base_timeout = 60
    size_timeout = total_size / (1024 * 1024)
    return base_timeout + size_timeout
# Usage
client = Lexa(api_key="your-api-key")
files = ["large-document.pdf", "report.docx"]
optimal_timeout = get_optimal_timeout(files)
documents = client.parse(
files,
timeout=optimal_timeout,
poll_interval=min(5.0, optimal_timeout / 20)
)
Concurrency Configuration
Copy
import asyncio
from cerevox import AsyncLexa
async def process_large_batch(files, batch_size=10, max_concurrent=5):
    """Process large batches with controlled concurrency.

    ``files`` is split into slices of ``batch_size`` and each slice is sent
    through ``client.parse``. At most ``max_concurrent`` slices are in
    flight at once.

    Args:
        files: Sequence of inputs to parse (must support ``len`` and slicing).
        batch_size: Number of files per ``parse`` call; must be >= 1.
        max_concurrent: Maximum number of batches processed at the same
            time; must be >= 1. Also passed to the client.

    Returns:
        A flat list with the documents of every batch that succeeded.
        Failed batches are reported on stdout and skipped.

    Raises:
        ValueError: If ``batch_size`` or ``max_concurrent`` is below 1.
    """
    # Fail fast: a zero step breaks range(), a negative one yields no
    # batches (silently dropping every file), and Semaphore(0) would block
    # every batch forever.
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    if max_concurrent < 1:
        raise ValueError("max_concurrent must be at least 1")

    # Create batches
    batches = [files[i:i + batch_size] for i in range(0, len(files), batch_size)]

    # Configure client with concurrency limits
    async with AsyncLexa(
        api_key="your-api-key",
        max_concurrent=max_concurrent,
        timeout=300.0,
    ) as client:
        # Semaphore limits how many batches run concurrently
        semaphore = asyncio.Semaphore(max_concurrent)

        async def process_batch(batch):
            async with semaphore:
                return await client.parse(batch)

        # Process batches concurrently; exceptions come back as results so
        # one failing batch does not cancel the others.
        results = await asyncio.gather(
            *(process_batch(batch) for batch in batches),
            return_exceptions=True,
        )

    # Handle results
    all_documents = []
    for result in results:
        # BaseException, not Exception: gather can also hand back
        # CancelledError, which must not be treated as a document list.
        if isinstance(result, BaseException):
            print(f"Batch failed: {result}")
        else:
            all_documents.extend(result)
    return all_documents
Security
API Key Management
Copy
import os
from pathlib import Path
from cerevox import Lexa
class SecureLexaClient:
    """Wrapper around ``Lexa`` that resolves the API key from secure sources.

    The key is looked up once at construction time and the resulting client
    is stored on ``self.client``.
    """

    def __init__(self):
        self.api_key = self._get_secure_api_key()
        self.client = Lexa(api_key=self.api_key)

    def _get_secure_api_key(self):
        """Get API key from secure sources.

        Lookup order:
            1. The ``CEREVOX_API_KEY`` environment variable.
            2. The file ``~/.cerevox/api_key``.

        Surrounding whitespace is stripped, and a blank value is treated as
        "not set" so an empty key is never handed to the client.

        Returns:
            The first non-empty API key found.

        Raises:
            ValueError: If no source yields a non-empty key.
        """
        # 1. Environment variable (recommended)
        api_key = (os.getenv("CEREVOX_API_KEY") or "").strip()
        if api_key:
            return api_key

        # 2. Secure file (if environment variable not available)
        key_file = Path.home() / ".cerevox" / "api_key"
        if key_file.is_file():
            api_key = key_file.read_text(encoding="utf-8").strip()
            if api_key:
                return api_key

        # 3. Extension point: AWS Secrets Manager, Azure Key Vault, etc.
        raise ValueError("No API key found. Set CEREVOX_API_KEY environment variable.")

    def parse(self, *args, **kwargs):
        """Forward all arguments to the wrapped client's ``parse``."""
        return self.client.parse(*args, **kwargs)
# Usage
client = SecureLexaClient()
documents = client.parse(["document.pdf"])
Network Security
Copy
import ssl
from cerevox import Lexa
# Custom SSL context for enterprise environments.
# ssl.create_default_context() already enables hostname checking and
# certificate verification; the two assignments below restate that
# explicitly so the security posture is visible in the config.
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = True
ssl_context.verify_mode = ssl.CERT_REQUIRED
# For self-signed certificates (development only) - this disables all
# certificate validation. Keep this order: check_hostname must be turned
# off before verify_mode can be set to CERT_NONE.
# ssl_context.check_hostname = False
# ssl_context.verify_mode = ssl.CERT_NONE
# NOTE(review): assumes Lexa accepts an `ssl_context` keyword - confirm
# against the SDK version in use.
client = Lexa(
    api_key="your-api-key",
    ssl_context=ssl_context
)
Logging
Basic Logging Setup
Copy
import logging
from cerevox import Lexa
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Enable Cerevox SDK logging
cerevox_logger = logging.getLogger('cerevox')
cerevox_logger.setLevel(logging.DEBUG)
client = Lexa(api_key="your-api-key")
documents = client.parse(["document.pdf"])
Production Logging
Copy
import logging
import logging.handlers
from pathlib import Path
from cerevox import Lexa
def setup_production_logging():
    """Setup production-grade logging.

    Creates a ``logs`` directory in the current working directory,
    configures the root logger at WARNING, and attaches two rotating file
    handlers to the ``cerevox`` logger:

    * ``logs/cerevox.log`` - everything the logger emits (INFO and above),
      10 MB per file, 5 backups.
    * ``logs/cerevox_errors.log`` - ERROR and above only, 5 MB per file,
      10 backups.

    Calling the function again does not attach duplicate handlers for the
    same files.

    Returns:
        The configured ``cerevox`` logger.
    """
    # Tracebacks are appended by logging.Formatter automatically whenever a
    # record carries exception info, so the format must not reference
    # %(exc_info)s itself (that would print "None" or a raw tuple).
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

    # Create logs directory
    log_dir = Path("logs")
    log_dir.mkdir(exist_ok=True)

    # Configure root logger
    logging.basicConfig(level=logging.WARNING, format=log_format)

    # Cerevox-specific logger with file rotation
    cerevox_logger = logging.getLogger('cerevox')
    cerevox_logger.setLevel(logging.INFO)

    # (file name, max bytes per file, backup count, handler level)
    handler_specs = (
        ("cerevox.log", 10 * 1024 * 1024, 5, logging.NOTSET),
        ("cerevox_errors.log", 5 * 1024 * 1024, 10, logging.ERROR),
    )

    # File handlers expose the absolute path they write to; use it to make
    # repeated setup calls idempotent.
    attached = {
        getattr(handler, "baseFilename", None)
        for handler in cerevox_logger.handlers
    }
    for filename, max_bytes, backup_count, level in handler_specs:
        log_path = log_dir / filename
        if str(log_path.absolute()) in attached:
            continue
        handler = logging.handlers.RotatingFileHandler(
            log_path,
            maxBytes=max_bytes,
            backupCount=backup_count,
        )
        handler.setLevel(level)
        handler.setFormatter(logging.Formatter(log_format))
        cerevox_logger.addHandler(handler)

    return cerevox_logger
# Setup logging
logger = setup_production_logging()
client = Lexa(api_key="your-api-key")
Framework Integration
Django
Copy
# settings.py
import os
# Cerevox configuration
CEREVOX_API_KEY = os.getenv('CEREVOX_API_KEY')
CEREVOX_TIMEOUT = 300.0
CEREVOX_MAX_RETRIES = 5
# Django logging configuration
# Django logging configuration (dictConfig schema, version 1).
# Routes records from the 'cerevox' logger to a dedicated log file.
LOGGING = {
    'version': 1,
    # Keep loggers that were created before this config is applied active.
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
            # '{' selects str.format-style placeholders for the format above.
            'style': '{',
        },
    },
    'handlers': {
        'cerevox_file': {
            'level': 'INFO',
            'class': 'logging.FileHandler',
            # Relative path: resolved against the process working directory.
            # FileHandler does not create missing directories, so 'logs/'
            # must exist before Django starts.
            'filename': 'logs/cerevox.log',
            'formatter': 'verbose',
        },
    },
    'loggers': {
        'cerevox': {
            'handlers': ['cerevox_file'],
            'level': 'INFO',
            # Records are also passed on to handlers of ancestor loggers.
            'propagate': True,
        },
    },
}
FastAPI
Copy
# config.py
from pydantic import BaseSettings
class Settings(BaseSettings):
    """Cerevox-related application settings.

    NOTE(review): ``BaseSettings`` is imported from ``pydantic`` in this
    file; in Pydantic v2 it lives in the separate ``pydantic-settings``
    package - confirm which major version the project pins.
    """

    # No default is given, so a value must be supplied for this field.
    cerevox_api_key: str
    # Defaults used when no value is supplied.
    cerevox_timeout: float = 120.0
    cerevox_max_retries: int = 3
    cerevox_base_url: str = "https://data.cerevox.ai"

    class Config:
        # Also read values from a local ".env" file.
        env_file = ".env"
settings = Settings()
Validation
Copy
from cerevox import Lexa, LexaError
import os
from typing import Optional
class ValidatedLexaConfig:
    """Validate Lexa client settings up front and build a client from them.

    Every constructor argument is checked immediately; an invalid value
    raises ``ValueError`` before any client is created. ``api_key`` and
    ``base_url`` fall back to the ``CEREVOX_API_KEY`` / ``CEREVOX_BASE_URL``
    environment variables when not passed.
    """

    DEFAULT_BASE_URL = 'https://data.cerevox.ai'
    API_KEY_PREFIX = 'cx_'
    MAX_TIMEOUT = 3600  # seconds (1 hour)
    MAX_RETRIES_LIMIT = 10

    def __init__(
        self,
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        timeout: float = 60.0,
        max_retries: int = 3
    ):
        self.api_key = self._validate_api_key(api_key)
        self.base_url = self._validate_base_url(base_url)
        self.timeout = self._validate_timeout(timeout)
        self.max_retries = self._validate_max_retries(max_retries)

    def _validate_api_key(self, api_key: Optional[str]) -> str:
        """Return the API key, falling back to ``CEREVOX_API_KEY``.

        Raises:
            ValueError: If no key is available or it lacks the ``cx_`` prefix.
        """
        if not api_key:
            api_key = os.getenv('CEREVOX_API_KEY')
        if not api_key:
            raise ValueError("API key is required")
        if not api_key.startswith(self.API_KEY_PREFIX):
            raise ValueError("Invalid API key format")
        return api_key

    def _validate_base_url(self, base_url: Optional[str]) -> str:
        """Return the base URL, falling back to ``CEREVOX_BASE_URL`` and
        then to the default endpoint.

        Raises:
            ValueError: If the URL does not start with http:// or https://.
        """
        if not base_url:
            base_url = os.getenv('CEREVOX_BASE_URL', self.DEFAULT_BASE_URL)
        if not base_url.startswith(('http://', 'https://')):
            raise ValueError("Base URL must start with http:// or https://")
        return base_url

    def _validate_timeout(self, timeout: float) -> float:
        """Return ``timeout`` if it lies in ``(0, MAX_TIMEOUT]`` seconds.

        Raises:
            ValueError: If the timeout is not positive (including NaN) or
                exceeds the maximum.
        """
        # "not timeout > 0" instead of "timeout <= 0": every comparison with
        # NaN is False, so this form also rejects NaN.
        if not timeout > 0:
            raise ValueError("Timeout must be positive")
        if timeout > self.MAX_TIMEOUT:
            raise ValueError(f"Timeout too large (max {self.MAX_TIMEOUT} seconds)")
        return timeout

    def _validate_max_retries(self, max_retries: int) -> int:
        """Return ``max_retries`` if it lies in ``[0, MAX_RETRIES_LIMIT]``.

        Raises:
            ValueError: If the value is negative or above the limit.
        """
        if max_retries < 0:
            raise ValueError("Max retries cannot be negative")
        if max_retries > self.MAX_RETRIES_LIMIT:
            raise ValueError(f"Max retries too large (max {self.MAX_RETRIES_LIMIT})")
        return max_retries

    # The return annotation is a string so the class can be defined even
    # where the SDK is not importable (e.g. config-only unit tests).
    def create_client(self) -> "Lexa":
        """Create validated Lexa client"""
        return Lexa(
            api_key=self.api_key,
            base_url=self.base_url,
            timeout=self.timeout,
            max_retries=self.max_retries
        )
# Usage
try:
config = ValidatedLexaConfig(
timeout=120.0,
max_retries=5
)
client = config.create_client()
print("Client configured successfully")
except ValueError as e:
print(f"Configuration error: {e}")
Next Steps
Explore real-world examples to see these configurations in action.