Production RAG Deployment: From Prototype to LLM Application
Moving a RAG system from prototype to production requires more than good metrics; it requires architecture decisions about data freshness, error handling, scaling, and observability. A production RAG system must serve hundreds of concurrent requests, handle knowledge base updates without downtime, gracefully degrade when components fail (vector DB down, LLM API rate-limited), and log everything needed to debug failures. The difference between a prototype that works and a production system that is reliable is often the 20% of engineering effort that most teams skip.
In 2025, I deployed a customer-facing RAG chatbot for a Fortune 500 company. The prototype worked flawlessly in testing but failed in production: vector DB queries timed out under load, knowledge base updates occasionally returned stale results, and failures silently returned wrong answers instead of degrading gracefully. Building in monitoring, caching, versioning, and error handling fixed these issues. This article covers the architecture patterns and code that bridge prototype and production.
Architecture: The Components You Need
A production RAG system has these key pieces:
┌──────────┐       ┌──────────────┐       ┌─────────────┐
│ User API │──────>│ RAG Service  │──────>│  Vector DB  │
└──────────┘       │   (Python)   │       │ (Weaviate)  │
                   │              │       └─────────────┘
                   │              │       ┌─────────────┐
                   │              │──────>│   LLM API   │
                   │              │       │  (Claude)   │
                   └──────────────┘       └─────────────┘
                          │
                          ▼
                   ┌──────────────┐
                   │  Logging &   │
                   │  Monitoring  │
                   │  (Datadog)   │
                   └──────────────┘
Each layer has failure modes: vector DB latency, LLM rate limits, network timeouts. Production systems explicitly handle these.
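Rate limits and transient network errors are usually worth retrying, but only a bounded number of times. The sketch below wraps any call in capped exponential backoff with full jitter. The function name `call_with_backoff` and its parameters are illustrative and do not belong to any library. The Anthropic Python SDK already retries some errors itself, including 429s, under the client's `max_retries` option. A wrapper like this is therefore mainly useful for calls that lack built-in retries.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    fn: Callable[[], T],
    retryable: tuple[type[BaseException], ...],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying `retryable` errors with capped exponential backoff and full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except retryable:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error instead of hiding it
            # Full jitter: wait a random time up to base_delay * 2**attempt, capped at max_delay
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(random.uniform(0, delay))
    raise AssertionError("unreachable")
```

Inside the service, `retryable` might be `(anthropic.RateLimitError, anthropic.APIConnectionError)`. Any other exception passes straight through. The `sleep` parameter lets tests skip real waiting.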
Production-Grade RAG Service
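Here is a RAG service with error handling, logging, timeouts, and rate limiting. It targets the Weaviate v4 Python client and the Anthropic SDK. Anthropic's API does not offer an embeddings endpoint, so query embeddings come from OpenAI's text-embedding-3-small. That must be the same model used to index the collection: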
import json
import logging
import threading
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeoutError
from datetime import datetime
from typing import Optional
from urllib.parse import urlparse

import anthropic
import openai  # Anthropic's API has no embeddings endpoint, so embeddings come from OpenAI
import weaviate
from weaviate.classes.query import MetadataQuery

# Configure logging to a file and to stderr
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('rag_service.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class RetrievalError(Exception):
    """Raised when vector search times out or fails, so the caller can degrade visibly."""


class RateLimiter:
    """Sliding-window rate limiter. State lives in memory, so limits are per process."""

    def __init__(self, max_requests: int = 100, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = defaultdict(list)
        self._lock = threading.Lock()  # web servers call this from several threads

    def is_allowed(self, client_id: str) -> bool:
        """Check if client is within rate limit."""
        now = time.time()
        cutoff = now - self.window_seconds
        with self._lock:
            # Remove timestamps that have left the window
            self.requests[client_id] = [
                req_time for req_time in self.requests[client_id]
                if req_time > cutoff
            ]
            if len(self.requests[client_id]) < self.max_requests:
                self.requests[client_id].append(now)
                return True
            return False


class RAGService:
    """Production RAG service with error handling and observability."""

    def __init__(
        self,
        weaviate_url: str = "http://localhost:8080",
        anthropic_api_key: Optional[str] = None,
        openai_api_key: Optional[str] = None,
        collection_name: str = "Document",
        retrieval_timeout: float = 5.0,
        generation_timeout: float = 30.0
    ):
        parsed = urlparse(weaviate_url)
        # connect_to_local also opens a gRPC channel (default port 50051)
        self.weaviate_client = weaviate.connect_to_local(
            host=parsed.hostname,
            port=parsed.port or 8080
        )
        # api_key=None makes each SDK read ANTHROPIC_API_KEY / OPENAI_API_KEY
        self.anthropic_client = anthropic.Anthropic(api_key=anthropic_api_key)
        self.openai_client = openai.OpenAI(api_key=openai_api_key)
        self.collection_name = collection_name
        self.retrieval_timeout = retrieval_timeout
        self.generation_timeout = generation_timeout
        self.rate_limiter = RateLimiter(max_requests=100, window_seconds=60)
        # Worker pool that lets retrieval be bounded by a timeout from any thread
        self._executor = ThreadPoolExecutor(max_workers=8)
        # Metrics (simple counters; not synchronized across threads)
        self.metrics = {
            "total_queries": 0,
            "successful_queries": 0,
            "failed_queries": 0,
            "avg_retrieval_latency_ms": 0,
            "avg_generation_latency_ms": 0
        }

    def query(
        self,
        query: str,
        client_id: str = "default",
        top_k: int = 5,
        fallback_mode: bool = False
    ) -> dict:
        """
        Process a user query end-to-end.

        Args:
            query: User question.
            client_id: Client identifier for rate limiting.
            top_k: Number of chunks to retrieve.
            fallback_mode: If True, skip generation and return retrieval only.

        Returns:
            Response with answer, context, and metadata.
        """
        self.metrics["total_queries"] += 1
        start_time = time.time()

        # Check rate limit
        if not self.rate_limiter.is_allowed(client_id):
            logger.warning(f"Rate limit exceeded for client {client_id}")
            return {
                "error": "Rate limit exceeded. Please retry after 60 seconds.",
                "status": "rate_limited",
                "timestamp": datetime.now().isoformat()
            }

        try:
            # Step 1: Retrieve documents
            logger.info(f"Query: {query[:100]}... (client: {client_id})")
            retrieval_start = time.time()
            retrieved_chunks = self._retrieve_with_timeout(query, top_k)
            retrieval_latency = (time.time() - retrieval_start) * 1000
            logger.info(f"Retrieved {len(retrieved_chunks)} chunks in {retrieval_latency:.1f}ms")

            # If fallback mode, return just the retrieval
            if fallback_mode:
                self.metrics["successful_queries"] += 1
                return {
                    "query": query,
                    "chunks": retrieved_chunks,
                    "answer": None,
                    "mode": "retrieval_only",
                    "timestamp": datetime.now().isoformat()
                }

            # Step 2: Generate answer
            generation_start = time.time()
            answer = self._generate_with_timeout(query, retrieved_chunks)
            generation_latency = (time.time() - generation_start) * 1000
            logger.info(f"Generated answer in {generation_latency:.1f}ms")

            # Update metrics (exponential moving averages, smoothing factor 0.1)
            self.metrics["successful_queries"] += 1
            self.metrics["avg_retrieval_latency_ms"] = (
                0.9 * self.metrics["avg_retrieval_latency_ms"] + 0.1 * retrieval_latency
            )
            self.metrics["avg_generation_latency_ms"] = (
                0.9 * self.metrics["avg_generation_latency_ms"] + 0.1 * generation_latency
            )

            return {
                "query": query,
                "answer": answer,
                "chunks": retrieved_chunks,
                "retrieval_latency_ms": retrieval_latency,
                "generation_latency_ms": generation_latency,
                "total_latency_ms": (time.time() - start_time) * 1000,
                "status": "success",
                "timestamp": datetime.now().isoformat()
            }

        except RetrievalError as e:
            # Degrade visibly: report that search is down instead of answering without context
            self.metrics["failed_queries"] += 1
            logger.error(f"Retrieval unavailable: {e}")
            return {
                "error": "Search is temporarily unavailable. Please retry shortly.",
                "status": "retrieval_unavailable",
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            self.metrics["failed_queries"] += 1
            logger.error(f"Query failed: {e}", exc_info=True)
            return {
                "error": "Service error. Please retry.",
                "status": "error",
                "timestamp": datetime.now().isoformat()
            }

    def _retrieve_with_timeout(self, query: str, top_k: int) -> list[dict]:
        """Run retrieval in a worker thread and stop waiting after retrieval_timeout seconds.

        Unlike signal.alarm, this works outside the main thread (e.g. in a web server's
        thread pool). A timed-out worker is not cancelled; it finishes in the background.
        """
        future = self._executor.submit(self._retrieve, query, top_k)
        try:
            return future.result(timeout=self.retrieval_timeout)
        except FuturesTimeoutError as e:
            raise RetrievalError(f"Retrieval timeout after {self.retrieval_timeout}s") from e
        except Exception as e:
            raise RetrievalError(f"Retrieval failed: {e}") from e

    def _retrieve(self, query: str, top_k: int) -> list[dict]:
        """Embed the query and run a vector search against Weaviate."""
        # Embed query with the same model used to index the collection
        query_emb = self.openai_client.embeddings.create(
            model="text-embedding-3-small",
            input=query,
            timeout=self.retrieval_timeout
        ).data[0].embedding

        # Retrieve from vector DB
        collection = self.weaviate_client.collections.get(self.collection_name)
        results = collection.query.near_vector(
            near_vector=query_emb,
            limit=top_k,
            return_metadata=MetadataQuery(distance=True)
        )

        # Format results ("id" is reserved in Weaviate, so use the object UUID)
        return [
            {
                "id": str(item.uuid),
                "content": item.properties.get("content"),
                "source": item.properties.get("source"),
                "distance": item.metadata.distance
            }
            for item in results.objects
        ]

    def _generate_with_timeout(self, query: str, chunks: list[dict]) -> str:
        """Generate an answer; the SDK enforces generation_timeout on the HTTP request.

        Errors propagate to query(), which reports them with status "error" instead of
        returning an error message disguised as an answer.
        """
        if not chunks:
            return "I could not find relevant information to answer your question."

        context = "\n---\n".join(chunk["content"] or "" for chunk in chunks[:5])
        response = self.anthropic_client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=500,
            system="You are a helpful assistant. Answer the user's question based on the provided context. If you cannot answer from the context, say so.",
            messages=[
                {
                    "role": "user",
                    "content": f"Context:\n{context}\n\nQuestion: {query}"
                }
            ],
            timeout=self.generation_timeout
        )
        return response.content[0].text

    def health_check(self) -> dict:
        """Health check endpoint."""
        try:
            vector_db_status = "healthy" if self.weaviate_client.is_live() else "unhealthy"
        except Exception:
            vector_db_status = "unhealthy"
        return {
            "status": "healthy" if vector_db_status == "healthy" else "degraded",
            "vector_db": vector_db_status,
            "timestamp": datetime.now().isoformat(),
            "metrics": self.metrics
        }

    def close(self):
        """Release the Weaviate connection and the retrieval worker pool."""
        self.weaviate_client.close()
        self._executor.shutdown(wait=False)


# Example usage
if __name__ == "__main__":
    rag = RAGService()
    try:
        # Process queries
        responses = [
            rag.query("What is Python?", client_id="user-1"),
            rag.query("How does RAG work?", client_id="user-2"),
        ]
        for response in responses:
            print(json.dumps(response, indent=2))

        # Health check
        print(json.dumps(rag.health_check(), indent=2))
    finally:
        rag.close()
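The latency averages in `query()` are exponential moving averages with smoothing factor 0.1. Both start at 0, so early readings understate latency. With a constant 200 ms per query:

```latex
\bar{L}_t = 0.9\,\bar{L}_{t-1} + 0.1\,L_t,\qquad \bar{L}_0 = 0
\;\Longrightarrow\;
\bar{L}_n = 200\,\bigl(1 - 0.9^{\,n}\bigr)\ \text{ms},\qquad
\bar{L}_1 = 20,\ \bar{L}_2 = 38
```

The average only comes within 10% of the true value after 22 queries, since 0.9^22 ≈ 0.098. Seeding each average with its first observation removes this bias, and so does dividing by (1 - 0.9^n).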
Deployment Patterns
Pattern 1: FastAPI Web Service
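from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
import uvicorn

from rag_service import RAGService  # assumes the RAGService class above is saved as rag_service.py

app = FastAPI(title="RAG Service")
rag_service = RAGService()

# Map RAGService status values to HTTP status codes (anything else with an error is a 500)
STATUS_CODES = {"rate_limited": 429, "retrieval_unavailable": 503}


class QueryRequest(BaseModel):
    query: str
    top_k: int = 5


# Plain `def`, not `async def`: RAGService.query() blocks, so FastAPI runs it in
# its thread pool instead of stalling the event loop
@app.post("/query")
def query_endpoint(request: QueryRequest, http_request: Request):
    """Query the RAG system."""
    # Rate-limit per caller IP instead of one shared "default" bucket
    # (behind a proxy, derive this from a trusted header or the auth identity)
    client_id = http_request.client.host if http_request.client else "unknown"
    response = rag_service.query(request.query, client_id=client_id, top_k=request.top_k)
    if "error" in response:
        raise HTTPException(status_code=STATUS_CODES.get(response.get("status"), 500), detail=response)
    return response


@app.get("/health")
def health_endpoint():
    """Health check."""
    return rag_service.health_check()


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)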
Deploy with Docker:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Pattern 2: Async Queue for Background Processing
For non-real-time queries, use a task queue such as Celery (Bull is the Node.js counterpart):
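from celery import Celery

from rag_service import RAGService  # assumes the RAGService class above is saved as rag_service.py

celery_app = Celery('rag_tasks', broker='redis://localhost:6379')

# One RAGService (and Weaviate connection) per worker process, created on first use
_rag_service = None


def get_rag_service() -> RAGService:
    global _rag_service
    if _rag_service is None:
        _rag_service = RAGService()
    return _rag_service


@celery_app.task(bind=True, max_retries=3)
def process_rag_query(self, query: str, user_id: str):
    """Process RAG query asynchronously."""
    response = get_rag_service().query(query, client_id=user_id)
    # query() reports failures in the response instead of raising, so convert
    # them to an exception here; otherwise Celery's retry would never trigger
    if "error" in response:
        # Retry with exponential backoff: 1s, 2s, then 4s
        raise self.retry(exc=RuntimeError(response["error"]), countdown=2 ** self.request.retries)
    # Store result in Redis or database
    # ... save response ...
    return response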
Knowledge Base Updates: Versioning and Rollback
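Updates to your knowledge base must not break the service. Use versioning: index each version into its own collection, and switch queries to it only after indexing has finished: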
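import logging

logger = logging.getLogger(__name__)


class VersionedKnowledgeBase:
    """Manage multiple versions of the knowledge base for safe updates."""

    def __init__(self, weaviate_client):
        self.weaviate_client = weaviate_client
        self.current_version = None
        self.previous_version = None
        self.versions = {}

    @property
    def active_collection(self) -> str:
        """Collection that queries should read (e.g. pass it as RAGService.collection_name)."""
        if self.current_version is None:
            raise ValueError("No version has been activated")
        return self.versions[self.current_version]["collection"]

    def create_version(self, version_id: str, documents: list[dict]):
        """Build a new version in its own collection without touching the active one."""
        # Weaviate collection names must match [A-Z][_0-9A-Za-z]*, so keep
        # version_id to letters, digits, and underscores
        collection_name = f"Documents_v{version_id}"
        self._create_and_index(collection_name, documents)
        self.versions[version_id] = {"collection": collection_name, "status": "ready"}
        logger.info(f"Created version {version_id}")

    def _create_and_index(self, collection_name: str, documents: list[dict]):
        """Create the collection and batch-import the documents.

        Assumes each document dict holds a precomputed "vector" plus its properties
        (and no "id" key, which Weaviate reserves).
        """
        collection = self.weaviate_client.collections.create(name=collection_name)
        with collection.batch.dynamic() as batch:
            for doc in documents:
                properties = {k: v for k, v in doc.items() if k != "vector"}
                batch.add_object(properties=properties, vector=doc["vector"])
        if collection.batch.failed_objects:
            raise RuntimeError(f"{len(collection.batch.failed_objects)} objects failed to import")

    def activate_version(self, version_id: str):
        """Switch reads to a version (an in-process pointer swap)."""
        if version_id not in self.versions:
            raise ValueError(f"Version {version_id} does not exist")
        self.previous_version = self.current_version
        self.current_version = version_id
        logger.info(f"Activated version {version_id}, previous: {self.previous_version}")

    def rollback(self):
        """Switch back to the previously active version."""
        if self.previous_version is None:
            raise ValueError("No previous version to rollback to")
        logger.warning(f"Rolling back from {self.current_version} to {self.previous_version}")
        self.current_version, self.previous_version = self.previous_version, self.current_version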
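`activate_version` swaps a pointer inside a single process. With several replicas, every replica must see the same active version. One sketch assumes all replicas on a host share a filesystem and keeps the active version in a small pointer file. The file is replaced with `os.replace`, which is atomic on POSIX when source and target are on the same filesystem. The file name and JSON layout are hypothetical. Across hosts, a shared store such as Redis would fill the same role.

```python
import json
import os
import tempfile


def write_active_version(path: str, version_id: str) -> None:
    """Atomically point readers at version_id: write a temp file, then rename it over path."""
    directory = os.path.dirname(os.path.abspath(path))
    # Temp file in the same directory so the rename stays on one filesystem
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump({"active_version": version_id}, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)  # readers see either the old file or the new one, never a partial write
    except BaseException:
        os.unlink(tmp_path)
        raise


def read_active_version(path: str) -> str:
    """Return the version id that queries should currently use."""
    with open(path) as f:
        return json.load(f)["active_version"]
```

Each replica could call `read_active_version` periodically, or on every request, and map the id to a collection name the way `VersionedKnowledgeBase` does. A rollback is then just another `write_active_version` call with the old id.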
Monitoring and Observability
Log and monitor these metrics:
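import logging

logger = logging.getLogger(__name__)


def setup_monitoring(rag_service):
    """List the metrics to export and log the one RAGService can already derive.

    Shipping metrics to Datadog, Prometheus, etc. is backend-specific and not shown here.
    """
    metrics_to_track = {
        "queries_per_minute": "Rate of queries",
        "avg_latency_ms": "Response time",
        "error_rate": "Failure percentage",
        "cache_hit_rate": "Vector DB cache effectiveness",
        "retrieval_quality": "Precision@5 on golden set",
    }
    for metric, description in metrics_to_track.items():
        logger.info(f"Tracking: {metric} ({description})")

    # error_rate follows directly from the counters RAGService keeps
    m = rag_service.metrics
    error_rate = m["failed_queries"] / m["total_queries"] if m["total_queries"] else 0.0
    logger.info(f"Current error_rate: {error_rate:.1%}")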
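Most of these metrics are aggregates over a time window. A minimal sketch follows, assuming each request is recorded as a hypothetical `RequestRecord(timestamp, latency_ms, ok)`. It computes queries per minute, error rate, and a nearest-rank p95 latency over the last window. In production, your metrics backend would usually compute these aggregates.

```python
import math
from dataclasses import dataclass
from typing import Optional


@dataclass
class RequestRecord:
    timestamp: float   # seconds since epoch
    latency_ms: float
    ok: bool


def summarize(records: list[RequestRecord], now: float, window_s: float = 60.0) -> dict:
    """Compute queries_per_minute, error_rate and p95 latency over the last window_s seconds."""
    recent = [r for r in records if now - window_s < r.timestamp <= now]
    if not recent:
        return {"queries_per_minute": 0.0, "error_rate": 0.0, "p95_latency_ms": None}
    latencies = sorted(r.latency_ms for r in recent)
    # Nearest-rank percentile: smallest value with at least 95% of samples at or below it
    rank = math.ceil(0.95 * len(latencies))
    p95: Optional[float] = latencies[rank - 1]
    return {
        "queries_per_minute": len(recent) * 60.0 / window_s,
        "error_rate": sum(not r.ok for r in recent) / len(recent),
        "p95_latency_ms": p95,
    }
```

The p95 is included because an average such as `avg_latency_ms` can look healthy while a tail of slow requests breaches the latency target.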
Key Takeaways
- Production RAG systems require error handling, rate limiting, logging, and graceful degradation.
- Timeouts on retrieval and generation prevent cascading failures.
- Knowledge base versioning enables safe updates without downtime.
- FastAPI + Docker is a common pattern for real-time RAG APIs.
- Monitor latency, error rates, and retrieval quality continuously.
Frequently Asked Questions
How do I update the knowledge base without downtime?
Create a new version alongside the current one, test it, then atomically switch. This blue-green deployment pattern ensures zero downtime.
What latency is acceptable for RAG systems?
As a rule of thumb, aim for under 2 seconds end-to-end for real-time chat and under 30 seconds for batch. Much slower than that, and users tend to abandon the system.
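Hitting an end-to-end target means every stage must fit inside whatever budget remains. Note that the `RAGService` defaults (5 s for retrieval, 30 s for generation) each exceed a 2-second chat budget on their own. Here is a minimal deadline-propagation sketch. The `Deadline` class and any per-stage caps you pass it are illustrative assumptions:

```python
import time
from typing import Callable


class Deadline:
    """Tracks one request's end-to-end latency budget so each stage gets only the time left."""

    def __init__(self, budget_s: float, clock: Callable[[], float] = time.monotonic):
        self.clock = clock
        self.expires_at = clock() + budget_s

    def remaining(self) -> float:
        """Seconds left in the budget (never negative)."""
        return max(0.0, self.expires_at - self.clock())

    def timeout_for(self, stage_cap_s: float) -> float:
        """Timeout for the next stage: its own cap or the remaining budget, whichever is smaller."""
        left = self.remaining()
        if left == 0.0:
            raise TimeoutError("latency budget exhausted")
        return min(stage_cap_s, left)
```

In `RAGService`, the value from `timeout_for` could be passed as the `timeout` argument to the embedding and `messages.create` calls. A slow retrieval then leaves generation less time, rather than pushing the request past its target.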
Should I cache LLM responses?
Yes, for repeated queries. Use a cache (such as Redis) with a short TTL (10–60 minutes). Each cache hit skips an LLM call entirely, so the savings scale with how often queries repeat.
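Here is a minimal in-process sketch of such a cache. It stands in for Redis, where `SET key value EX 600` gives the same expiry behavior. The class name and the choice to normalize case and whitespace are assumptions. The cache key includes the knowledge base version, so activating a new version automatically bypasses answers generated from the old one.

```python
import hashlib
import time
from typing import Callable, Optional


class TTLResponseCache:
    """In-process answer cache keyed by normalized query and KB version, with per-entry expiry."""

    def __init__(self, ttl_seconds: float = 600.0, clock: Callable[[], float] = time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._store: dict[str, tuple[float, str]] = {}

    @staticmethod
    def key(query: str, kb_version: str) -> str:
        # Lowercase and collapse whitespace so trivially different phrasings share an entry
        normalized = " ".join(query.lower().split())
        return hashlib.sha256(f"{kb_version}:{normalized}".encode()).hexdigest()

    def get(self, query: str, kb_version: str) -> Optional[str]:
        k = self.key(query, kb_version)
        entry = self._store.get(k)
        if entry is None:
            return None
        expires_at, answer = entry
        if self.clock() >= expires_at:
            del self._store[k]  # expired: evict lazily on read
            return None
        return answer

    def set(self, query: str, kb_version: str, answer: str) -> None:
        self._store[self.key(query, kb_version)] = (self.clock() + self.ttl_seconds, answer)
```

Only successful answers should be stored. Caching an error response would keep serving the failure until the TTL expires.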
How do I handle vector DB failures?
Implement a fallback: if the vector DB is down, retry with a different retrieval strategy (such as keyword search) or return an explicit "search unavailable" message rather than an answer generated without context. Never fail silently; always surface the error to users.
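One way to sketch the fallback is a wrapper that tries vector search first. On any exception it switches to keyword matching and flags the degradation in the response. Every name here is hypothetical. `keyword_search` is a crude term-overlap scorer standing in for a real keyword index such as BM25. `documents` stands in for whatever copy of the corpus that index holds.

```python
from typing import Callable


def keyword_search(query: str, documents: list[dict], top_k: int = 5) -> list[dict]:
    """Rank documents by how many distinct query terms appear in their content."""
    terms = set(query.lower().split())
    scored = []
    for doc in documents:
        overlap = len(terms & set(doc["content"].lower().split()))
        if overlap:
            scored.append((overlap, doc))
    scored.sort(key=lambda pair: pair[0], reverse=True)  # stable: ties keep corpus order
    return [doc for _, doc in scored[:top_k]]


def retrieve_with_fallback(
    query: str,
    vector_search: Callable[[str, int], list[dict]],
    documents: list[dict],
    top_k: int = 5,
) -> dict:
    """Try vector search; on failure use keyword search and report which path served the request."""
    try:
        return {"chunks": vector_search(query, top_k), "retrieval_mode": "vector"}
    except Exception as exc:
        # Surface the degradation in the response instead of hiding it
        return {
            "chunks": keyword_search(query, documents, top_k),
            "retrieval_mode": "keyword_fallback",
            "warning": f"Vector search unavailable ({type(exc).__name__}); results may be less relevant.",
        }
```

The `retrieval_mode` field lets the UI, and your dashboards, show when answers came from the degraded path.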
How should I test a production RAG system?
Build a golden test set of 50–200 representative queries, each with a ground-truth answer and the chunks that support it. Test retrieval quality (Precision@5) and generation quality (BLEU, user satisfaction). Re-run the tests continuously as the knowledge base changes.
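Precision@5 requires, for each golden query, the set of chunk IDs that a reviewer has marked relevant. The sketch below assumes a hypothetical golden-set format of `{"query": ..., "relevant_ids": {...}}` and a `retrieve` callable that returns ranked chunk IDs:

```python
from typing import Callable


def precision_at_k(retrieved_ids: list[str], relevant_ids: set[str], k: int = 5) -> float:
    """Fraction of the top-k retrieved chunks that are labeled relevant (divides by k even if fewer came back)."""
    if k <= 0:
        raise ValueError("k must be positive")
    return sum(1 for doc_id in retrieved_ids[:k] if doc_id in relevant_ids) / k


def mean_precision_at_k(
    golden_set: list[dict],
    retrieve: Callable[[str], list[str]],
    k: int = 5,
) -> float:
    """Average precision@k over every query in the golden set."""
    if not golden_set:
        raise ValueError("golden set is empty")
    scores = [precision_at_k(retrieve(item["query"]), item["relevant_ids"], k) for item in golden_set]
    return sum(scores) / len(scores)
```

Running this after each `create_version`, and before `activate_version`, turns "test it, then switch" into a concrete gate. For example, you could refuse to activate a version whose score falls below the current one.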