Knowledge Management with Lexa
Transform your organization’s documents into intelligent, searchable knowledge bases. Lexa’s AI-powered parsing creates the foundation for next-generation knowledge management and RAG applications.
Why Lexa for Knowledge Management?
Intelligent Chunking
Vector-optimized chunks preserve context and meaning
Unified Processing
Handle 12+ file formats in a single workflow
Enterprise Scale
Process thousands of documents with async operations
Context Preservation
Maintain document structure and relationships
Knowledge Base Applications
- Internal Documentation (policies, procedures, handbooks)
- Training Materials (onboarding docs, certification guides)
- Technical Documentation (API docs, system manuals)
- Research Archives (reports, whitepapers, studies)
- Customer Support (FAQs, troubleshooting guides)
- Compliance Documentation (regulations, audit materials)
- Product Documentation (user guides, specifications)
Quick Start: Build a Knowledge Base
Transform your document library into an intelligent knowledge system:

from cerevox import Lexa

# Initialize client
client = Lexa(api_key="your-api-key")

def classify_document(doc):
    """Classify a document by filename keywords"""
    filename = doc.filename.lower()
    if 'policy' in filename:
        return 'policy'
    elif 'handbook' in filename:
        return 'handbook'
    elif 'procedure' in filename:
        return 'procedure'
    else:
        return 'general'

# Process knowledge base documents
documents = client.parse([
    "employee_handbook.pdf",
    "company_policies.docx",
    "technical_procedures.pdf"
])

# Create searchable knowledge chunks
knowledge_chunks = []
for doc in documents:
    chunks = doc.get_text_chunks(target_size=512)
    for i, chunk in enumerate(chunks):
        knowledge_chunks.append({
            'content': chunk,
            'source': doc.filename,
            'document_type': classify_document(doc),
            'chunk_id': f"{doc.filename}_{i}"
        })

print(f"Knowledge base: {len(knowledge_chunks)} searchable chunks")
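From here you will typically hand the chunks to an embedding or indexing pipeline. A minimal sketch (plain standard-library Python, not part of the Lexa API) that persists the chunk records for a downstream job; the knowledge_chunks.json filename is an arbitrary choice:

import json

# Save the chunk records so an embedding/indexing job can load them later
with open("knowledge_chunks.json", "w", encoding="utf-8") as f:
    json.dump(knowledge_chunks, f, indent=2)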
Advanced Knowledge Management
Multi-Source Knowledge Integration
Combine documents from various sources into a unified knowledge base:

import asyncio
from typing import Dict, List

from cerevox import AsyncLexa
class EnterpriseKnowledgeManager:
    def __init__(self, api_key: str):
        self.client = AsyncLexa(api_key=api_key)
        self.knowledge_domains = {}

    async def ingest_by_domain(self, domain_documents: Dict[str, List[str]]):
        """Ingest documents organized by knowledge domain"""
        async with self.client:
            for domain, document_paths in domain_documents.items():
                print(f"Processing {domain} domain...")
                documents = await self.client.parse(document_paths)

                # Create domain-specific chunks
                domain_chunks = documents.get_all_text_chunks(
                    target_size=600,
                    tolerance=0.2
                )

                # Enhance with domain metadata
                processed_chunks = []
                for i, chunk in enumerate(domain_chunks):
                    # Approximate mapping from chunk index back to its source document
                    doc_idx = min(i * len(documents) // len(domain_chunks), len(documents) - 1)
                    source_doc = documents[doc_idx]
                    processed_chunks.append({
                        'content': chunk,
                        'domain': domain,
                        'source': source_doc.filename,
                        'confidence': self.calculate_relevance(chunk, domain),
                        'keywords': self.extract_keywords(chunk),
                        'section_type': self.identify_section_type(chunk)
                    })

                self.knowledge_domains[domain] = processed_chunks
                print(f"  Added {len(processed_chunks)} chunks to {domain}")

        total_chunks = sum(len(chunks) for chunks in self.knowledge_domains.values())
        return f"Enterprise knowledge base: {total_chunks} chunks across {len(self.knowledge_domains)} domains"
    def search_domain(self, domain: str, query: str, limit: int = 5):
        """Search within a specific knowledge domain"""
        if domain not in self.knowledge_domains:
            return []

        domain_chunks = self.knowledge_domains[domain]
        query_terms = query.lower().split()
        results = []

        for chunk in domain_chunks:
            # Score based on term frequency and confidence
            term_score = sum(1 for term in query_terms if term in chunk['content'].lower())
            total_score = term_score * chunk['confidence']

            if total_score > 0:
                results.append({
                    'content': chunk['content'],
                    'source': chunk['source'],
                    'domain': chunk['domain'],
                    'score': total_score,
                    'section_type': chunk['section_type']
                })

        # Sort by relevance
        results.sort(key=lambda x: x['score'], reverse=True)
        return results[:limit]

    def cross_domain_search(self, query: str, limit: int = 10):
        """Search across all knowledge domains"""
        all_results = []
        for domain in self.knowledge_domains:
            domain_results = self.search_domain(domain, query, limit)
            all_results.extend(domain_results)

        # Sort all results by score
        all_results.sort(key=lambda x: x['score'], reverse=True)
        return all_results[:limit]
    def calculate_relevance(self, chunk: str, domain: str) -> float:
        """Calculate content relevance to domain"""
        # Simplified keyword-based relevance scoring
        domain_keywords = {
            'hr': ['employee', 'benefit', 'policy', 'leave', 'compensation'],
            'it': ['system', 'security', 'access', 'software', 'network'],
            'finance': ['budget', 'expense', 'accounting', 'revenue', 'cost'],
            'legal': ['contract', 'compliance', 'regulation', 'liability'],
            'operations': ['process', 'procedure', 'workflow', 'standard']
        }

        if domain not in domain_keywords:
            return 0.5  # Neutral relevance

        keywords = domain_keywords[domain]
        chunk_lower = chunk.lower()
        matches = sum(1 for keyword in keywords if keyword in chunk_lower)
        return min(1.0, matches / len(keywords) + 0.3)

    def extract_keywords(self, text: str) -> List[str]:
        """Extract key terms from text"""
        import re
        from collections import Counter

        # Simple frequency-based keyword extraction
        words = re.findall(r'\b[a-zA-Z]{3,}\b', text.lower())

        # Filter common words
        stop_words = {'the', 'and', 'for', 'are', 'but', 'not', 'you', 'all', 'can', 'had', 'her', 'was', 'one', 'our', 'out', 'day', 'get', 'has', 'him', 'his', 'how', 'man', 'new', 'now', 'old', 'see', 'two', 'way', 'who', 'boy', 'did', 'its', 'let', 'put', 'say', 'she', 'too', 'use'}
        filtered_words = [word for word in words if word not in stop_words and len(word) > 3]

        # Return top keywords
        word_freq = Counter(filtered_words)
        return [word for word, _ in word_freq.most_common(5)]
    def identify_section_type(self, text: str) -> str:
        """Identify the type of content section"""
        text_lower = text.lower()
        if any(term in text_lower for term in ['procedure', 'step', 'process']):
            return 'procedure'
        elif any(term in text_lower for term in ['policy', 'rule', 'guideline']):
            return 'policy'
        elif any(term in text_lower for term in ['example', 'case study', 'scenario']):
            return 'example'
        elif any(term in text_lower for term in ['requirement', 'must', 'shall']):
            return 'requirement'
        else:
            return 'information'
# Usage
kb_manager = EnterpriseKnowledgeManager("your-api-key")

# Organize documents by domain
domain_docs = {
    'hr': ['employee_handbook.pdf', 'benefits_guide.pdf'],
    'it': ['security_policy.pdf', 'system_procedures.docx'],
    'finance': ['expense_policy.pdf', 'budget_guidelines.pdf'],
    'legal': ['compliance_guide.pdf', 'contract_templates.pdf']
}

# Build domain-specific knowledge base
await kb_manager.ingest_by_domain(domain_docs)

# Search within specific domain
hr_results = kb_manager.search_domain('hr', 'vacation policy')

# Cross-domain search
all_results = kb_manager.cross_domain_search('security requirements')
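The top-level `await` above assumes an async context such as a notebook. In a plain script, wrap the ingestion in an async entry point and run it with `asyncio.run`; a minimal sketch, reusing the same `kb_manager` and `domain_docs`:

async def main():
    # Ingest all domains, then search the in-memory knowledge base
    summary = await kb_manager.ingest_by_domain(domain_docs)
    print(summary)
    print(kb_manager.cross_domain_search('security requirements'))

asyncio.run(main())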
Knowledge Base Analytics
Monitor and analyze your knowledge base performance:

class KnowledgeAnalytics:
    def __init__(self, knowledge_base: List[Dict]):
        self.kb = knowledge_base

    def analyze_coverage(self):
        """Analyze knowledge base coverage"""
        from collections import Counter

        # Domain distribution
        domains = Counter(chunk['domain'] for chunk in self.kb)

        # Source document distribution
        sources = Counter(chunk['source'] for chunk in self.kb)

        # Content type distribution
        section_types = Counter(chunk['section_type'] for chunk in self.kb)

        return {
            'total_chunks': len(self.kb),
            'domains': dict(domains),
            'sources': dict(sources),
            'section_types': dict(section_types),
            'avg_chunk_length': sum(len(chunk['content']) for chunk in self.kb) / len(self.kb)
        }
    def identify_gaps(self, query_log: List[str]):
        """Identify knowledge gaps from query patterns"""
        gap_analysis = {}

        for query in query_log:
            # Simplified gap detection based on term overlap
            query_terms = set(query.lower().split())

            # Find chunks that might answer this query
            relevant_chunks = []
            for chunk in self.kb:
                chunk_terms = set(chunk['content'].lower().split())
                overlap = len(query_terms & chunk_terms)
                if overlap > 0:
                    relevant_chunks.append({
                        'chunk': chunk,
                        'overlap': overlap
                    })

            if not relevant_chunks:
                gap_analysis[query] = 'no_relevant_content'
            elif max(c['overlap'] for c in relevant_chunks) < 2:
                gap_analysis[query] = 'insufficient_coverage'

        return gap_analysis
    def suggest_improvements(self):
        """Suggest knowledge base improvements"""
        analysis = self.analyze_coverage()
        suggestions = []

        # Check domain balance
        domain_counts = analysis['domains']
        max_domain = max(domain_counts.values())
        min_domain = min(domain_counts.values())

        if max_domain > min_domain * 3:
            suggestions.append({
                'type': 'domain_imbalance',
                'message': 'Consider adding more content to underrepresented domains',
                'details': domain_counts
            })

        # Check chunk size variation
        avg_length = analysis['avg_chunk_length']
        if avg_length < 300:
            suggestions.append({
                'type': 'chunk_size',
                'message': 'Chunks may be too small for good context',
                'recommendation': 'Consider increasing target_size to 500-800'
            })
        elif avg_length > 1200:
            suggestions.append({
                'type': 'chunk_size',
                'message': 'Chunks may be too large for precise retrieval',
                'recommendation': 'Consider decreasing target_size to 600-900'
            })

        return suggestions
# Usage with the knowledge manager built above
analytics = KnowledgeAnalytics(
    [chunk for chunks in kb_manager.knowledge_domains.values() for chunk in chunks]
)

coverage = analytics.analyze_coverage()
print(f"Knowledge base coverage: {coverage}")

# Analyze query gaps
sample_queries = [
    "remote work policy",
    "expense reimbursement process",
    "security incident reporting",
    "performance review cycle"
]

gaps = analytics.identify_gaps(sample_queries)
suggestions = analytics.suggest_improvements()
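The gap analysis and suggestions are plain dictionaries and lists, so they can be reviewed directly, for example:

# Review queries the knowledge base cannot answer well, plus improvement suggestions
for query, gap_type in gaps.items():
    print(f"Gap for '{query}': {gap_type}")

for suggestion in suggestions:
    print(f"[{suggestion['type']}] {suggestion['message']}")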
Real-World Knowledge Management Use Cases
Customer Support Knowledge Base
async def build_support_knowledge_base(support_docs):
    """Build customer support knowledge base"""
    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(support_docs)

        support_kb = []
        for doc in documents:
            # Focus on Q&A and troubleshooting content
            chunks = doc.get_text_chunks(target_size=400)  # Good for FAQ format

            for chunk in chunks:
                # Classify support content type
                content_type = classify_support_content(chunk)

                if content_type != 'irrelevant':
                    support_kb.append({
                        'content': chunk,
                        'type': content_type,
                        'source': doc.filename,
                        'priority': calculate_support_priority(chunk)
                    })

        return support_kb

def classify_support_content(text):
    """Classify customer support content"""
    text_lower = text.lower()
    if any(term in text_lower for term in ['question', 'q:', 'faq', 'how to']):
        return 'faq'
    elif any(term in text_lower for term in ['error', 'issue', 'problem', 'troubleshoot']):
        return 'troubleshooting'
    elif any(term in text_lower for term in ['step', 'guide', 'instruction']):
        return 'guide'
    else:
        return 'general'

def calculate_support_priority(text):
    """Calculate priority based on urgency indicators"""
    urgency_terms = ['critical', 'urgent', 'emergency', 'down', 'broken']
    priority_score = sum(1 for term in urgency_terms if term in text.lower())
    return min(5, priority_score + 1)  # Scale 1-5
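A brief usage sketch, assuming `support_docs` points to your own files (the paths below are examples) and that you want the most urgent entries first:

import asyncio

async def run_support_ingest():
    support_docs = ["faq.pdf", "troubleshooting_guide.docx"]  # example paths
    support_kb = await build_support_knowledge_base(support_docs)
    # Surface the most urgent content first
    support_kb.sort(key=lambda entry: entry['priority'], reverse=True)
    print(f"Support knowledge base: {len(support_kb)} entries")

asyncio.run(run_support_ingest())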
Training Documentation System
async def create_training_system(training_materials):
    """Create structured training documentation system"""
    async with AsyncLexa(api_key="your-api-key") as client:
        documents = await client.parse(training_materials)

        training_modules = {}
        for doc in documents:
            # Identify training modules from content
            module_name = extract_module_name(doc.filename)

            # Create learning-optimized chunks
            chunks = doc.get_text_chunks(
                target_size=800,  # Good for learning context
                tolerance=0.25
            )

            learning_chunks = []
            for i, chunk in enumerate(chunks):
                learning_chunks.append({
                    'content': chunk,
                    'module': module_name,
                    'sequence': i,
                    'learning_type': identify_learning_type(chunk),
                    'difficulty': assess_difficulty(chunk),
                    'prerequisites': extract_prerequisites(chunk)
                })

            training_modules[module_name] = learning_chunks

        return training_modules

def identify_learning_type(text):
    """Identify type of learning content"""
    text_lower = text.lower()
    if any(term in text_lower for term in ['example', 'case study', 'scenario']):
        return 'example'
    elif any(term in text_lower for term in ['concept', 'theory', 'principle']):
        return 'concept'
    elif any(term in text_lower for term in ['practice', 'exercise', 'hands-on']):
        return 'practice'
    else:
        return 'information'
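`extract_module_name`, `assess_difficulty`, and `extract_prerequisites` are helpers you supply yourself. A hypothetical sketch of what they might look like (simple heuristics, not part of the Lexa SDK):

import os

def extract_module_name(filename):
    # Hypothetical: derive a module name from the file name, e.g. "onboarding_basics.pdf" -> "onboarding basics"
    return os.path.splitext(os.path.basename(filename))[0].replace('_', ' ')

def assess_difficulty(text):
    # Hypothetical heuristic: longer sentences suggest denser, more advanced material
    sentences = [s for s in text.split('.') if s.strip()]
    avg_len = sum(len(s.split()) for s in sentences) / max(len(sentences), 1)
    return 'advanced' if avg_len > 25 else 'intermediate' if avg_len > 15 else 'beginner'

def extract_prerequisites(text):
    # Hypothetical heuristic: capture phrases that commonly introduce prerequisites
    text_lower = text.lower()
    markers = ['prerequisite', 'before you begin', 'requires familiarity with']
    return [m for m in markers if m in text_lower]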
Vector Database Integration
Pinecone Knowledge Base
import pinecone
from cerevox import AsyncLexa

async def build_pinecone_knowledge_base(documents):
    """Build knowledge base in Pinecone vector database"""
    # Initialize Pinecone
    pinecone.init(api_key="your-pinecone-key", environment="your-env")
    index = pinecone.Index("knowledge-base")

    async with AsyncLexa(api_key="your-api-key") as client:
        docs = await client.parse(documents)

        # Create knowledge-optimized chunks
        chunks = docs.get_all_text_chunks(
            target_size=700,  # Good for knowledge retrieval
            tolerance=0.15
        )

        # Prepare vectors for Pinecone
        vectors = []
        for i, chunk in enumerate(chunks):
            # Generate embedding (use your preferred embedding model)
            embedding = generate_embedding(chunk)

            # Create rich metadata; map each chunk back to an approximate source document
            doc_idx = min(i * len(docs) // len(chunks), len(docs) - 1)
            metadata = {
                'content': chunk,
                'source': docs[doc_idx].filename,
                'domain': classify_domain(chunk),
                'chunk_index': i,
                'content_type': identify_content_type(chunk)
            }

            vectors.append({
                'id': f'kb_chunk_{i}',
                'values': embedding,
                'metadata': metadata
            })

        # Batch upsert to Pinecone
        index.upsert(vectors=vectors)

        return f"Knowledge base: {len(vectors)} chunks indexed in Pinecone"
async def query_knowledge_base(query: str, top_k: int = 5):
    """Query the Pinecone knowledge base"""
    # Generate query embedding
    query_embedding = generate_embedding(query)

    # Search Pinecone
    index = pinecone.Index("knowledge-base")
    results = index.query(
        vector=query_embedding,
        top_k=top_k,
        include_metadata=True
    )

    # Format results
    knowledge_results = []
    for match in results['matches']:
        knowledge_results.append({
            'content': match['metadata']['content'],
            'source': match['metadata']['source'],
            'domain': match['metadata']['domain'],
            'relevance_score': match['score']
        })

    return knowledge_results
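`generate_embedding`, `classify_domain`, and `identify_content_type` above are placeholders you provide. As one possible implementation of `generate_embedding`, here is a sketch using the sentence-transformers library; any embedding model works as long as its dimensionality matches the Pinecone index:

from sentence_transformers import SentenceTransformer

# Load a small general-purpose embedding model once and reuse it
_embedding_model = SentenceTransformer("all-MiniLM-L6-v2")

def generate_embedding(text: str) -> list:
    # Return a plain list of floats, which is what Pinecone expects for vector values
    return _embedding_model.encode(text).tolist()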
Performance for Knowledge Management
Document Processing
25 seconds for 1000+ page knowledge corpus
Chunk Quality
98.5% context preservation in chunking
Search Accuracy
95% relevant results in top 5 matches
Best Practices for Knowledge Bases
Optimal Chunking Strategy
# Different strategies for different knowledge types
knowledge_chunking_strategies = {
    'technical_docs': {
        'target_size': 800,  # Preserve technical context
        'tolerance': 0.1     # Less flexibility for precision
    },
    'policies': {
        'target_size': 600,  # Complete policy sections
        'tolerance': 0.2     # Allow for natural breaks
    },
    'faqs': {
        'target_size': 300,  # One Q&A per chunk
        'tolerance': 0.15    # Maintain Q&A integrity
    },
    'procedures': {
        'target_size': 500,  # Complete procedural steps
        'tolerance': 0.1     # Maintain step sequences
    }
}

async def smart_knowledge_chunking(documents, doc_type='general'):
    """Apply optimal chunking strategy based on document type"""
    strategy = knowledge_chunking_strategies.get(doc_type, {
        'target_size': 600,
        'tolerance': 0.15
    })

    async with AsyncLexa(api_key="your-api-key") as client:
        docs = await client.parse(documents)
        return docs.get_all_text_chunks(
            target_size=strategy['target_size'],
            tolerance=strategy['tolerance']
        )
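A short usage sketch showing the helper applied per document type, reusing the file names from the Quick Start example and the imports from the earlier examples:

import asyncio

async def chunk_by_type():
    # Apply a stricter strategy to procedures and a looser one to policies
    procedure_chunks = await smart_knowledge_chunking(["technical_procedures.pdf"], doc_type='procedures')
    policy_chunks = await smart_knowledge_chunking(["company_policies.docx"], doc_type='policies')
    print(f"{len(procedure_chunks)} procedure chunks, {len(policy_chunks)} policy chunks")

asyncio.run(chunk_by_type())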
Next Steps
Get Started
Build your first knowledge base in minutes
Vector Database Guide
Learn RAG implementation patterns
Best Practices
Optimize for production knowledge systems
API Reference
Explore advanced parsing methods

