A RAG chatbot augments language models with external knowledge retrieval, grounding responses in your actual data and sharply reducing hallucinations.
Instead of relying on pre-trained knowledge cutoffs, RAG dynamically fetches relevant context from your documents, databases, or APIs—making LLMs useful for real-world applications where accuracy matters.
If your goals include higher answer accuracy, traceable citations, and production-ready latency, this guide walks you through a pragmatic build—from BM25 and GraphRAG foundations to hybrid retrieval, reranking, and low-cost deployment.
We’ll keep this tutorial skimmable and hands-on, with code blocks you can paste into your project. Where many guides leap straight to vector databases, we’ll start with what actually moves metrics in practice.
Quick mindset: Ship a simple, measurable baseline fast (BM25), then layer in semantic vectors, reranking, and caching only where they demonstrably improve outcomes on your real user queries.
Why Most RAG Tutorials Get It Wrong
Everyone jumps straight to vector databases and semantic search. But here's what they don't tell you: BM25 ranks documents by how often the query terms appear in each document (with diminishing returns as a term repeats), how rare those terms are across the corpus, and how long the document is relative to the average. This keyword-based approach often outperforms pure semantic search for technical queries, product names, and exact matches.
The reality? Hybrid search is ideal when you want semantic search for a more human-like experience but also need exact matching for specific terms such as product names or serial numbers. You need both—and starting with BM25 gives you a working baseline faster.
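For intuition, here is the textbook Okapi BM25 score written out in plain Python (a simplified sketch of what rank_bm25 computes for you; corpus is a list of tokenized documents, and the IDF shown is one common variant):
import math

def bm25_score(query_terms, doc_terms, corpus, k1=1.5, b=0.75):
    """Score one tokenized document against a query with the classic Okapi BM25 formula."""
    avg_len = sum(len(d) for d in corpus) / len(corpus)
    score = 0.0
    for term in query_terms:
        tf = doc_terms.count(term)                    # term frequency in this document
        df = sum(1 for d in corpus if term in d)      # documents containing the term
        idf = math.log((len(corpus) - df + 0.5) / (df + 0.5) + 1)  # rare terms weigh more
        # k1 saturates repeated terms; b penalizes longer-than-average documents
        score += idf * (tf * (k1 + 1)) / (tf + k1 * (1 - b + b * len(doc_terms) / avg_len))
    return score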
Tip: Treat vectors as an optimization, not a religion. If BM25 plus a reranker nails your domain, skip the embedding pipeline and spend time on chunking, caching, and UX.
Step 1: Skip the Vector Database Hype (Start With BM25)
Before spending money on Pinecone or wrestling with FAISS installations, implement a BM25-based retriever. It's fast, requires zero embeddings, and works surprisingly well for many use cases.
from rank_bm25 import BM25Okapi
import json
import numpy as np
class SimpleBM25Retriever:
def __init__(self, documents):
# Tokenize documents for BM25
self.documents = documents
self.tokenized_docs = [doc.lower().split() for doc in documents]
# Initialize BM25 with custom parameters
# k1 controls term frequency saturation (1.2-2.0 typical)
# b controls document length normalization (0.75 typical)
self.bm25 = BM25Okapi(
self.tokenized_docs,
k1=1.5,
b=0.75
)
def search(self, query, top_k=5):
tokenized_query = query.lower().split()
scores = self.bm25.get_scores(tokenized_query)
# Get top-k document indices
top_indices = np.argsort(scores)[::-1][:top_k]
results = []
for idx in top_indices:
results.append({
'document': self.documents[idx],
'score': scores[idx],
'index': idx
})
return results
This baseline retriever handles 80% of use cases without any ML infrastructure. Test it first—you might not need vectors at all.
Where BM25 shines
- Exact strings & IDs: SKUs, ticket numbers, error codes, API names, product model numbers.
- Long-tail queries: Niche terms with sparse semantic neighbors.
- Compliance & policy snippets: Users often paste exact clauses; BM25 nails them.
Sanity checks to run (5 minutes)
- Throw 20 real queries at BM25 and eyeball the top-3 hits (a quick harness is sketched right after this list).
- If your top-3 are consistently relevant (≥80%), delay vectors and jump to Step 4 (Reranking).
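A minimal spot-check harness, assuming the SimpleBM25Retriever above; documents and sample_queries are placeholders for your own corpus and real logged queries:
# Eyeball the top-3 BM25 hits for a handful of real queries
retriever = SimpleBM25Retriever(documents)
sample_queries = ["reset API key", "error code E-1042", "refund policy"]  # illustrative

for query in sample_queries:
    print(f"\nQuery: {query}")
    for hit in retriever.search(query, top_k=3):
        print(f"  score={hit['score']:.2f}  {hit['document'][:80]}")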
The GraphRAG Alternative
For queries requiring relationship traversal ("Who directed the movie where the Inception actor played a thief?"), consider GraphRAG instead of vectors. GraphRAG extends the capabilities of RAG by using knowledge graphs to represent information, allowing it to handle complex queries that require multi-hop reasoning rather than just relying on semantic similarity.
from neo4j import GraphDatabase
import json
class GraphRAGRetriever:
def __init__(self, uri, user, password):
self.driver = GraphDatabase.driver(uri, auth=(user, password))
def extract_entities_and_relations(self, text):
"""Extract graph structure from text using NER and relation extraction"""
# In production, use spaCy or an LLM for entity extraction
# This is simplified for demonstration
with self.driver.session() as session:
# Create nodes and relationships
session.run("""
MERGE (e1:Entity {name: $entity1})
MERGE (e2:Entity {name: $entity2})
MERGE (e1)-[r:RELATES_TO {type: $relation}]->(e2)
""", entity1="Leonardo DiCaprio",
entity2="Inception",
relation="ACTED_IN")
def graph_search(self, query):
"""Traverse graph based on query patterns"""
with self.driver.session() as session:
# Example: Find movies by actor
result = session.run("""
MATCH (a:Entity)-[:ACTED_IN]->(m:Entity)
WHERE a.name CONTAINS $actor_hint
RETURN m.name as movie, a.name as actor
LIMIT 5
""", actor_hint=query)
return [record for record in result]
When to try GraphRAG: Your domain has rich entities (people, products, contracts) with explicit relationships (owns, depends_on, references, supersedes) and your users ask multi-hop questions.
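For a concrete multi-hop example, here is a method you could add to the GraphRAGRetriever above; the ACTED_IN and DIRECTED relationship types are illustrative and depend entirely on your own schema:
    def find_director_via_actor(self, actor_name: str):
        """Answer 'who directed the movie this actor appeared in?' with a two-hop traversal"""
        with self.driver.session() as session:
            result = session.run("""
                MATCH (a:Entity {name: $actor})-[:ACTED_IN]->(m:Entity)<-[:DIRECTED]-(d:Entity)
                RETURN m.name AS movie, d.name AS director
                LIMIT 5
            """, actor=actor_name)
            return [dict(record) for record in result]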
Step 2: Implement Smart Chunking That Actually Works
Chunking is a critical preprocessing step in RAG pipelines. It involves splitting documents into smaller, manageable pieces that can be efficiently indexed, retrieved, and used as context during response generation.
But here's the issue: most tutorials use naive fixed-size chunking. Real documents have structure—use it.
import re
from typing import List, Dict
import hashlib
class SemanticChunker:
def __init__(self,
chunk_size=512,
chunk_overlap=128,
min_chunk_size=100):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.min_chunk_size = min_chunk_size
def chunk_by_structure(self, text: str) -> List[Dict]:
"""Smart chunking that respects document structure"""
chunks = []
# First, try to split by markdown headers
header_pattern = r'^#{1,6}\s+.+$'
sections = re.split(f'({header_pattern})', text, flags=re.MULTILINE)
current_chunk = ""
current_metadata = {}
for i, section in enumerate(sections):
if re.match(header_pattern, section):
# This is a header
if current_chunk and len(current_chunk) > self.min_chunk_size:
chunks.append({
'text': current_chunk.strip(),
'metadata': current_metadata,
'chunk_id': self._generate_chunk_id(current_chunk)
})
current_metadata = {'header': section.strip()}
current_chunk = section + "\n"
else:
# Regular content
if len(current_chunk) + len(section) < self.chunk_size:
current_chunk += section
else:
# Split large sections semantically
sentences = re.split(r'(?<=[.!?])\s+', section)
for sentence in sentences:
if len(current_chunk) + len(sentence) < self.chunk_size:
current_chunk += " " + sentence
else:
if len(current_chunk) > self.min_chunk_size:
chunks.append({
'text': current_chunk.strip(),
'metadata': current_metadata,
'chunk_id': self._generate_chunk_id(current_chunk)
})
# Start new chunk with overlap
overlap_text = self._get_overlap(current_chunk)
current_chunk = overlap_text + sentence
# Don't forget the last chunk
if current_chunk and len(current_chunk) > self.min_chunk_size:
chunks.append({
'text': current_chunk.strip(),
'metadata': current_metadata,
'chunk_id': self._generate_chunk_id(current_chunk)
})
return chunks
def _get_overlap(self, text: str) -> str:
"""Extract overlap text from the end of current chunk"""
words = text.split()
overlap_words = int(self.chunk_overlap / 5) # Rough estimate
return " ".join(words[-overlap_words:]) if len(words) > overlap_words else ""
def _generate_chunk_id(self, text: str) -> str:
"""Generate unique ID for chunk"""
return hashlib.md5(text.encode()).hexdigest()[:8]
Advanced: Contextual Chunking
This approach, which Anthropic calls "Contextual Retrieval," combines two sub-techniques: Contextual Embeddings and Contextual BM25. Anthropic reports it can reduce the number of failed retrievals by 49% and, when combined with reranking, by 67%.
Add context to each chunk before embedding:
def add_chunk_context(chunks: List[Dict], document_title: str) -> List[Dict]:
"""Add document context to each chunk for better retrieval"""
for i, chunk in enumerate(chunks):
# Build context from surrounding chunks
context_before = chunks[i-1]['text'][:200] if i > 0 else ""
context_after = chunks[i+1]['text'][:200] if i < len(chunks)-1 else ""
# Prepend contextual information
chunk['contextualized_text'] = f"""
Document: {document_title}
Section: {chunk.get('metadata', {}).get('header', 'Main content')}
Context: This chunk appears after discussing '{context_before}'
and before '{context_after}'.
Content: {chunk['text']}
"""
return chunks
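Anthropic's original recipe goes a step further: an LLM writes a short, chunk-specific situating summary that is prepended before indexing. A sketch of that variant, assuming a call_llm(prompt) helper you supply (the prompt wording here is illustrative, not Anthropic's):
def add_llm_chunk_context(chunks: List[Dict], full_document: str, call_llm) -> List[Dict]:
    """Prepend an LLM-written situating summary to each chunk before embedding and BM25 indexing"""
    for chunk in chunks:
        prompt = (
            "Here is a document:\n"
            f"{full_document[:4000]}\n\n"
            "Here is a chunk from that document:\n"
            f"{chunk['text']}\n\n"
            "Write one or two sentences situating this chunk within the overall document "
            "to improve search retrieval of the chunk. Answer with only that context."
        )
        chunk['contextualized_text'] = f"{call_llm(prompt)}\n\n{chunk['text']}"
    return chunks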
Practical guardrails: Keep chunks self-contained, but not isolated. Preserve headings in metadata for better reranking and for citations in your final answer.
Step 3: Build Your Hybrid Retrieval Pipeline
Now combine BM25 with semantic search for the best of both worlds. But here's the trick: weight them dynamically based on query type.
import numpy as np
from sentence_transformers import SentenceTransformer
from typing import List, Tuple
import re
class HybridRetriever:
def __init__(self, documents: List[str]):
self.documents = documents
# Initialize BM25
self.bm25_retriever = SimpleBM25Retriever(documents)
# Initialize embedding model (use a small, fast one)
self.encoder = SentenceTransformer('all-MiniLM-L6-v2')
# Pre-compute document embeddings
        self.doc_embeddings = self.encoder.encode(documents, convert_to_numpy=True)
def detect_query_type(self, query: str) -> Tuple[float, float]:
"""Dynamically adjust weights based on query characteristics"""
# Check for exact match indicators
has_quotes = '"' in query
has_product_code = bool(re.search(r'\b[A-Z]{2,}-\d+\b', query))
has_specific_terms = any(term in query.lower()
for term in ['exact', 'specifically', 'model number'])
# Check for semantic indicators
has_semantic_words = any(word in query.lower()
for word in ['similar', 'like', 'about', 'related'])
is_question = query.strip().endswith('?')
# Calculate weights
if has_quotes or has_product_code or has_specific_terms:
# Favor BM25 for exact matches
return 0.7, 0.3
elif has_semantic_words or is_question:
# Favor semantic search
return 0.3, 0.7
else:
# Balanced approach
return 0.5, 0.5
def hybrid_search(self, query: str, top_k: int = 5) -> List[Dict]:
# Get dynamic weights
bm25_weight, semantic_weight = self.detect_query_type(query)
# BM25 search
bm25_results = self.bm25_retriever.search(query, top_k=top_k*2)
# Semantic search
        query_embedding = self.encoder.encode(query, convert_to_numpy=True)
        # This model outputs unit-normalized vectors, so dot product equals cosine similarity
        semantic_scores = np.dot(self.doc_embeddings, query_embedding)
top_semantic_idx = np.argsort(semantic_scores)[::-1][:top_k*2]
# Normalize and combine scores
scores_dict = {}
# Add BM25 scores
max_bm25 = max([r['score'] for r in bm25_results]) + 1e-6
for result in bm25_results:
idx = result['index']
normalized_score = result['score'] / max_bm25
scores_dict[idx] = bm25_weight * normalized_score
# Add semantic scores
max_semantic = semantic_scores.max() + 1e-6
for idx in top_semantic_idx:
normalized_score = semantic_scores[idx] / max_semantic
if idx in scores_dict:
scores_dict[idx] += semantic_weight * normalized_score
else:
scores_dict[idx] = semantic_weight * normalized_score
# Sort by combined score
sorted_indices = sorted(scores_dict.keys(),
key=lambda x: scores_dict[x],
reverse=True)[:top_k]
results = []
for idx in sorted_indices:
results.append({
'document': self.documents[idx],
'score': scores_dict[idx],
'index': idx,
'retrieval_method': 'hybrid'
})
return results
Why hybrid beats either alone
- BM25 gives you high-precision anchors for exact terms.
- Semantic vectors capture paraphrases and fuzzy intent.
- Dynamic weighting prevents you from hard-coding a one-size-fits-all α/β that degrades edge cases.
Debug trick: Log the chosen weights and top candidates for 100 queries. You’ll quickly see which patterns want BM25-heavy vs. semantic-heavy mixes.
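One way to wire that in is a thin wrapper over the HybridRetriever above (a sketch using the standard logging module):
import logging

logger = logging.getLogger("rag.retrieval")

def logged_hybrid_search(retriever: HybridRetriever, query: str, top_k: int = 5):
    """Run hybrid_search and log the chosen weights plus the top candidates"""
    bm25_weight, semantic_weight = retriever.detect_query_type(query)
    results = retriever.hybrid_search(query, top_k=top_k)
    logger.info(
        "query=%r bm25_weight=%.1f semantic_weight=%.1f top=%s",
        query, bm25_weight, semantic_weight,
        [(r['index'], round(r['score'], 3)) for r in results],
    )
    return results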
Step 4: Add a Reranker (The Missing Performance Multiplier)
Cross-encoders are the standard models for reranking in a RAG pipeline. Unlike the first-stage retriever, which compares precomputed representations of the query and each chunk, a cross-encoder reads the query and the candidate chunk together in a single pass, enabling a much finer-grained relevance judgment.
Most RAG implementations skip this crucial step. Don't.
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
class CrossEncoderReranker:
def __init__(self, model_name='cross-encoder/ms-marco-TinyBERT-L-2-v2'):
"""Initialize a lightweight cross-encoder for reranking"""
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
self.model.eval()
def rerank(self, query: str, documents: List[str], top_k: int = 3) -> List[Dict]:
"""Rerank documents using cross-encoder scoring"""
pairs = [[query, doc] for doc in documents]
# Tokenize all pairs
with torch.no_grad():
inputs = self.tokenizer(pairs,
padding=True,
truncation=True,
max_length=512,
return_tensors='pt')
# Get relevance scores
scores = self.model(**inputs).logits.squeeze(-1)
scores = torch.sigmoid(scores).numpy()
# Sort by score
sorted_indices = scores.argsort()[::-1][:top_k]
results = []
for idx in sorted_indices:
results.append({
'document': documents[idx],
'relevance_score': float(scores[idx]),
'original_rank': idx
})
return results
class LLMReranker:
"""Alternative: Use an LLM for reranking when precision is critical"""
def __init__(self, model_name="gpt-3.5-turbo"):
self.model = model_name
def rerank_with_llm(self, query: str, documents: List[str]) -> List[Dict]:
"""Use LLM to score relevance - more accurate but slower"""
prompt = f"""Given the query: "{query}"
Score each document's relevance from 0-10:
{chr(10).join([f"Document {i+1}: {doc[:200]}..." for i, doc in enumerate(documents)])}
Return only scores as: [score1, score2, ...]"""
# In production, call your LLM API here
# This is a placeholder for the pattern
scores = [7, 3, 9, 5, 8] # Mock scores
ranked_docs = []
for i, (doc, score) in enumerate(zip(documents, scores)):
ranked_docs.append({
'document': doc,
'llm_relevance': score,
'index': i
})
return sorted(ranked_docs, key=lambda x: x['llm_relevance'], reverse=True)
Pragmatic guidance
- Start with a tiny cross-encoder for cost/latency.
- Keep top-10 from retrieval, rerank to top-3, and pass those to generation.
- If latency allows, sample an LLM-based reranker on a subset to validate improvements before switching.
Step 5: Construct the Generation Chain
Now assemble everything into a production-ready pipeline. But here's where we diverge from typical tutorials—implement response caching and fallback strategies.
import hashlib
import json
from typing import Dict, List, Optional
import redis
class RAGPipeline:
def __init__(self,
documents: List[str],
use_cache: bool = True,
cache_ttl: int = 3600):
# Initialize components
self.hybrid_retriever = HybridRetriever(documents)
self.reranker = CrossEncoderReranker()
# Initialize cache (Redis for production, dict for development)
        self.use_cache = use_cache
        self.cache_ttl = cache_ttl
        if use_cache:
            try:
                self.cache = redis.Redis(host='localhost', port=6379, db=0)
                self.cache.ping()  # Redis connects lazily; force a round-trip now
            except redis.exceptions.RedisError:
                # Fallback to in-memory cache
                self.cache = {}
                print("Redis not available, using in-memory cache")
def generate_cache_key(self, query: str, context: str) -> str:
"""Generate deterministic cache key"""
combined = f"{query}:{context[:500]}"
return hashlib.md5(combined.encode()).hexdigest()
def retrieve_and_generate(self,
query: str,
llm_function,
max_context_length: int = 2000) -> Dict:
# Check cache first
if self.use_cache:
cache_key = self.generate_cache_key(query, query)
cached = self._get_from_cache(cache_key)
if cached:
return cached
# Step 1: Hybrid retrieval
initial_results = self.hybrid_retriever.hybrid_search(query, top_k=10)
# Step 2: Reranking
documents = [r['document'] for r in initial_results]
reranked = self.reranker.rerank(query, documents, top_k=3)
# Step 3: Build context (with token limit management)
context = self._build_context(reranked, max_context_length)
# Step 4: Generate response
response = self._generate_with_fallback(query, context, llm_function)
# Cache the result
if self.use_cache:
self._cache_result(cache_key, response)
return response
def _build_context(self, documents: List[Dict], max_length: int) -> str:
"""Build context with smart truncation"""
context = ""
for i, doc in enumerate(documents):
doc_text = doc['document']
# Add source citation
formatted = f"\n[Source {i+1} - Relevance: {doc['relevance_score']:.2f}]\n{doc_text}\n"
# Check if adding this would exceed limit
if len(context) + len(formatted) > max_length:
# Truncate the last document to fit
remaining = max_length - len(context) - 50 # Buffer
if remaining > 100: # Only add if meaningful
formatted = formatted[:remaining] + "..."
context += formatted
break
context += formatted
return context
def _generate_with_fallback(self, query: str, context: str, llm_function) -> Dict:
"""Generate with fallback strategies"""
prompt = f"""Answer the question based on the context provided.
If the context doesn't contain the answer, say "I cannot find this information in the provided context."
Context:
{context}
Question: {query}
Answer:"""
try:
# Primary LLM call
response = llm_function(prompt)
# Validate response
if self._is_valid_response(response):
return {
'answer': response,
'context_used': context[:500],
'status': 'success'
}
else:
# Fallback to simpler extraction
return self._extractive_fallback(query, context)
except Exception as e:
# Ultimate fallback: extractive answer
return {
'answer': self._extractive_fallback(query, context)['answer'],
'context_used': context[:500],
'status': 'fallback',
'error': str(e)
}
def _extractive_fallback(self, query: str, context: str) -> Dict:
"""Simple extractive answer when generation fails"""
# Find the most relevant sentence
sentences = context.split('.')
query_words = set(query.lower().split())
best_sentence = ""
best_score = 0
for sentence in sentences:
sentence_words = set(sentence.lower().split())
overlap = len(query_words.intersection(sentence_words))
if overlap > best_score:
best_score = overlap
best_sentence = sentence.strip()
return {
'answer': best_sentence if best_sentence else "No relevant information found.",
'method': 'extractive_fallback'
}
def _is_valid_response(self, response: str) -> bool:
"""Validate LLM response"""
if not response or len(response) < 10:
return False
# Check for common failure patterns
failure_patterns = [
"i cannot assist",
"i don't have access",
"error occurred",
"context is unclear"
]
response_lower = response.lower()
return not any(pattern in response_lower for pattern in failure_patterns)
def _get_from_cache(self, key: str) -> Optional[Dict]:
"""Retrieve from cache"""
if isinstance(self.cache, dict):
return self.cache.get(key)
else:
cached = self.cache.get(key)
return json.loads(cached) if cached else None
def _cache_result(self, key: str, result: Dict):
"""Store in cache"""
if isinstance(self.cache, dict):
self.cache[key] = result
else:
self.cache.setex(key, self.cache_ttl, json.dumps(result))
Production notes
- Cache keys should include a hash of normalized query + top context IDs to avoid stale collisions (a sketch follows this list).
- Always log which sources made it into the context; this is invaluable for debugging user complaints.
- Add a truthiness check in the prompt (“If not in context, say you can’t find it”) to curb hallucinations.
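A sketch of such a key builder, assuming the reranked chunks carry the chunk_id values produced by the chunker above (falling back to their index otherwise):
import hashlib

def context_aware_cache_key(query: str, reranked_chunks: list) -> str:
    """Key on the normalized query plus the IDs of the chunks that will form the context"""
    normalized = " ".join(query.lower().split())
    chunk_ids = ",".join(sorted(str(c.get('chunk_id', c.get('index', ''))) for c in reranked_chunks))
    return hashlib.md5(f"{normalized}|{chunk_ids}".encode()).hexdigest()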
Step 6: Deploy Without the Cloud Tax
Instead of defaulting to expensive managed services, here's a production setup that runs on a $20/month VPS:
# docker-compose.yml for self-hosted RAG
"""
version: '3.8'
services:
qdrant:
image: qdrant/qdrant
ports:
- "6333:6333"
volumes:
- ./qdrant_storage:/qdrant/storage
redis:
image: redis:alpine
ports:
- "6379:6379"
volumes:
- ./redis_data:/data
api:
build: .
ports:
- "8000:8000"
environment:
- QDRANT_URL=http://qdrant:6333
- REDIS_URL=redis://redis:6379
depends_on:
- qdrant
- redis
"""
# FastAPI server with streaming responses
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
import asyncio
from typing import AsyncGenerator
app = FastAPI()
# Initialize your RAG pipeline
rag_pipeline = RAGPipeline(documents=load_your_documents())
@app.post("/chat")
async def chat_endpoint(query: str):
"""Streaming RAG endpoint"""
async def generate_stream() -> AsyncGenerator[str, None]:
        # Retrieval, reranking, and generation all happen inside the pipeline
        response = rag_pipeline.retrieve_and_generate(
            query,
            llm_function=your_llm_function  # plug in your own LLM call here
        )
# Stream response in chunks
for i in range(0, len(response['answer']), 20):
chunk = response['answer'][i:i+20]
yield f"data: {json.dumps({'text': chunk})}\n\n"
await asyncio.sleep(0.1) # Simulate streaming
return StreamingResponse(
generate_stream(),
media_type="text/event-stream"
)
@app.get("/health")
async def health_check():
return {"status": "healthy", "cache_size": len(rag_pipeline.cache)}
Production Optimization Tricks
- Use SQLite for Small-Scale Vector Storage
Instead of a dedicated vector database, SQLite with the sqlite-vss extension can handle surprisingly large collections on a single machine:
import sqlite3
import sqlite_vss
import numpy as np
def setup_sqlite_vector_store():
    conn = sqlite3.connect('vectors.db')
    conn.enable_load_extension(True)
    sqlite_vss.load(conn)
    # Documents live in a regular table; the vss0 virtual table holds only the
    # vectors and is linked to it by rowid
    conn.execute("""
        CREATE TABLE IF NOT EXISTS documents (
            id INTEGER PRIMARY KEY,
            document TEXT
        )
    """)
    conn.execute("""
        CREATE VIRTUAL TABLE IF NOT EXISTS doc_vectors USING vss0(
            embedding(384)
        )
    """)
    return conn

def search_vectors(conn, query_embedding, k=5):
    # sqlite-vss accepts query vectors as raw float32 bytes (or a JSON array)
    results = conn.execute("""
        SELECT d.id, d.document, v.distance
        FROM doc_vectors v
        JOIN documents d ON d.id = v.rowid
        WHERE vss_search(v.embedding, ?)
        LIMIT ?
    """, (np.asarray(query_embedding, dtype=np.float32).tobytes(), k))
    return results.fetchall()
- Implement Request Batching
from collections import defaultdict
import asyncio
class BatchedRAG:
def __init__(self, rag_pipeline, batch_size=10, batch_timeout=0.1):
self.rag_pipeline = rag_pipeline
self.batch_size = batch_size
self.batch_timeout = batch_timeout
self.pending_requests = []
self.results = {}
async def process_query(self, query_id: str, query: str):
"""Add query to batch and wait for result"""
future = asyncio.Future()
self.pending_requests.append((query_id, query, future))
# Trigger batch processing if size reached
if len(self.pending_requests) >= self.batch_size:
await self._process_batch()
else:
# Schedule timeout-based processing
asyncio.create_task(self._timeout_trigger())
return await future
async def _process_batch(self):
"""Process all pending requests in batch"""
if not self.pending_requests:
return
batch = self.pending_requests[:self.batch_size]
self.pending_requests = self.pending_requests[self.batch_size:]
# Batch retrieve
queries = [q for _, q, _ in batch]
contexts = self._batch_retrieve(queries)
# Generate responses
for (query_id, query, future), context in zip(batch, contexts):
response = await self._generate_response(query, context)
future.set_result(response)
    def _batch_retrieve(self, queries: List[str]) -> List[List[Dict]]:
        """Run retrieval for every query in the batch"""
        all_results = []
        for query in queries:
            results = self.rag_pipeline.hybrid_retriever.hybrid_search(query)
            all_results.append(results)
        return all_results

    async def _timeout_trigger(self):
        """Flush a partial batch once batch_timeout has elapsed"""
        await asyncio.sleep(self.batch_timeout)
        await self._process_batch()

    async def _generate_response(self, query: str, context: List[Dict]) -> Dict:
        """Generate an answer for a single query from its retrieved context"""
        # Placeholder: rerank the retrieved candidates and hand the top ones to your LLM
        documents = [c['document'] for c in context]
        reranked = self.rag_pipeline.reranker.rerank(query, documents, top_k=3)
        return {'query': query, 'documents': [r['document'] for r in reranked]}
Ops checklist: Add /health, /metrics, and /cache introspection endpoints. Log query → weights → candidates → reranked → final context for a 1% sample to investigate misses.
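A sketch of the 1% trace sampling; maybe_trace is a hypothetical helper you would call from inside retrieve_and_generate once the context has been assembled:
import logging
import random

trace_logger = logging.getLogger("rag.trace")

def maybe_trace(query, weights, candidates, reranked, context, sample_rate=0.01):
    """Log the full retrieval path (query, weights, candidates, reranked, context) for ~1% of requests"""
    if random.random() >= sample_rate:
        return
    trace_logger.info(
        "query=%r weights=%s candidates=%s reranked=%s context=%r",
        query,
        weights,
        [c['index'] for c in candidates],
        [r['document'][:60] for r in reranked],
        context[:200],
    )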
Performance Metrics That Actually Matter
Track these metrics in production:
- First Token Latency: Time to first streamed token (target: <500ms)
- Retrieval Precision@3: Are top 3 results relevant? (target: >80%)
- Cache Hit Rate: Percentage of cached responses (target: >30%)
- Fallback Rate: How often generation fails (target: <5%)
class RAGMetrics:
def __init__(self):
self.metrics = defaultdict(list)
def track_retrieval(self, query, retrieved_docs, relevant_docs):
"""Track retrieval metrics"""
# Precision@k
k = 3
top_k = retrieved_docs[:k]
relevant_in_top_k = sum(1 for doc in top_k if doc in relevant_docs)
precision_at_k = relevant_in_top_k / k
self.metrics['precision@3'].append(precision_at_k)
# Mean Reciprocal Rank (MRR)
for i, doc in enumerate(retrieved_docs):
if doc in relevant_docs:
self.metrics['mrr'].append(1 / (i + 1))
break
else:
self.metrics['mrr'].append(0)
def get_summary(self):
return {
metric: np.mean(values)
for metric, values in self.metrics.items()
}
Benchmark against your own data, not someone else's: Use your own gold labels. Ask SMEs to mark answers as “good enough” vs. “wrong,” then optimize Precision@3 and MRR on that set. That’s how you avoid overfitting to public leaderboards that don’t represent your corpus.
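For example, a minimal evaluation loop over an SME-labeled set, assuming pipeline is the RAGPipeline from Step 5 and gold pairs each query with the documents your experts accepted:
# gold: list of (query, relevant_documents) pairs labeled by your subject-matter experts
metrics = RAGMetrics()

for query, relevant_docs in gold:
    retrieved = [r['document'] for r in pipeline.hybrid_retriever.hybrid_search(query, top_k=10)]
    metrics.track_retrieval(query, retrieved, relevant_docs)

print(metrics.get_summary())  # e.g. {'precision@3': 0.82, 'mrr': 0.71}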
The Uncomfortable Truth About RAG
Here's what vendors won't tell you: Include too much in a chunk and the vector loses the ability to be specific to anything it discusses. Include too little and you lose the context of the data. There's no perfect chunk size—it's all trade-offs.
The real optimization happens at the retrieval layer, not the LLM. A fast BM25 search with good reranking beats an expensive vector database with poor chunking every time.
Heuristic to live by: If reranking a smaller candidate pool gives a bigger lift than swapping LLMs, your bottleneck is retrieval—not generation.
What's Next?
- Test BM25 First: Before investing in vector infrastructure, establish a baseline with keyword search.
- Measure Real Queries: Log actual user queries and optimize for those patterns, not synthetic benchmarks.
- Consider GraphRAG: For relationship-heavy data, graph traversal beats semantic similarity.
- Cache Aggressively: Most queries follow a power law—cache the common ones.
- Monitor Fallbacks: When generation fails, your extractive fallback is your safety net.
Remember: The best RAG system is the one that ships. Start simple with BM25, add vectors when needed, and only adopt GraphRAG when relationships matter. Your users care about accuracy and speed, not your embedding model.