How to Build a RAG Chatbot in 6 Steps

A RAG chatbot augments language models with external knowledge retrieval, reducing hallucinations by grounding responses in your actual data.

Instead of relying on pre-trained knowledge cutoffs, RAG dynamically fetches relevant context from your documents, databases, or APIs—making LLMs useful for real-world applications where accuracy matters.

If your goals include higher answer accuracy, traceable citations, and production-ready latency, this guide walks you through a pragmatic build—from BM25 and GraphRAG foundations to hybrid retrieval, reranking, and low-cost deployment.

We’ll keep this tutorial skimmable and hands-on, with code blocks you can paste into your project. Where many guides leap straight to vector databases, we’ll start with what actually moves metrics in practice.

Quick mindset: Ship a simple, measurable baseline fast (BM25), then layer in semantic vectors, reranking, and caching only where they demonstrably improve outcomes on your real user queries.

Why Most RAG Tutorials Get It Wrong

Everyone jumps straight to vector databases and semantic search. But here's what they don't tell you: BM25 ranks documents for a query on two signals: how often the query terms appear in each document, and how rare those terms are across the rest of the corpus (with a length penalty so long documents don't win by default). This keyword-based approach often outperforms pure semantic search for technical queries, product names, and exact matches.

The reality? Hybrid search is ideal when you want semantic matching for a more human-like search experience but also need exact matching for specific terms, such as product names or serial numbers. You need both—and starting with BM25 gives you a working baseline faster.

Tip: Treat vectors as an optimization, not a religion. If BM25 plus a reranker nails your domain, skip the embedding pipeline and spend time on chunking, caching, and UX.

Step 1: Skip the Vector Database Hype (Start With BM25)

Before spending money on Pinecone or wrestling with FAISS installations, implement a BM25-based retriever. It's fast, requires zero embeddings, and works surprisingly well for many use cases.

from rank_bm25 import BM25Okapi
import numpy as np

class SimpleBM25Retriever:
    def __init__(self, documents):
        # Tokenize documents for BM25
        self.documents = documents
        self.tokenized_docs = [doc.lower().split() for doc in documents]
        
        # Initialize BM25 with custom parameters
        # k1 controls term frequency saturation (1.2-2.0 typical)
        # b controls document length normalization (0.75 typical)
        self.bm25 = BM25Okapi(
            self.tokenized_docs, 
            k1=1.5, 
            b=0.75
        )
    
    def search(self, query, top_k=5):
        tokenized_query = query.lower().split()
        scores = self.bm25.get_scores(tokenized_query)
        
        # Get top-k document indices
        top_indices = np.argsort(scores)[::-1][:top_k]
        
        results = []
        for idx in top_indices:
            results.append({
                'document': self.documents[idx],
                'score': scores[idx],
                'index': idx
            })
        
        return results

This baseline retriever handles 80% of use cases without any ML infrastructure. Test it first—you might not need vectors at all.

Where BM25 shines

  • Exact strings & IDs: SKUs, ticket numbers, error codes, API names, product model numbers.
  • Long-tail queries: Niche terms with sparse semantic neighbors.
  • Compliance & policy snippets: Users often paste exact clauses; BM25 nails them.

Sanity checks to run (5 minutes)

  • Throw 20 real queries at BM25 and eyeball top-3 hits.
  • If your top-3 are consistently relevant (≥80%), delay vectors and jump to Step 4 (Reranking); a quick spot-check script is sketched below.
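
Here is a minimal spot-check sketch for that, assuming the SimpleBM25Retriever above; load_my_documents and load_real_queries are hypothetical stand-ins for however you load your corpus and query log.

docs = load_my_documents()            # hypothetical: your corpus as a list of strings
queries = load_real_queries()[:20]    # hypothetical: 20 real user queries

retriever = SimpleBM25Retriever(docs)

relevant = 0
for q in queries:
    hits = retriever.search(q, top_k=3)
    print(f"\nQuery: {q}")
    for h in hits:
        print(f"  [{h['score']:.2f}] {h['document'][:80]}")
    # Judge relevance by hand for this quick pass
    if input("Any of the top-3 relevant? (y/n) ").strip().lower() == "y":
        relevant += 1

print(f"\nTop-3 hit rate: {relevant / len(queries):.0%}")
# At or above 80%? Delay vectors and go straight to reranking (Step 4).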

The GraphRAG Alternative

For queries requiring relationship traversal ("Who directed the movie where the Inception actor played a thief?"), consider GraphRAG instead of vectors. GraphRAG extends the capabilities of RAG by using knowledge graphs to represent information, allowing it to handle complex queries that require multi-hop reasoning rather than just relying on semantic similarity.

from neo4j import GraphDatabase
import json

class GraphRAGRetriever:
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
    
    def extract_entities_and_relations(self, text):
        """Extract graph structure from text using NER and relation extraction"""
        # In production, use spaCy or an LLM for entity extraction
        # This is simplified for demonstration
        
        with self.driver.session() as session:
            # Create nodes and relationships
            session.run("""
                MERGE (e1:Entity {name: $entity1})
                MERGE (e2:Entity {name: $entity2})
                MERGE (e1)-[r:RELATES_TO {type: $relation}]->(e2)
            """, entity1="Leonardo DiCaprio", 
                 entity2="Inception", 
                 relation="ACTED_IN")
    
    def graph_search(self, query):
        """Traverse graph based on query patterns"""
        with self.driver.session() as session:
            # Example: Find movies by actor
            result = session.run("""
                MATCH (a:Entity)-[:ACTED_IN]->(m:Entity)
                WHERE a.name CONTAINS $actor_hint
                RETURN m.name as movie, a.name as actor
                LIMIT 5
            """, actor_hint=query)
            
            return [record for record in result]

When to try GraphRAG: Your domain has rich entities (people, products, contracts) with explicit relationships (owns, depends_on, references, supersedes) and your users ask multi-hop questions.
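
To make "multi-hop" concrete, here's a minimal sketch of a two-hop traversal you could add to the GraphRAGRetriever above. It assumes your graph also stores DIRECTED relationships, which is a hypothetical schema choice, not something the snippet above creates.

    def find_director_via_actor(self, actor_name: str):
        """Two-hop traversal: actor -> movie -> director (hypothetical schema)"""
        with self.driver.session() as session:
            result = session.run("""
                MATCH (a:Entity {name: $actor})-[:ACTED_IN]->(m:Entity)<-[:DIRECTED]-(d:Entity)
                RETURN m.name AS movie, d.name AS director
                LIMIT 5
            """, actor=actor_name)
            return [record.data() for record in result]

# Usage: answers "Who directed the movie the Inception actor starred in?"
# retriever.find_director_via_actor("Leonardo DiCaprio")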

Step 2: Implement Smart Chunking That Actually Works

Chunking is a critical preprocessing step in RAG pipelines. It involves splitting documents into smaller, manageable pieces that can be efficiently indexed, retrieved, and used as context during response generation.

But here's the issue: most tutorials use naive fixed-size chunking. Real documents have structure—use it.

import re
from typing import List, Dict
import hashlib

class SemanticChunker:
    def __init__(self, 
                 chunk_size=512,
                 chunk_overlap=128,
                 min_chunk_size=100):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk_size = min_chunk_size
    
    def chunk_by_structure(self, text: str) -> List[Dict]:
        """Smart chunking that respects document structure"""
        chunks = []
        
        # First, try to split by markdown headers
        header_pattern = r'^#{1,6}\s+.+$'
        sections = re.split(f'({header_pattern})', text, flags=re.MULTILINE)
        
        current_chunk = ""
        current_metadata = {}
        
        for i, section in enumerate(sections):
            if re.match(header_pattern, section):
                # This is a header
                if current_chunk and len(current_chunk) > self.min_chunk_size:
                    chunks.append({
                        'text': current_chunk.strip(),
                        'metadata': current_metadata,
                        'chunk_id': self._generate_chunk_id(current_chunk)
                    })
                current_metadata = {'header': section.strip()}
                current_chunk = section + "\n"
            else:
                # Regular content
                if len(current_chunk) + len(section) < self.chunk_size:
                    current_chunk += section
                else:
                    # Split large sections semantically
                    sentences = re.split(r'(?<=[.!?])\s+', section)
                    
                    for sentence in sentences:
                        if len(current_chunk) + len(sentence) < self.chunk_size:
                            current_chunk += " " + sentence
                        else:
                            if len(current_chunk) > self.min_chunk_size:
                                chunks.append({
                                    'text': current_chunk.strip(),
                                    'metadata': current_metadata,
                                    'chunk_id': self._generate_chunk_id(current_chunk)
                                })
                            
                            # Start new chunk with overlap
                            overlap_text = self._get_overlap(current_chunk)
                            current_chunk = (overlap_text + " " + sentence) if overlap_text else sentence
        
        # Don't forget the last chunk
        if current_chunk and len(current_chunk) > self.min_chunk_size:
            chunks.append({
                'text': current_chunk.strip(),
                'metadata': current_metadata,
                'chunk_id': self._generate_chunk_id(current_chunk)
            })
        
        return chunks
    
    def _get_overlap(self, text: str) -> str:
        """Extract overlap text from the end of current chunk"""
        words = text.split()
        overlap_words = int(self.chunk_overlap / 5)  # Rough estimate
        return " ".join(words[-overlap_words:]) if len(words) > overlap_words else ""
    
    def _generate_chunk_id(self, text: str) -> str:
        """Generate unique ID for chunk"""
        return hashlib.md5(text.encode()).hexdigest()[:8]

Advanced: Contextual Chunking

Anthropic calls this method "Contextual Retrieval" and it uses two sub-techniques: Contextual Embeddings and Contextual BM25. Anthropic reports that it can reduce the number of failed retrievals by 49% and, when combined with reranking, by 67%.

Add context to each chunk before embedding:

def add_chunk_context(chunks: List[Dict], document_title: str) -> List[Dict]:
    """Add document context to each chunk for better retrieval"""
    
    for i, chunk in enumerate(chunks):
        # Build context from surrounding chunks
        context_before = chunks[i-1]['text'][:200] if i > 0 else ""
        context_after = chunks[i+1]['text'][:200] if i < len(chunks)-1 else ""
        
        # Prepend contextual information
        chunk['contextualized_text'] = f"""
        Document: {document_title}
        Section: {chunk.get('metadata', {}).get('header', 'Main content')}
        Context: This chunk appears after discussing '{context_before}' 
                 and before '{context_after}'.
        
        Content: {chunk['text']}
        """
    
    return chunks

Practical guardrails: Keep chunks self-contained, but not isolated. Preserve headings in metadata for better reranking and for citations in your final answer.
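
As a small illustration, here's a sketch of turning those preserved headers into citation strings, assuming the chunk dicts produced by SemanticChunker above; the document title is whatever you track per source, and "Employee Handbook" below is just an example.

def format_citation(chunk: dict, document_title: str) -> str:
    """Build a human-readable citation from chunk metadata (sketch)"""
    header = chunk.get('metadata', {}).get('header', 'Main content')
    clean_header = header.lstrip('#').strip()
    return f"{document_title} > {clean_header} (chunk {chunk['chunk_id']})"

# Usage: attach citations to whatever context you pass to generation
# citations = [format_citation(c, "Employee Handbook") for c in retrieved_chunks]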

Step 3: Build Your Hybrid Retrieval Pipeline

Now combine BM25 with semantic search for the best of both worlds. But here's the trick: weight them dynamically based on query type.

import numpy as np
from sentence_transformers import SentenceTransformer
from typing import List, Tuple
import re

class HybridRetriever:
    def __init__(self, documents: List[str]):
        self.documents = documents
        
        # Initialize BM25
        self.bm25_retriever = SimpleBM25Retriever(documents)
        
        # Initialize embedding model (use a small, fast one)
        self.encoder = SentenceTransformer('all-MiniLM-L6-v2')
        
        # Pre-compute document embeddings (L2-normalized so a dot product
        # with the query embedding equals cosine similarity)
        self.doc_embeddings = self.encoder.encode(documents,
                                                  normalize_embeddings=True)
    
    def detect_query_type(self, query: str) -> Tuple[float, float]:
        """Dynamically adjust weights based on query characteristics"""
        
        # Check for exact match indicators
        has_quotes = '"' in query
        has_product_code = bool(re.search(r'\b[A-Z]{2,}-\d+\b', query))
        has_specific_terms = any(term in query.lower() 
                                for term in ['exact', 'specifically', 'model number'])
        
        # Check for semantic indicators  
        has_semantic_words = any(word in query.lower() 
                               for word in ['similar', 'like', 'about', 'related'])
        is_question = query.strip().endswith('?')
        
        # Calculate weights
        if has_quotes or has_product_code or has_specific_terms:
            # Favor BM25 for exact matches
            return 0.7, 0.3
        elif has_semantic_words or is_question:
            # Favor semantic search
            return 0.3, 0.7
        else:
            # Balanced approach
            return 0.5, 0.5
    
    def hybrid_search(self, query: str, top_k: int = 5) -> List[Dict]:
        # Get dynamic weights
        bm25_weight, semantic_weight = self.detect_query_type(query)
        
        # BM25 search
        bm25_results = self.bm25_retriever.search(query, top_k=top_k*2)
        
        # Semantic search (cosine similarity via normalized embeddings)
        query_embedding = self.encoder.encode(query, normalize_embeddings=True)
        semantic_scores = np.dot(self.doc_embeddings, query_embedding)
        top_semantic_idx = np.argsort(semantic_scores)[::-1][:top_k*2]
        
        # Normalize and combine scores
        scores_dict = {}
        
        # Add BM25 scores
        max_bm25 = max([r['score'] for r in bm25_results]) + 1e-6
        for result in bm25_results:
            idx = result['index']
            normalized_score = result['score'] / max_bm25
            scores_dict[idx] = bm25_weight * normalized_score
        
        # Add semantic scores
        max_semantic = semantic_scores.max() + 1e-6
        for idx in top_semantic_idx:
            normalized_score = semantic_scores[idx] / max_semantic
            if idx in scores_dict:
                scores_dict[idx] += semantic_weight * normalized_score
            else:
                scores_dict[idx] = semantic_weight * normalized_score
        
        # Sort by combined score
        sorted_indices = sorted(scores_dict.keys(), 
                              key=lambda x: scores_dict[x], 
                              reverse=True)[:top_k]
        
        results = []
        for idx in sorted_indices:
            results.append({
                'document': self.documents[idx],
                'score': scores_dict[idx],
                'index': idx,
                'retrieval_method': 'hybrid'
            })
        
        return results

Why hybrid beats either alone

  • BM25 gives you high-precision anchors for exact terms.
  • Semantic vectors capture paraphrases and fuzzy intent.
  • Dynamic weighting prevents you from hard-coding a one-size-fits-all α/β that degrades edge cases.

Debug trick: Log the chosen weights and top candidates for 100 queries. You’ll quickly see which patterns want BM25-heavy vs. semantic-heavy mixes.
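
Here's a minimal logging wrapper sketch for that, assuming the HybridRetriever above; the JSON-line log format is just one reasonable choice.

import json
import logging

logger = logging.getLogger("rag.debug")

def logged_hybrid_search(retriever: HybridRetriever, query: str, top_k: int = 5):
    """Run hybrid_search and record the weights and top candidates as one JSON line"""
    bm25_weight, semantic_weight = retriever.detect_query_type(query)
    results = retriever.hybrid_search(query, top_k=top_k)
    logger.info(json.dumps({
        'query': query,
        'bm25_weight': bm25_weight,
        'semantic_weight': semantic_weight,
        'candidates': [
            {'index': int(r['index']), 'score': round(float(r['score']), 4)}
            for r in results
        ],
    }))
    return results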

Step 4: Add a Reranker (The Missing Performance Multiplier)

Cross-encoders are the standard models used for reranking in a RAG framework. Unlike retriever functions used in the initial retrieval step, which just take into account the similarity scores of different text chunks, cross-encoders are able to perform a more in-depth comparison of each of the retrieved text chunks with the user's query.

Most RAG implementations skip this crucial step. Don't.

from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch

class CrossEncoderReranker:
    def __init__(self, model_name='cross-encoder/ms-marco-TinyBERT-L-2-v2'):
        """Initialize a lightweight cross-encoder for reranking"""
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
        self.model.eval()
    
    def rerank(self, query: str, documents: List[str], top_k: int = 3) -> List[Dict]:
        """Rerank documents using cross-encoder scoring"""
        
        pairs = [[query, doc] for doc in documents]
        
        # Tokenize all pairs
        with torch.no_grad():
            inputs = self.tokenizer(pairs, 
                                   padding=True, 
                                   truncation=True, 
                                   max_length=512,
                                   return_tensors='pt')
            
            # Get relevance scores
            scores = self.model(**inputs).logits.squeeze(-1)
            scores = torch.sigmoid(scores).numpy()
        
        # Sort by score
        sorted_indices = scores.argsort()[::-1][:top_k]
        
        results = []
        for idx in sorted_indices:
            results.append({
                'document': documents[idx],
                'relevance_score': float(scores[idx]),
                'original_rank': idx
            })
        
        return results

class LLMReranker:
    """Alternative: Use an LLM for reranking when precision is critical"""
    
    def __init__(self, model_name="gpt-3.5-turbo"):
        self.model = model_name
    
    def rerank_with_llm(self, query: str, documents: List[str]) -> List[Dict]:
        """Use LLM to score relevance - more accurate but slower"""
        
        prompt = f"""Given the query: "{query}"
        
        Score each document's relevance from 0-10:
        
        {chr(10).join([f"Document {i+1}: {doc[:200]}..." for i, doc in enumerate(documents)])}
        
        Return only scores as: [score1, score2, ...]"""
        
        # In production, call your LLM API here
        # This is a placeholder for the pattern
        scores = [7, 3, 9, 5, 8]  # Mock scores
        
        ranked_docs = []
        for i, (doc, score) in enumerate(zip(documents, scores)):
            ranked_docs.append({
                'document': doc,
                'llm_relevance': score,
                'index': i
            })
        
        return sorted(ranked_docs, key=lambda x: x['llm_relevance'], reverse=True)

Pragmatic guidance

  • Start with a tiny cross-encoder for cost/latency.
  • Keep top-10 from retrieval, rerank to top-3, and pass those to generation (see the sketch after this list).
  • If latency allows, sample an LLM-based reranker on a subset to validate improvements before switching.
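
Here's that flow end to end, as a minimal sketch assuming the HybridRetriever and CrossEncoderReranker defined above; your_llm_function stands in for whatever LLM client you use.

def answer(query: str, retriever: HybridRetriever, reranker: CrossEncoderReranker) -> str:
    """Retrieve broadly, rerank narrowly, then generate (sketch)"""
    candidates = retriever.hybrid_search(query, top_k=10)   # recall-oriented stage
    docs = [c['document'] for c in candidates]
    top = reranker.rerank(query, docs, top_k=3)             # precision-oriented stage
    context = "\n\n".join(d['document'] for d in top)
    prompt = (f"Answer using only this context:\n{context}\n\n"
              f"Question: {query}")
    return your_llm_function(prompt)  # placeholder for your LLM call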

Step 5: Construct the Generation Chain

Now assemble everything into a production-ready pipeline. But here's where we diverge from typical tutorials—implement response caching and fallback strategies.

import hashlib
import json
from typing import Dict, List, Optional
import redis

class RAGPipeline:
    def __init__(self, 
                 documents: List[str],
                 use_cache: bool = True,
                 cache_ttl: int = 3600):
        
        # Initialize components
        self.hybrid_retriever = HybridRetriever(documents)
        self.reranker = CrossEncoderReranker()
        
        # Initialize cache (Redis for production, dict for development)
        self.use_cache = use_cache
        if use_cache:
            try:
                self.cache = redis.Redis(host='localhost', port=6379, db=0)
                self.cache.ping()  # the constructor alone never fails, so force a connection check
                self.cache_ttl = cache_ttl
            except redis.exceptions.ConnectionError:
                # Fallback to in-memory cache
                self.cache = {}
                print("Redis not available, using in-memory cache")
        
    def generate_cache_key(self, query: str, context: str) -> str:
        """Generate deterministic cache key"""
        combined = f"{query}:{context[:500]}"
        return hashlib.md5(combined.encode()).hexdigest()
    
    def retrieve_and_generate(self, 
                             query: str,
                             llm_function,
                             max_context_length: int = 2000) -> Dict:
        
        # Check cache first (simplistic: keyed on the query alone; see Production notes below)
        if self.use_cache:
            cache_key = self.generate_cache_key(query, query)
            cached = self._get_from_cache(cache_key)
            if cached:
                return cached
        
        # Step 1: Hybrid retrieval
        initial_results = self.hybrid_retriever.hybrid_search(query, top_k=10)
        
        # Step 2: Reranking
        documents = [r['document'] for r in initial_results]
        reranked = self.reranker.rerank(query, documents, top_k=3)
        
        # Step 3: Build context (with token limit management)
        context = self._build_context(reranked, max_context_length)
        
        # Step 4: Generate response
        response = self._generate_with_fallback(query, context, llm_function)
        
        # Cache the result
        if self.use_cache:
            self._cache_result(cache_key, response)
        
        return response
    
    def _build_context(self, documents: List[Dict], max_length: int) -> str:
        """Build context with smart truncation"""
        context = ""
        
        for i, doc in enumerate(documents):
            doc_text = doc['document']
            
            # Add source citation
            formatted = f"\n[Source {i+1} - Relevance: {doc['relevance_score']:.2f}]\n{doc_text}\n"
            
            # Check if adding this would exceed limit
            if len(context) + len(formatted) > max_length:
                # Truncate the last document to fit
                remaining = max_length - len(context) - 50  # Buffer
                if remaining > 100:  # Only add if meaningful
                    formatted = formatted[:remaining] + "..."
                    context += formatted
                break
            
            context += formatted
        
        return context
    
    def _generate_with_fallback(self, query: str, context: str, llm_function) -> Dict:
        """Generate with fallback strategies"""
        
        prompt = f"""Answer the question based on the context provided. 
        If the context doesn't contain the answer, say "I cannot find this information in the provided context."
        
        Context:
        {context}
        
        Question: {query}
        
        Answer:"""
        
        try:
            # Primary LLM call
            response = llm_function(prompt)
            
            # Validate response
            if self._is_valid_response(response):
                return {
                    'answer': response,
                    'context_used': context[:500],
                    'status': 'success'
                }
            else:
                # Fallback to simpler extraction
                return self._extractive_fallback(query, context)
                
        except Exception as e:
            # Ultimate fallback: extractive answer
            return {
                'answer': self._extractive_fallback(query, context)['answer'],
                'context_used': context[:500],
                'status': 'fallback',
                'error': str(e)
            }
    
    def _extractive_fallback(self, query: str, context: str) -> Dict:
        """Simple extractive answer when generation fails"""
        
        # Find the most relevant sentence
        sentences = context.split('.')
        query_words = set(query.lower().split())
        
        best_sentence = ""
        best_score = 0
        
        for sentence in sentences:
            sentence_words = set(sentence.lower().split())
            overlap = len(query_words.intersection(sentence_words))
            
            if overlap > best_score:
                best_score = overlap
                best_sentence = sentence.strip()
        
        return {
            'answer': best_sentence if best_sentence else "No relevant information found.",
            'method': 'extractive_fallback'
        }
    
    def _is_valid_response(self, response: str) -> bool:
        """Validate LLM response"""
        if not response or len(response) < 10:
            return False
        
        # Check for common failure patterns
        failure_patterns = [
            "i cannot assist",
            "i don't have access",
            "error occurred",
            "context is unclear"
        ]
        
        response_lower = response.lower()
        return not any(pattern in response_lower for pattern in failure_patterns)
    
    def _get_from_cache(self, key: str) -> Optional[Dict]:
        """Retrieve from cache"""
        if isinstance(self.cache, dict):
            return self.cache.get(key)
        else:
            cached = self.cache.get(key)
            return json.loads(cached) if cached else None
    
    def _cache_result(self, key: str, result: Dict):
        """Store in cache"""
        if isinstance(self.cache, dict):
            self.cache[key] = result
        else:
            self.cache.setex(key, self.cache_ttl, json.dumps(result))

Production notes

  • Cache keys should include a hash of the normalized query plus the IDs of the top context chunks to avoid stale collisions (a sketch follows this list).
  • Always log which sources made it into the context; this is invaluable for debugging user complaints.
  • Add a grounding instruction to the prompt (“If not in context, say you can’t find it”) to curb hallucinations.
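
A minimal sketch of that first point, assuming each retrieved chunk carries the chunk_id generated in Step 2:

import hashlib

def make_cache_key(query: str, context_chunk_ids: list) -> str:
    """Key on the normalized query plus the exact chunks used, so re-indexing invalidates entries"""
    normalized = " ".join(query.lower().split())
    payload = normalized + "|" + ",".join(sorted(context_chunk_ids))
    return hashlib.sha256(payload.encode()).hexdigest()

# Usage:
# key = make_cache_key("What is our refund policy?", [c['chunk_id'] for c in top_chunks])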

Step 6: Deploy Without the Cloud Tax

Instead of defaulting to expensive managed services, here's a production setup that runs on a $20/month VPS:

# docker-compose.yml for self-hosted RAG
"""
version: '3.8'

services:
  qdrant:
    image: qdrant/qdrant
    ports:
      - "6333:6333"
    volumes:
      - ./qdrant_storage:/qdrant/storage
  
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
    volumes:
      - ./redis_data:/data
  
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - QDRANT_URL=http://qdrant:6333
      - REDIS_URL=redis://redis:6379
    depends_on:
      - qdrant
      - redis
"""

# FastAPI server with streaming responses
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
import asyncio
import json
from typing import AsyncGenerator

app = FastAPI()

# Initialize your RAG pipeline
rag_pipeline = RAGPipeline(documents=load_your_documents())

@app.post("/chat")
async def chat_endpoint(query: str):
    """Streaming RAG endpoint"""
    
    async def generate_stream() -> AsyncGenerator[str, None]:
        # Retrieve, rerank, and generate (retrieval happens inside the pipeline)
        response = rag_pipeline.retrieve_and_generate(
            query,
            llm_function=your_llm_function
        )
        
        # Stream response in chunks
        for i in range(0, len(response['answer']), 20):
            chunk = response['answer'][i:i+20]
            yield f"data: {json.dumps({'text': chunk})}\n\n"
            await asyncio.sleep(0.1)  # Simulate streaming
    
    return StreamingResponse(
        generate_stream(),
        media_type="text/event-stream"
    )

@app.get("/health")
async def health_check():
    return {"status": "healthy", "cache_size": len(rag_pipeline.cache)}

Production Optimization Tricks

  1. Use SQLite for Small-Scale Vector Storage

Instead of complex vector databases, SQLite with the sqlite-vss extension can serve collections of up to a few million vectors on a single machine:

import sqlite3
import sqlite_vss
import numpy as np

def setup_sqlite_vector_store():
    conn = sqlite3.connect('vectors.db')
    conn.enable_load_extension(True)
    sqlite_vss.load(conn)
    conn.enable_load_extension(False)
    
    # vss0 virtual tables only hold vector columns, so keep the text in a
    # companion table and join on rowid
    conn.execute("""
        CREATE TABLE IF NOT EXISTS documents (
            id INTEGER PRIMARY KEY,
            document TEXT
        )
    """)
    conn.execute("""
        CREATE VIRTUAL TABLE IF NOT EXISTS doc_vectors USING vss0(
            embedding(384)
        )
    """)
    
    return conn

def search_vectors(conn, query_embedding, k=5):
    # Nearest-neighbour search; vss_search exposes a hidden `distance` column
    # (older SQLite versions need vss_search_params instead of a bare LIMIT)
    hits = conn.execute("""
        SELECT rowid, distance
        FROM doc_vectors
        WHERE vss_search(embedding, ?)
        LIMIT ?
    """, (np.asarray(query_embedding, dtype=np.float32).tobytes(), k)).fetchall()
    
    # Fetch the matching document text by rowid
    results = []
    for rowid, distance in hits:
        doc = conn.execute("SELECT document FROM documents WHERE id = ?", (rowid,)).fetchone()
        results.append((rowid, doc[0] if doc else None, distance))
    
    return results

  2. Implement Request Batching

from collections import defaultdict
from typing import Dict, List
import asyncio

class BatchedRAG:
    def __init__(self, rag_pipeline, batch_size=10, batch_timeout=0.1):
        self.rag_pipeline = rag_pipeline
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.pending_requests = []
        self.results = {}
    
    async def process_query(self, query_id: str, query: str):
        """Add query to batch and wait for result"""
        future = asyncio.Future()
        self.pending_requests.append((query_id, query, future))
        
        # Trigger batch processing if size reached
        if len(self.pending_requests) >= self.batch_size:
            await self._process_batch()
        else:
            # Schedule timeout-based processing
            asyncio.create_task(self._timeout_trigger())
        
        return await future
    
    async def _process_batch(self):
        """Process all pending requests in batch"""
        if not self.pending_requests:
            return
        
        batch = self.pending_requests[:self.batch_size]
        self.pending_requests = self.pending_requests[self.batch_size:]
        
        # Batch retrieve
        queries = [q for _, q, _ in batch]
        contexts = self._batch_retrieve(queries)
        
        # Generate responses
        for (query_id, query, future), context in zip(batch, contexts):
            response = await self._generate_response(query, context)
            future.set_result(response)
    
    async def _timeout_trigger(self):
        """Flush a partial batch once the timeout expires"""
        await asyncio.sleep(self.batch_timeout)
        await self._process_batch()
    
    async def _generate_response(self, query: str, context: List[Dict]) -> Dict:
        """Placeholder: call your LLM here with the retrieved context"""
        return {'query': query, 'context': context}
    
    def _batch_retrieve(self, queries: List[str]) -> List[List[Dict]]:
        """Batch retrieval (looped here; encode queries in one batch for real gains)"""
        all_results = []
        for query in queries:
            results = self.rag_pipeline.hybrid_retriever.hybrid_search(query)
            all_results.append(results)
        return all_results

Ops checklist: Add /health, /metrics, and /cache introspection endpoints. Log query → weights → candidates → reranked → final context for a 1% sample to investigate misses.

Performance Metrics That Actually Matter

Track these metrics in production:

  1. First Token Latency: Time to first streamed token (target: <500ms)
  2. Retrieval Precision@3: Are top 3 results relevant? (target: >80%)
  3. Cache Hit Rate: Percentage of cached responses (target: >30%)
  4. Fallback Rate: How often generation fails (target: <5%)

from collections import defaultdict
import numpy as np

class RAGMetrics:
    def __init__(self):
        self.metrics = defaultdict(list)
    
    def track_retrieval(self, query, retrieved_docs, relevant_docs):
        """Track retrieval metrics"""
        # Precision@k
        k = 3
        top_k = retrieved_docs[:k]
        relevant_in_top_k = sum(1 for doc in top_k if doc in relevant_docs)
        precision_at_k = relevant_in_top_k / k
        
        self.metrics['precision@3'].append(precision_at_k)
        
        # Mean Reciprocal Rank (MRR)
        for i, doc in enumerate(retrieved_docs):
            if doc in relevant_docs:
                self.metrics['mrr'].append(1 / (i + 1))
                break
        else:
            self.metrics['mrr'].append(0)
    
    def get_summary(self):
        return {
            metric: np.mean(values) 
            for metric, values in self.metrics.items()
        }

Build a bench, not a benchmark: Use your own gold labels. Ask SMEs to mark answers “good enough” vs. “wrong,” then optimize Precision@3 and MRR on that set. That’s how you avoid overfitting to public leaderboards that don’t represent your corpus.
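
A minimal sketch of that loop, assuming a small SME-labeled set of (query, relevant document indices) pairs, the RAGMetrics class above, and the rag_pipeline instance from Step 6; the example queries and labels below are made up.

gold_set = [
    ("how do I reset my password", {3, 17}),        # hypothetical SME labels
    ("warranty period for model XR-200", {42}),
]

metrics = RAGMetrics()
for query, relevant_ids in gold_set:
    results = rag_pipeline.hybrid_retriever.hybrid_search(query, top_k=10)
    retrieved_ids = [r['index'] for r in results]
    metrics.track_retrieval(query, retrieved_ids, relevant_ids)

print(metrics.get_summary())   # e.g. {'precision@3': ..., 'mrr': ...}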

The Uncomfortable Truth About RAG

Here's what vendors won't tell you: include too much in a chunk and its embedding becomes too diffuse to represent any one topic it covers; include too little and you lose the surrounding context. There's no perfect chunk size—it's all trade-offs.

The real optimization happens at the retrieval layer, not the LLM. A fast BM25 search with good reranking beats an expensive vector database with poor chunking every time.

Heuristic to live by: If reranking a smaller candidate pool gives a bigger lift than swapping LLMs, your bottleneck is retrieval—not generation.

What's Next?

  1. Test BM25 First: Before investing in vector infrastructure, establish a baseline with keyword search.
  2. Measure Real Queries: Log actual user queries and optimize for those patterns, not synthetic benchmarks.
  3. Consider GraphRAG: For relationship-heavy data, graph traversal beats semantic similarity.
  4. Cache Aggressively: Most queries follow a power law—cache the common ones.
  5. Monitor Fallbacks: When generation fails, your extractive fallback is your safety net.

Remember: The best RAG system is the one that ships. Start simple with BM25, add vectors when needed, and only adopt GraphRAG when relationships matter. Your users care about accuracy and speed, not your embedding model.

Marius Bernard

Marius Bernard is a Product Advisor, Technical SEO, & Brand Ambassador at Roundproxies. He was the lead author for the SEO chapter of the 2024 Web and a reviewer for the 2023 SEO chapter.