## Client Initialization

### Synchronous Client

```python
from cerevox import Lexa

# Initialize with API key
client = Lexa(api_key="your-api-key")

# Parse documents
documents = client.parse(["document.pdf"])
```
### Asynchronous Client

Using a context manager is recommended:

```python
import asyncio
from cerevox import AsyncLexa

async def main():
    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(["document.pdf"])
        return documents

asyncio.run(main())
```
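Because the async client's calls are awaitable, independent parse jobs can run concurrently. The sketch below shows the pattern with `asyncio.gather`; the `parse_stub` coroutine is a stand-in for `await client.parse(...)` (it and its simulated delay are illustrative, not part of the SDK):

```python
import asyncio

async def parse_stub(files):
    # Stand-in for `await client.parse(files)`; simulates I/O latency.
    await asyncio.sleep(0.01)
    return [f"parsed:{name}" for name in files]

async def parse_batches_concurrently(batches):
    # Launch one parse call per batch and wait for all of them together.
    results = await asyncio.gather(*(parse_stub(batch) for batch in batches))
    # Flatten the per-batch result lists into one list of documents.
    return [doc for batch in results for doc in batch]

documents = asyncio.run(
    parse_batches_concurrently([["a.pdf", "b.pdf"], ["c.docx"]])
)
```

With the real client, the stub call would simply be replaced by `client.parse(batch)` inside the `async with` block.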
## Client Configuration

### Parameters

- `base_url` (string, default: `"https://data.cerevox.ai"`): Base URL for the Cerevox API endpoint
- `timeout`: Request timeout in seconds for API calls
- `max_retries`: Maximum number of retry attempts for failed requests
- Retry delay: Delay in seconds between retry attempts
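To make the retry parameters concrete, here is a minimal sketch of the retry-with-delay pattern they describe. The helper and its names are illustrative only; the SDK's internal retry logic may differ:

```python
import time

def call_with_retries(fn, max_retries=3, retry_delay=1.0):
    # Try fn up to max_retries + 1 times, sleeping retry_delay seconds
    # between attempts; re-raise the last error if every attempt fails.
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise
            time.sleep(retry_delay)

# Simulated call that fails twice before succeeding.
calls = []
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = call_with_retries(flaky, max_retries=5, retry_delay=0.01)
```

Here the third attempt succeeds, so only two delays are incurred and no error propagates.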
### Environment Variables

```bash
# Required
export CEREVOX_API_KEY="your-api-key"

# Optional overrides
export CEREVOX_BASE_URL="https://data.cerevox.ai"
export CEREVOX_TIMEOUT="120"
export CEREVOX_MAX_RETRIES="5"
```
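In your own code, these variables can be read with `os.getenv`, falling back to the documented defaults when an override is not set. This is a sketch of that lookup, not the client's internal implementation:

```python
import os

api_key = os.getenv("CEREVOX_API_KEY")  # required; None if unset
base_url = os.getenv("CEREVOX_BASE_URL", "https://data.cerevox.ai")
timeout = float(os.getenv("CEREVOX_TIMEOUT", "120"))
max_retries = int(os.getenv("CEREVOX_MAX_RETRIES", "5"))
```

Note that environment variables are always strings, so numeric settings must be converted before use.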
## Client Methods

### Core Parsing Methods

#### `parse()` - Parse Local Files

Parse local files or file-like objects.

```python
from cerevox import ProcessingMode

documents = client.parse(
    files=["document.pdf", "report.docx"],
    mode=ProcessingMode.DEFAULT,
    progress_callback=None,
    timeout=60.0,
    poll_interval=2.0,
)
```
#### `parse_urls()` - Parse Remote Files

Parse files from URLs.

```python
documents = client.parse_urls(
    urls=["https://example.com/document.pdf"],
    mode=ProcessingMode.DEFAULT,
    progress_callback=None,
    timeout=120.0,
    poll_interval=2.0,
)
```
#### `get_job_status()` - Check Job Status

Get the current status of a parsing job.

```python
status = client.get_job_status(job_id="job_123")
print(f"Status: {status.status}")
```
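`get_job_status()` pairs naturally with a polling loop that waits for a job to finish. The sketch below uses a stub client so it is self-contained; the status strings `"processing"` and `"complete"` are illustrative, not the SDK's documented values:

```python
import time

class StubClient:
    # Stand-in for a Lexa client: reports "processing" twice, then "complete".
    def __init__(self):
        self._calls = 0

    def get_job_status(self, job_id):
        self._calls += 1
        return "complete" if self._calls >= 3 else "processing"

def wait_for_job(client, job_id, poll_interval=0.01, timeout=5.0):
    # Poll until the job leaves the "processing" state or the timeout expires.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = client.get_job_status(job_id)
        if status != "processing":
            return status
        time.sleep(poll_interval)
    raise TimeoutError(f"job {job_id} did not finish in {timeout}s")

final = wait_for_job(StubClient(), "job_123")
```

In practice `parse()` already polls for you (see its `poll_interval` parameter); an explicit loop like this is only needed when managing jobs manually.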
Cloud Storage Methods
# List S3 buckets
buckets = client.list_s3_buckets()
# List S3 folder contents
contents = client.list_s3_folder( "bucket-name" , "folder-path/" )
# Parse S3 folder
documents = client.parse_s3_folder(
bucket = "bucket-name" ,
folder_path = "documents/" ,
mode = ProcessingMode. DEFAULT
)
#### Microsoft SharePoint Integration

```python
# List SharePoint sites
sites = client.list_sharepoint_sites()

# List drives in a site
drives = client.list_sharepoint_drives("site-id")

# Parse SharePoint folder
documents = client.parse_sharepoint_folder(
    site_id="site-id",
    drive_id="drive-id",
    folder_path="Documents/",
    mode=ProcessingMode.DEFAULT,
)
```
#### Box Integration

```python
# List Box folders
folders = client.list_box_folders(parent_folder_id="0")

# Parse Box folder
documents = client.parse_box_folder(
    folder_id="123456789",
    mode=ProcessingMode.DEFAULT,
)
```
## Processing Modes

Choose the right processing mode for your use case:

**DEFAULT** - Fast and efficient
- Optimized for speed
- Good accuracy
- Lower resource usage
- Recommended for most use cases

**ADVANCED** - Maximum accuracy
- Highest accuracy
- Enhanced table extraction
- More thorough analysis
- Best for complex documents

```python
from cerevox import Lexa, ProcessingMode

client = Lexa(api_key="your-api-key")

# Default mode (recommended) - fast and efficient
documents = client.parse(["document.pdf"], mode=ProcessingMode.DEFAULT)

# Advanced mode for maximum accuracy
documents = client.parse(["document.pdf"], mode=ProcessingMode.ADVANCED)
```
## Error Handling

The Lexa client provides comprehensive error handling:

```python
from cerevox import Lexa, LexaError

client = Lexa(api_key="your-api-key")

try:
    documents = client.parse(["document.pdf"])
    print(f"Successfully parsed {len(documents)} documents")
except LexaError as e:
    print(f"Lexa API error: {e.message}")
    print(f"Error code: {e.error_code}")
except Exception as e:
    print(f"Unexpected error: {e}")
```
## Best Practices

Use progress callbacks for long-running operations:

```python
def progress_callback(status):
    print(f"Status: {status.status}")
    if hasattr(status, 'progress') and status.progress:
        print(f"Progress: {status.progress}")

documents = client.parse(
    ["large-document.pdf"],
    progress_callback=progress_callback,
    timeout=300.0,  # 5 minutes for large files
)
```
Properly manage client resources:

```python
# ✅ Good: Use context manager for async
async with AsyncLexa(api_key="key") as client:
    documents = await client.parse(["file.pdf"])

# ✅ Good: Reuse sync client
client = Lexa(api_key="key")
for file_batch in file_batches:
    documents = client.parse(file_batch)
    process_documents(documents)

# ❌ Avoid: Creating new clients repeatedly
for file in files:
    client = Lexa(api_key="key")  # Wasteful
    documents = client.parse([file])
```