Skip to main content

Hippo Best Practices

Maximize answer quality while achieving 80% cost reduction with these proven strategies.

Document Preparation

Upload High-Quality Documents

Prefer: Text-based PDFs (created from Word, Google Docs, etc.)
Avoid: Scanned/image PDFs (OCR quality varies)
# Check if PDF is text-based
import PyPDF2

def is_text_pdf(file_path, min_chars=50, max_pages=1):
    """Return True if the PDF appears to contain an extractable text layer.

    Args:
        file_path: Path of the PDF file to inspect.
        min_chars: A page counts as text-based when its extracted text,
            stripped of surrounding whitespace, is longer than this.
        max_pages: Number of leading pages to sample. The default of 1
            inspects only the first page; raise it for documents whose
            first page is an image-only cover.

    Returns:
        True if any sampled page yields more than ``min_chars`` characters
        of text; False otherwise, including for a PDF that has no pages.
    """
    with open(file_path, 'rb') as f:
        pdf = PyPDF2.PdfReader(f)
        for index, page in enumerate(pdf.pages):
            if index >= max_pages:
                break
            # Guard against a falsy extraction result (e.g. an empty string
            # for an image-only page) before measuring its length.
            text = page.extract_text() or ""
            if len(text.strip()) > min_chars:
                return True
    return False

# Upload only when the PDF has extractable text; otherwise warn the user.
if not is_text_pdf("document.pdf"):
    print("Warning: Scanned PDF - consider OCR first")
else:
    hippo.upload_file(folder_id, "document.pdf")
Impact: 30-40% better accuracy with text-based PDFs
Before uploading, remove:
  • Cover pages and blank pages
  • Table of contents (unless needed for answers)
  • Advertisements and promotional material
  • Appendices with irrelevant data
Impact: Faster processing + less noise in answers
Good formatting:
  • Clear headings and structure
  • Proper paragraph breaks
  • Readable fonts (not decorative)
  • Logical document flow
Bad formatting:
  • All-caps text
  • Excessive formatting
  • Broken layouts
  • Mixed languages without context
Impact: Better chunk quality → Better retrieval

Folder Organization

Strategic Document Grouping

Related Content Together

✅ Good: All product docs in one folder
# One folder holds every document that belongs to the same product.
product_folder = hippo.create_folder("Product V2 Docs")
for doc_name in ("features.pdf", "api.pdf", "examples.pdf"):
    hippo.upload_file(product_folder.id, doc_name)
Impact: Better cross-document answers

Separate Unrelated Content

✅ Good: Separate folders for different products
# One folder per product, created in the order A then B.
product_a_folder, product_b_folder = (
    hippo.create_folder(folder_name)
    for folder_name in ("Product A", "Product B")
)
❌ Bad: Mix all products in one folder
Impact: Reduced confusion, better precision

Folder Size Sweet Spot

# Folder size tiers, smallest to largest, with the guidance for each tier
folder_guidelines = dict((
    ("Small", "5-20 documents"),        # Fast, focused
    ("Medium", "20-100 documents"),     # Recommended
    ("Large", "100-500 documents"),     # Still good
    ("Very Large", "500+ documents"),   # Consider splitting
))
Recommendation: 20-100 related documents per folder for best results

Question Optimization

Write Clear, Specific Questions

  • Factual Questions
  • How-To Questions
  • Comparison Questions
✅ Good:
  • “What is the API rate limit for Pro plan users?”
  • “What is the refund window for digital products?”
  • “What authentication methods does the API support?”
❌ Bad:
  • “Tell me about limits”
  • “Refunds?”
  • “Auth”
Impact: 2-3x better answer relevance

Leverage Follow-Up Questions

# Use conversation context for follow-ups
def conversational_qa(hippo, chat_id):
    """Ask three chained questions in one chat and return the answers.

    Every question is submitted to the same ``chat_id``; the second and
    third are phrased as follow-ups to the one before. Each response is
    printed as it arrives, labelled Q1 to Q3.

    Returns:
        A list with the three answer objects, in the order asked.
    """
    prompts = (
        # Q1: Establish context
        "What are the API authentication methods?",
        # Q2: Follow-up, refers to "methods" from Q1
        "Which one is most secure?",
        # Q3: Another follow-up, refers to "secure method" from Q2
        "How do I implement it?",
    )

    replies = []
    for number, prompt in enumerate(prompts, start=1):
        reply = hippo.submit_ask(chat_id, prompt)
        print(f"Q{number}: {reply.response}\n")
        replies.append(reply)

    return replies
Impact: Natural conversation flow → Better understanding

Performance Optimization

Use Async for Scale

# Sequential uploads (one file at a time) - 30 seconds
for path in files:
    hippo.upload_file(folder_id, path)
Impact: 5-10x faster batch operations when files are uploaded concurrently with the async client instead of one at a time
import asyncio

async def batch_qa(hippo, chat_id, questions):
    """Submit every question to the same chat concurrently.

    Returns:
        The answers as a list, in the same order as ``questions``.
    """
    pending = []
    for question in questions:
        pending.append(hippo.submit_ask(chat_id, question))

    return await asyncio.gather(*pending)

# Usage
from cerevox import AsyncHippo

questions = [
    "What is the API rate limit?",
    "What are the supported file formats?",
    "How do I authenticate?"
]


async def main():
    """Ask all questions concurrently and print each one with its answer."""
    # `async with` and `await` are only valid inside a coroutine, so the
    # client is opened here rather than at module level.
    async with AsyncHippo() as hippo:
        answers = await batch_qa(hippo, chat_id, questions)

        for q, a in zip(questions, answers):
            print(f"Q: {q}")
            print(f"A: {a.response}\n")


if __name__ == "__main__":
    asyncio.run(main())
Impact: 3-5x faster for multiple independent questions

Cost Optimization

Maximize the 80% Savings

Upload Once, Query Many

# ✅ Upload documents once
folder = hippo.create_folder("Docs")
hippo.upload_file(folder.id, "guide.pdf")

# ✅ Ask many questions (cost-effective)
chat = hippo.create_chat(folder.id)
answers = []  # keep every answer; a bare loop variable would retain only the last
for question in questions:
    answer = hippo.submit_ask(chat.id, question)
    answers.append(answer)
Impact: Amortize upload cost over many queries

Reuse Chats When Appropriate

# ✅ Reuse chat for related questions
support_chat = hippo.create_chat(folder.id, "Support")

# Multiple user questions use same chat
answers = []  # keep every answer; a bare loop variable would retain only the last
for user_question in user_questions:
    answer = hippo.submit_ask(support_chat.id, user_question)
    answers.append(answer)
Impact: Maintain context, reduce overhead

Precision Retrieval Benefits

Hippo automatically retrieves only relevant chunks:
# Illustrative prices. NOTE(review): assumed to be per 1K tokens, because that
# is the unit under which the $0.05 / $0.003 totals below hold - confirm.
TRADITIONAL_PRICE_PER_1K_TOKENS = 0.001
HIPPO_PRICE_PER_1K_TOKENS = 0.0002

# Traditional RAG
full_docs = load_documents()  # 50,000 tokens
cost_traditional = 50_000 / 1000 * TRADITIONAL_PRICE_PER_1K_TOKENS  # $0.05 per query

# Hippo RAG
answer = hippo.submit_ask(chat_id, question)
# → Retrieves ~15,000 tokens (70% smaller)
cost_hippo = 15_000 / 1000 * HIPPO_PRICE_PER_1K_TOKENS  # $0.003 per query

# Savings are derived from the two costs above, not hard-coded.
# NOTE(review): these sample figures print 94%, not the 80% headline - reconcile.
print(f"Traditional: ${cost_traditional:.3f}")
print(f"Hippo: ${cost_hippo:.3f}")
print(f"Savings: {(1 - cost_hippo/cost_traditional)*100:.0f}%")

Answer Quality

Verify with Confidence Scores

def get_verified_answer(hippo, chat_id, question):
    """Ask a question and label the answer by its confidence score.

    A score of at least 0.9 is labelled high confidence, at least 0.7
    medium, and anything else low.

    Returns:
        A dict with the keys 'answer', 'confidence', 'status' and 'sources'.
    """
    reply = hippo.submit_ask(chat_id, question)

    # Tiers are checked from the highest floor down; the first match wins.
    tiers = (
        (0.9, "✅ High confidence"),
        (0.7, "⚠️ Medium confidence - verify sources"),
    )
    status = next(
        (label for floor, label in tiers if reply.confidence_score >= floor),
        "❌ Low confidence - may need more documents",
    )

    report = {}
    report['answer'] = reply.response
    report['confidence'] = reply.confidence_score
    report['status'] = status
    report['sources'] = reply.sources
    return report

# Usage: print the confidence label first, then the answer text
result = get_verified_answer(hippo, chat_id, "What is the SLA?")
print(f"{result['status']}", f"Answer: {result['answer']}", sep="\n")

Use Source Citations

def display_answer_with_sources(answer):
    """Print an answer, its confidence, and every cited source.

    For each source the file name, page number, relevance score and the
    first 100 characters of the excerpt are printed. When the answer has
    no sources a warning line is printed instead.
    """
    print(f"Answer: {answer.response}\n")
    print(f"Confidence: {answer.confidence_score:.2f}\n")

    # Guard clause: nothing to enumerate without sources.
    if not answer.sources:
        print("⚠️ No sources found - answer may be uncertain")
        return

    print(f"Sources ({len(answer.sources)}):")
    for position, cited in enumerate(answer.sources, start=1):
        print(f"{position}. {cited.file_name} (Page {cited.page_number})")
        print(f"   Relevance: {cited.relevance_score:.2f}")
        print(f"   Excerpt: {cited.excerpt[:100]}...\n")

# Usage: ask one question, then print the answer together with its sources
answer = hippo.submit_ask(chat_id, question)
display_answer_with_sources(answer)

Maintenance & Monitoring

Regular Cleanup

def cleanup_workspace(hippo):
    """Delete empty folders and unused test chats.

    A folder with ``file_count == 0`` is deleted and its chats are not
    inspected. In every other folder, a chat is deleted when its name
    contains "test" (case-insensitive) and it has no messages. Each
    deletion is announced on stdout first.
    """
    for folder in hippo.get_folders():
        if folder.file_count == 0:
            print(f"Deleting empty folder: {folder.name}")
            hippo.delete_folder(folder.id)
            continue

        for chat in hippo.get_chats(folder.id):
            # Skip anything that is not an unused test chat.
            if "test" not in chat.name.lower() or chat.message_count != 0:
                continue
            print(f"Deleting test chat: {chat.name}")
            hippo.delete_chat(chat.id)

# Run monthly (for example from a scheduled job); deletions are printed as they happen
cleanup_workspace(hippo)

Monitor Usage

import os

from cerevox import Account

# Read the key from the environment instead of hard-coding it in source,
# so it can never be committed to version control by accident.
api_key = os.getenv("CEREVOX_API_KEY")
if not api_key:
    raise ValueError("CEREVOX_API_KEY not set")

account = Account(api_key=api_key)

# Check usage
usage = account.get_usage()

print(f"API Calls: {usage.total_requests}")
print(f"Documents Processed: {usage.documents_processed}")
print(f"Questions Asked: {usage.questions_asked}")

# Check if approaching limits (warn once more than 80% of the limit is used)
if usage.total_requests > usage.rate_limit * 0.8:
    print("⚠️ Approaching rate limit - consider upgrading")

Production Checklist

  • Use environment variables for API keys
  • Never commit API keys to version control
  • Implement user-specific chat isolation
  • Delete sensitive data when no longer needed
  • Review uploaded documents for PII/sensitive data
  • Use async API for production workloads
  • Implement connection pooling
  • Add retry logic for failed requests
  • Cache frequently asked questions if appropriate
  • Monitor response times
import logging
import time

from cerevox import HippoError

logger = logging.getLogger(__name__)

# Default result so `answer` is always bound, even on the branches below
# that do not retry.
answer = None

try:
    answer = hippo.submit_ask(chat_id, question)
except HippoError as e:
    message = str(e).lower()
    if "rate limit" in message:
        # Handle rate limiting: wait, then retry once. A failure of the
        # retry itself is not caught here and propagates to the caller.
        time.sleep(60)
        answer = hippo.submit_ask(chat_id, question)
    elif "not found" in message:
        # Handle missing resources
        print("Error: Chat or folder not found")
    else:
        # Log and handle other errors
        logger.error(f"Hippo error: {e}")
  • Track answer confidence scores
  • Monitor API usage and costs
  • Log low-confidence answers for review
  • Set up alerts for errors
  • Review source citations quality
  • Document folder organization strategy
  • Keep inventory of uploaded documents
  • Document common questions and answers
  • Maintain change log for document updates
  • Create runbooks for common operations

Common Pitfalls to Avoid

Don’t:
  • Mix unrelated documents in one folder
  • Use vague question phrasing
  • Ignore confidence scores
  • Upload scanned PDFs without OCR
  • Create new chats for every question
  • Forget to clean up test resources
  • Share API keys or commit them to git
Do:
  • Group related documents logically
  • Ask specific, clear questions
  • Verify low-confidence answers with sources
  • Use text-based documents when possible
  • Reuse chats for related conversations
  • Regularly clean up unused resources
  • Use environment variables for API keys

Complete Production Example

import os
import asyncio
import logging
from cerevox import AsyncHippo, HippoError

# Configure root logging at INFO level for the whole example
logging.basicConfig(level=logging.INFO)
# Module-level logger used by ProductionRAGSystem below
logger = logging.getLogger(__name__)

class ProductionRAGSystem:
    """Async wrapper that owns an ``AsyncHippo`` client and what it creates.

    Folders and chats are stored in dicts keyed by a caller-chosen string
    (``'support'`` in this example). Call ``setup()`` before any other
    method and ``cleanup()`` when finished.
    """

    def __init__(self):
        self.hippo = None   # AsyncHippo client, created by setup()
        self.folders = {}   # folder_key -> object returned by create_folder()
        self.chats = {}     # folder_key -> object returned by create_chat()

    async def setup(self):
        """Create the client and the 'support' folder.

        Raises:
            ValueError: If the CEREVOX_API_KEY environment variable is
                unset or empty.
        """
        api_key = os.getenv("CEREVOX_API_KEY")
        if not api_key:
            raise ValueError("CEREVOX_API_KEY not set")

        self.hippo = AsyncHippo(api_key=api_key)

        # Create knowledge bases
        self.folders['support'] = await self.hippo.create_folder(
            "Customer Support KB",
            "Support docs, FAQs, troubleshooting"
        )

        logger.info(f"Created folder: {self.folders['support'].name}")

    async def upload_documents(self, folder_key, file_paths):
        """Upload files to a folder concurrently, tolerating failures.

        Paths that do not exist on disk are skipped with a warning. A
        failed upload is logged together with its path and does not abort
        the rest of the batch.

        Args:
            folder_key: Key of a folder previously stored in ``self.folders``.
            file_paths: Iterable of local file paths to upload.

        Returns:
            The upload results for the files that succeeded.

        Raises:
            KeyError: If ``folder_key`` is not in ``self.folders``.
        """
        folder_id = self.folders[folder_key].id

        existing = []
        for path in file_paths:
            if os.path.exists(path):
                existing.append(path)
            else:
                logger.warning(f"File not found: {path}")

        # return_exceptions=True hands failures back as results instead of
        # raising, so no surrounding try/except is needed.
        results = await asyncio.gather(
            *(self.hippo.upload_file(folder_id, path) for path in existing),
            return_exceptions=True,
        )

        successful = []
        failed_count = 0
        for path, result in zip(existing, results):
            # BaseException rather than Exception: a cancelled upload comes
            # back as CancelledError, which is not an Exception subclass on
            # Python 3.8+ and would otherwise be counted as a success.
            if isinstance(result, BaseException):
                failed_count += 1
                logger.error(f"Upload failed for {path}: {result!r}")
            else:
                successful.append(result)

        logger.info(f"Uploaded {len(successful)} files")
        if failed_count:
            logger.error(f"Failed uploads: {failed_count}")

        return successful

    async def ask_question(self, folder_key, question, max_retries=3):
        """Ask a question in the folder's chat, retrying on ``HippoError``.

        The chat for ``folder_key`` is created on first use and reused
        afterwards. Between attempts the method sleeps ``2 ** attempt``
        seconds (1, 2, 4, ...).

        Args:
            folder_key: Key of a folder previously stored in ``self.folders``.
            question: The question text to submit.
            max_retries: Total number of attempts; must be at least 1.

        Returns:
            A dict with 'answer', 'confidence', 'sources' and 'verified',
            where 'verified' is True when the confidence is at least 0.7.

        Raises:
            ValueError: If ``max_retries`` is less than 1.
            KeyError: If ``folder_key`` is not in ``self.folders``.
            HippoError: Re-raised once the final attempt has failed.
        """
        # Without this check a non-positive value would skip the loop and
        # silently return None.
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")

        # Get or create chat
        if folder_key not in self.chats:
            folder_id = self.folders[folder_key].id
            self.chats[folder_key] = await self.hippo.create_chat(
                folder_id,
                f"{folder_key.title()} Chat"
            )

        chat_id = self.chats[folder_key].id

        for attempt in range(max_retries):
            try:
                answer = await self.hippo.submit_ask(chat_id, question)
            except HippoError as e:
                if attempt == max_retries - 1:
                    logger.error(f"Failed after {max_retries} attempts: {e}")
                    raise
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
                continue

            # Log quality metrics
            logger.info(
                f"Q&A - Confidence: {answer.confidence_score:.2f}, "
                f"Sources: {len(answer.sources)}"
            )

            return {
                'answer': answer.response,
                'confidence': answer.confidence_score,
                'sources': answer.sources,
                'verified': answer.confidence_score >= 0.7
            }

    async def cleanup(self):
        """Close the client if one was created; otherwise do nothing."""
        if self.hippo:
            await self.hippo.close()

# Usage
async def main():
    """Run the example end to end: set up, upload, ask, print, clean up.

    ``cleanup()`` runs in a ``finally`` block, so it is awaited even when
    an earlier step raises.
    """
    system = ProductionRAGSystem()

    try:
        await system.setup()

        # Upload docs
        await system.upload_documents(
            'support',
            ["faq.pdf", "guide.pdf", "troubleshooting.pdf"],
        )

        # Ask questions
        result = await system.ask_question('support', "How do I reset my password?")

        for label, key in (("Answer", 'answer'), ("Verified", 'verified')):
            print(f"{label}: {result[key]}")
    finally:
        await system.cleanup()

# Run only when executed as a script, so importing this module does not
# start the event loop or make API calls as a side effect.
if __name__ == "__main__":
    asyncio.run(main())

Next Steps

I