Production Agent Architecture: Scaling Systems
Building agents for personal experiments is straightforward. Building agents that run 24/7, handle thousands of requests, recover gracefully from failures, and provide visibility into their operation is an entirely different challenge. Production agent systems require robust architecture: async task queues, distributed tracing, retry logic, health checks, cost monitoring, and fallback mechanisms. This final article teaches you to design and deploy reliable, scalable agent systems that perform under real-world stress.
A production agent system is not just a bigger version of a prototype. It's a system where failures are expected, monitored, and recovered from automatically. It's observable: every decision the agent makes is logged and queryable. It's resilient: when an external service fails, the agent gracefully degrades rather than crashes. And it's cost-aware: expensive API calls are budgeted and optimized.
Architecture Overview
A production agent system has these components:
         User Requests
               │
               ▼
    ┌──────────────────────┐
    │     API Gateway      │  (Authentication, rate limiting, routing)
    │   (FastAPI/Flask)    │
    └──────────┬───────────┘
               │
        ┌──────▼───────┐
        │  Task Queue  │  (Celery/RQ: decouple request from processing)
        └──────┬───────┘
               │
        ┌──────▼──────────────────────┐
        │      Agent Worker Pool      │  (Processes run agent logic)
        │    (Multiple instances)     │
        └──────┬──────────────────────┘
               │
        ┌──────▼─────────────┐
        │ External Services  │  (APIs, databases, caches)
        │   (With retries)   │
        └────────────────────┘

  Monitoring Layer (logs, metrics, traces): observes every component above
Here's how it works:
- The API gateway receives a request, validates it, enqueues it, and immediately returns a task ID
- The task queue holds the request until a worker is free
- Idle workers pick up tasks from the queue
- Each worker runs an agent instance that calls the model and executes tools
- Results are written to a result store, where the client retrieves them by task ID
- The monitoring layer tracks successes, failures, costs, and latency
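To make the flow concrete without standing up Redis or Celery, here is a stdlib-only sketch that mimics it inside one process: a queue.Queue stands in for the broker, threads stand in for the worker pool, and a dict stands in for the result store. All names (submit, worker, fake_agent, results) are hypothetical; this illustrates the decoupling, not a production design.

```python
import queue
import threading
import uuid

task_queue = queue.Queue()       # stands in for the broker (Redis/RabbitMQ)
results: dict[str, dict] = {}    # stands in for the result backend
results_lock = threading.Lock()


def fake_agent(request: str) -> str:
    """Hypothetical agent: a real worker would run the LLM/tool loop here."""
    return f"answer to: {request}"


def submit(request: str) -> str:
    """Gateway side: validate, enqueue, and return a task ID right away."""
    if not request.strip():
        raise ValueError("empty request")
    task_id = str(uuid.uuid4())
    with results_lock:
        results[task_id] = {"state": "PENDING"}
    task_queue.put((task_id, request))
    return task_id


def worker() -> None:
    """Worker side: pull tasks until a None sentinel arrives."""
    while True:
        item = task_queue.get()
        if item is None:
            task_queue.task_done()
            break
        task_id, request = item
        try:
            outcome = {"state": "SUCCESS", "result": fake_agent(request)}
        except Exception as exc:  # record the failure instead of killing the worker
            outcome = {"state": "FAILURE", "error": str(exc)}
        with results_lock:
            results[task_id] = outcome
        task_queue.task_done()


workers = [threading.Thread(target=worker, daemon=True) for _ in range(3)]
for w in workers:
    w.start()

ids = [submit(f"question {i}") for i in range(5)]
task_queue.join()             # wait until every enqueued task is processed
for _ in workers:
    task_queue.put(None)      # one shutdown sentinel per worker
for w in workers:
    w.join()
```

Because submit returns before any work happens, the gateway stays responsive however long an agent run takes; with Celery, run_agent.delay(...) plays the role of submit.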
Async Task Queue Implementation
Use Celery for distributed task processing:
import time
import traceback

import anthropic
from celery import Celery, Task
from kombu import Exchange, Queue

# Configure Celery (assumes Redis on localhost as broker and result backend)
celery_app = Celery('agent_system')
celery_app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/0',
    task_serializer='json',
    accept_content=['json'],
    result_expires=3600,             # Delete stored results after one hour
    task_track_started=True,         # Report STARTED while a task runs
    task_acks_late=True,             # Ack after completion so a crashed worker's task is redelivered
    worker_prefetch_multiplier=1,    # Each worker process reserves one task at a time
    worker_max_tasks_per_child=100,  # Recycle worker processes to contain memory leaks
)

# Define queues by priority
celery_app.conf.task_queues = (
    Queue('high_priority', Exchange('high_priority'), routing_key='high'),
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('low_priority', Exchange('low_priority'), routing_key='low'),
)
celery_app.conf.task_default_queue = 'default'

# Route tasks to queues by task name. run_agent_urgent and run_agent_batch are
# not shown; they would wrap the same loop. Start workers on all three queues:
#   celery -A tasks worker -Q high_priority,default,low_priority
celery_app.conf.task_routes = {
    'tasks.run_agent_urgent': {'queue': 'high_priority'},
    'tasks.run_agent': {'queue': 'default'},
    'tasks.run_agent_batch': {'queue': 'low_priority'},
}

# Transient API errors worth retrying (APIConnectionError also covers timeouts).
# Other errors, such as invalid requests, fail fast instead of being retried.
RETRYABLE_ERRORS = (
    anthropic.APIConnectionError,
    anthropic.RateLimitError,
    anthropic.InternalServerError,
)

# USD per million tokens (Claude 3.5 Sonnet list prices; check current pricing)
INPUT_USD_PER_MTOK = 3.00
OUTPUT_USD_PER_MTOK = 15.00


class AgentTask(Task):
    """Base task for agent operations: retries transient errors with backoff."""
    autoretry_for = RETRYABLE_ERRORS
    retry_kwargs = {'max_retries': 3}
    retry_backoff = True      # Exponential delays: 1 s, 2 s, 4 s, ... (before jitter)
    retry_backoff_max = 600   # Cap any single delay at 10 minutes
    retry_jitter = True       # Randomize delays so workers don't retry in lockstep


# Tool errors that may succeed on another attempt; others (e.g., unknown tool) fail fast
TRANSIENT_TOOL_ERRORS = (ConnectionError, TimeoutError)


def execute_tool(tool_name: str, params: dict) -> str:
    """Simulated tool execution; replace with real integrations."""
    if tool_name == "search_web":
        return f"Search results for '{params.get('query')}': ..."
    if tool_name == "database_query":
        return "Query result: ..."
    raise ValueError(f"Unknown tool: {tool_name}")


def execute_tool_with_retry(tool_name: str, params: dict, max_retries: int = 3) -> str:
    """Execute a tool, retrying transient errors with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return execute_tool(tool_name, params)
        except TRANSIENT_TOOL_ERRORS:
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # Wait 1 s, 2 s, ... between attempts


@celery_app.task(base=AgentTask, bind=True, name='tasks.run_agent')
def run_agent(self, user_request: str, tools: list, session_id: str) -> dict:
    """
    Execute an agent asynchronously.

    Args:
        self: Celery task context (from bind=True)
        user_request: User's message to the agent
        tools: Tool definitions (JSON-serializable dicts) for the Messages API
        session_id: Request session ID for logging

    Returns:
        Dict with result, cost, and metadata
    """
    client = anthropic.Anthropic()  # Reads ANTHROPIC_API_KEY from the environment
    messages = [{"role": "user", "content": user_request}]
    total_cost = 0.0
    max_iterations = 10

    try:
        for iteration in range(1, max_iterations + 1):
            self.update_state(state='PROCESSING', meta={
                'current': f'Step {iteration}/{max_iterations}'
            })
            response = client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=2048,
                tools=tools,
                messages=messages,
            )

            # Track cost from the token counts reported by the API
            total_cost += (
                response.usage.input_tokens * INPUT_USD_PER_MTOK
                + response.usage.output_tokens * OUTPUT_USD_PER_MTOK
            ) / 1_000_000

            # Any stop reason other than a tool request ends the run
            # (end_turn, or max_tokens if the answer was truncated)
            if response.stop_reason != "tool_use":
                final_answer = "".join(
                    block.text for block in response.content if block.type == "text"
                )
                return {
                    "success": response.stop_reason == "end_turn",
                    "result": final_answer or "No answer",
                    "stop_reason": response.stop_reason,
                    "iterations": iteration,
                    "cost": total_cost,
                    "session_id": session_id,
                }

            # Echo the assistant turn, then answer every tool_use block
            messages.append({"role": "assistant", "content": response.content})
            tool_results = []
            for block in response.content:
                if block.type != "tool_use":
                    continue
                try:
                    output = execute_tool_with_retry(block.name, block.input)
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": output,
                    })
                except Exception as e:
                    # Report the failure to the model instead of aborting the run
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": f"Tool error: {e}",
                        "is_error": True,
                    })
            messages.append({"role": "user", "content": tool_results})

        return {
            "success": False,
            "error": "Max iterations exceeded",
            "iterations": max_iterations,
            "cost": total_cost,
            "session_id": session_id,
        }
    except RETRYABLE_ERRORS:
        raise  # Let AgentTask's autoretry schedule another attempt
    except Exception as e:
        # Non-retryable failure: return details for debugging
        return {
            "success": False,
            "error": str(e),
            "error_log": traceback.format_exc(),
            "cost": total_cost,
            "session_id": session_id,
        }
# Usage (e.g., from the API gateway process)
from celery.result import AsyncResult

# Enqueue a task; this returns immediately with a task ID
result = run_agent.delay(
    user_request="What's the weather in NYC?",
    tools=[...],
    session_id="session_123",
)

# Check status later, e.g., from a polling endpoint that only has the ID
task_result = AsyncResult(result.id, app=celery_app)
print(f"Task state: {task_result.state}")
print(f"Progress: {task_result.info}")

# Get the result when ready
if task_result.ready():
    output = task_result.get(timeout=10)
    print(f"Result: {output}")
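The retry_backoff, retry_backoff_max, and retry_jitter settings on AgentTask produce an exponential schedule. Celery documents it as a delay of 1 s, then 2 s, then 4 s, and so on, capped at the maximum, with jitter drawing the actual delay at random between zero and that value. The sketch below reproduces that rule so you can reason about worst-case latency; backoff_delay and worst_case_wait are our own helper names, not Celery's API.

```python
import random
from typing import Optional


def backoff_delay(retries: int, factor: int = 1, maximum: int = 600,
                  full_jitter: bool = True,
                  rng: Optional[random.Random] = None) -> int:
    """Delay in seconds before retry number `retries` (0-based).

    Exponential: factor * 2**retries, capped at `maximum`. With full jitter the
    delay is drawn uniformly from [0, capped], which spreads out retries from
    many workers that failed at the same moment.
    """
    capped = min(maximum, factor * (2 ** retries))
    if full_jitter:
        rng = rng or random.Random()
        return rng.randrange(capped + 1)
    return capped


def worst_case_wait(max_retries: int, maximum: int = 600) -> int:
    """Upper bound on total backoff across all retries (jitter off)."""
    return sum(backoff_delay(r, maximum=maximum, full_jitter=False)
               for r in range(max_retries))


schedule = [backoff_delay(r, full_jitter=False) for r in range(12)]
# schedule: 1, 2, 4, ..., 512, then capped at 600
```

With max_retries=3, a task that keeps failing spends at most 1 + 2 + 4 = 7 seconds in backoff before Celery gives up, so the 600-second cap only matters at much higher retry counts.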
Observability and Monitoring
Make agents observable with structured logging and metrics:
import json
import logging
import time

from prometheus_client import Counter, Histogram

# Structured logging; level=INFO so the info-level events below are actually emitted
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/agents.log'),  # The process needs write access here
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger("agent_system")

# Prometheus metrics, labeled by agent ID
agent_requests = Counter(
    'agent_requests_total',
    'Total agent requests',
    ['status', 'agent_id'],
)
agent_duration = Histogram(
    'agent_request_duration_seconds',
    'Agent request latency',
    ['agent_id'],
)
agent_cost = Histogram(
    'agent_cost_usd',
    'Cost per request in USD',
    ['agent_id'],
    buckets=[0.01, 0.05, 0.10, 0.50, 1.00],  # A +Inf bucket is added automatically
)
# Increment from the tool loop, e.g.:
#   tool_errors.labels(tool_name=block.name, error_type=type(e).__name__).inc()
tool_errors = Counter(
    'agent_tool_errors_total',
    'Tool call failures',
    ['tool_name', 'error_type'],
)


class ObservableAgent:
    """Agent with built-in observability."""

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.logger = logger

    def run(self, user_request: str, tools: list, session_id: str) -> dict:
        """Run the agent, logging start/end events and recording metrics."""
        start_time = time.perf_counter()
        self.logger.info(json.dumps({
            "event": "agent_start",
            "agent_id": self.agent_id,
            "session_id": session_id,
            "request_preview": user_request[:100],
            "num_tools": len(tools),
        }))

        try:
            # Run agent logic... (expected to return a dict shaped like run_agent's)
            result = {}
            duration = time.perf_counter() - start_time

            # Record metrics; a run that finishes but reports failure is not a success
            status = 'success' if result.get('success') else 'failure'
            agent_requests.labels(status=status, agent_id=self.agent_id).inc()
            agent_duration.labels(agent_id=self.agent_id).observe(duration)
            if 'cost' in result:
                agent_cost.labels(agent_id=self.agent_id).observe(result['cost'])

            self.logger.info(json.dumps({
                "event": "agent_complete",
                "agent_id": self.agent_id,
                "session_id": session_id,
                "duration_seconds": duration,
                "iterations": result.get('iterations'),
                "cost": result.get('cost'),
                "success": result.get('success'),
            }))
            return result

        except Exception as e:
            duration = time.perf_counter() - start_time
            agent_requests.labels(status='error', agent_id=self.agent_id).inc()
            agent_duration.labels(agent_id=self.agent_id).observe(duration)
            self.logger.error(json.dumps({
                "event": "agent_error",
                "agent_id": self.agent_id,
                "session_id": session_id,
                "error_type": type(e).__name__,
                "error": str(e),
                "duration_seconds": duration,
            }))
            raise
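Calling json.dumps inside every log statement works, but it is easy to forget a field or log a plain string by accident. An alternative, sketched here with only the standard library, is a logging.Formatter subclass that serializes every record, plus any fields passed through the extra= argument, into one JSON object per line. The JsonFormatter name and the "fields" key convention are our own choices, not part of the logging module.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Structured fields passed as logger.info(..., extra={"fields": {...}})
        payload.update(getattr(record, "fields", {}))
        if record.exc_info:
            payload["exc_info"] = self.formatException(record.exc_info)
        return json.dumps(payload, default=str)


stream = io.StringIO()  # stands in for a log file or stdout in this sketch
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
log = logging.getLogger("agent_system.json_demo")
log.setLevel(logging.INFO)
log.addHandler(handler)
log.propagate = False  # keep the demo out of the root logger's handlers

log.info("agent_complete",
         extra={"fields": {"session_id": "session_123", "cost": 0.0042}})
line = json.loads(stream.getvalue().splitlines()[0])
```

Every line is then valid JSON, so a log pipeline can filter on session_id or cost without parsing free text.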
Health Checks and Readiness
Implement health checks for production deployments:
import os

import redis
from fastapi import FastAPI, HTTPException, Response
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest

app = FastAPI()
# Short timeouts so a hung Redis fails the probe instead of stalling it
redis_client = redis.Redis(host='localhost', port=6379,
                           socket_connect_timeout=2, socket_timeout=2)


@app.get("/health")
def health_check():
    """Liveness probe: is the process up and serving HTTP?"""
    return {"status": "alive"}


@app.get("/ready")
def readiness_check():
    """Readiness probe: can the service handle requests?"""
    checks = {}

    # Check Redis connection (task queue broker and result backend)
    try:
        redis_client.ping()
        checks["redis"] = "ok"
    except redis.RedisError as e:
        checks["redis"] = f"error: {e}"

    # Check that an API key is configured (this does not verify that it is valid)
    if os.getenv("ANTHROPIC_API_KEY"):
        checks["anthropic_api"] = "ok"
    else:
        checks["anthropic_api"] = "error: no API key"

    # Ready only if every check passed; 503 tells the load balancer to route elsewhere
    if not all(v == "ok" for v in checks.values()):
        raise HTTPException(status_code=503, detail=checks)
    return {"status": "ready", "checks": checks}


@app.get("/metrics")
def metrics():
    """Expose Prometheus metrics in the text exposition format."""
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
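A readiness probe should answer quickly even when a dependency hangs; otherwise the orchestrator sees the probe itself time out. One way to bound it, shown as a framework-independent sketch, is to run each named check in a thread pool with a shared deadline and map the outcome to a 200 or 503 status. The run_readiness_checks helper and the lambda checks are hypothetical stand-ins for redis_client.ping() and the API-key check above.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
from typing import Callable


def run_readiness_checks(checks: dict[str, Callable[[], None]],
                         timeout_s: float = 1.0) -> tuple[int, dict[str, str]]:
    """Run checks concurrently; a check passes if it returns in time without raising."""
    report: dict[str, str] = {}
    pool = ThreadPoolExecutor(max_workers=len(checks) or 1)
    try:
        futures = {name: pool.submit(fn) for name, fn in checks.items()}
        deadline = time.monotonic() + timeout_s
        for name, future in futures.items():
            remaining = max(0.0, deadline - time.monotonic())
            try:
                future.result(timeout=remaining)
                report[name] = "ok"
            except FutureTimeout:
                report[name] = f"error: timed out after {timeout_s}s"
            except Exception as exc:
                report[name] = f"error: {exc}"
    finally:
        # Don't block the probe on a hung check; its thread finishes in the background
        pool.shutdown(wait=False)
    status = 200 if all(v == "ok" for v in report.values()) else 503
    return status, report


def failing_check() -> None:
    raise ConnectionError("redis unreachable")


ok_status, ok_report = run_readiness_checks(
    {"redis": lambda: None, "anthropic_api": lambda: None})
bad_status, bad_report = run_readiness_checks(
    {"redis": failing_check, "slow": lambda: time.sleep(0.5)}, timeout_s=0.1)
```

In a FastAPI endpoint, the returned pair maps directly onto JSONResponse(status_code=status, content=report).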
Graceful Degradation and Fallbacks
Handle failures elegantly:
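import logging

import anthropic

logger = logging.getLogger("agent_system")


class RobustAgent:
    """Agent with fallback strategies."""

    def __init__(self):
        self.client = anthropic.Anthropic()

    def run(self, user_request: str, tools: list) -> str:
        """Run with fallbacks: primary model, then a cheaper model, then a static message."""
        # Try primary model first
        try:
            return self.run_with_model(
                user_request,
                tools,
                model="claude-3-5-sonnet-20241022",
            )
        except anthropic.APIError as e:
            logger.warning("Primary model failed: %s", e)

        # Fall back to a cheaper, faster model
        try:
            logger.info("Falling back to Claude Haiku")
            return self.run_with_model(
                user_request,
                tools,
                model="claude-3-haiku-20240307",
            )
        except anthropic.APIError as e:
            logger.warning("Haiku model failed: %s", e)

        # Final fallback: return a helpful message
        return (
            "I'm currently experiencing high load. "
            "Try again in a few moments or contact support."
        )

    def run_with_model(self, request: str, tools: list, model: str) -> str:
        # ... run agent with specified model (the full tool loop, as in run_agent).
        # Single-turn placeholder so the class runs end to end:
        response = self.client.messages.create(
            model=model,
            max_tokens=2048,
            tools=tools,
            messages=[{"role": "user", "content": request}],
        )
        return "".join(b.text for b in response.content if b.type == "text")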
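The nested try/except blocks in RobustAgent grow awkward as you add tiers. Here is a sketch of the same strategy written as a loop over an ordered list of attempts, with a static message as the last resort. The run_with_fallbacks helper and the attempt callables are hypothetical stand-ins for run_with_model bound to different model IDs.

```python
import logging
from typing import Callable, Sequence

logger = logging.getLogger("agent_system")


def run_with_fallbacks(attempts: Sequence[tuple[str, Callable[[], str]]],
                       last_resort: str) -> tuple[str, str]:
    """Try each (label, fn) in order and return (label_used, answer).

    Falls through to `last_resort` only after every attempt has raised.
    """
    for label, attempt in attempts:
        try:
            return label, attempt()
        except Exception as exc:  # narrow to API errors in real code
            logger.warning("attempt %s failed: %s", label, exc)
    return "static", last_resort


def flaky_primary() -> str:
    raise TimeoutError("primary model overloaded")


used, answer = run_with_fallbacks(
    [("primary", flaky_primary), ("haiku", lambda: "short answer")],
    last_resort="I'm currently experiencing high load. Try again in a few moments.",
)
```

Returning the label alongside the answer lets you count which tier served each request, for example with a Prometheus counter labeled by tier, so a silent shift onto the fallback model shows up on a dashboard.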
Key Takeaways
- Production agents require async task queues (Celery/RQ) to decouple request handling from processing
- Structured logging and Prometheus metrics provide visibility into agent behavior and costs
- Liveness and readiness probes let Kubernetes and load balancers restart hung instances and route traffic only to instances that can serve it
- Graceful degradation (fallback models, degraded functionality) improves uptime
- Comprehensive error handling and retry logic make agents resilient to transient failures
- Cost monitoring at the request level enables budgeting and optimization
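Request-level cost monitoring starts with turning token counts into dollars. A minimal sketch, assuming per-million-token prices supplied by the caller: the $3 input and $15 output defaults are Claude 3.5 Sonnet's list prices, so verify current pricing before relying on them. CostBudget is a hypothetical guard that stops an agent loop once a per-request ceiling is crossed.

```python
from dataclasses import dataclass


def estimate_cost_usd(input_tokens: int, output_tokens: int,
                      input_usd_per_mtok: float = 3.00,
                      output_usd_per_mtok: float = 15.00) -> float:
    """Dollar cost of one API call, given prices per million tokens."""
    return (input_tokens * input_usd_per_mtok
            + output_tokens * output_usd_per_mtok) / 1_000_000


class BudgetExceeded(RuntimeError):
    """Raised when a request's accumulated cost passes its limit."""


@dataclass
class CostBudget:
    """Hypothetical per-request spending cap for an agent loop."""
    limit_usd: float
    spent_usd: float = 0.0

    def charge(self, input_tokens: int, output_tokens: int) -> float:
        # The call has already happened, so record its cost before checking the cap
        self.spent_usd += estimate_cost_usd(input_tokens, output_tokens)
        if self.spent_usd > self.limit_usd:
            raise BudgetExceeded(
                f"spent ${self.spent_usd:.4f}, limit ${self.limit_usd:.4f}")
        return self.spent_usd


# 10,000 input + 1,000 output tokens = (30,000 + 15,000) / 1,000,000 = $0.045
budget = CostBudget(limit_usd=0.10)
calls_made = 0
try:
    for _ in range(5):  # pretend each agent iteration uses 10k in / 1k out tokens
        budget.charge(10_000, 1_000)
        calls_made += 1
except BudgetExceeded as exc:
    stop_reason = str(exc)
```

The third iteration pushes spending to $0.135 and stops the loop, so a runaway agent costs at most one call beyond its budget.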
Frequently Asked Questions
How many agent worker processes do I need?
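Start with 2-4 workers and autoscale based on queue depth and latency. Agent workers are I/O-bound, not CPU-bound: they spend most of their time waiting for LLM and tool API responses. You can therefore run more worker processes than CPU cores, or use Celery's gevent or eventlet pool for high concurrency per process; memory and API rate limits usually set the ceiling.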
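A back-of-the-envelope way to turn "queue depth and latency" into a worker count is Little's law: the average number of tasks in progress equals the arrival rate times the average time each task takes. In this sketch the headroom factor and the example traffic numbers are assumptions for illustration.

```python
import math


def required_concurrency(requests_per_second: float, avg_task_seconds: float,
                         headroom: float = 1.5) -> int:
    """Concurrent worker slots needed to keep up, via Little's law (L = lambda * W).

    `headroom` > 1 leaves slack for bursts and slow tail requests.
    """
    if requests_per_second < 0 or avg_task_seconds < 0 or headroom < 1:
        raise ValueError("rates and durations must be non-negative; headroom >= 1")
    return math.ceil(requests_per_second * avg_task_seconds * headroom)


# 2 requests/s, each agent run averaging 30 s of mostly network wait:
slots = required_concurrency(2, 30)  # 2 * 30 * 1.5 = 90 slots
```

Because those slots spend most of their time waiting on the network, 90 slots might be a handful of processes on a gevent pool rather than 90 cores; the API's rate limits often become the real constraint first.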
What if the task queue goes down?
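Dead-letter queues handle failed tasks, not broker outages: tasks that exhaust their retries go to a separate queue for inspection and later replay. To survive the broker itself going down, run it in a highly available setup (for example Redis Sentinel or a RabbitMQ cluster); Celery can also fail over across a list of broker URLs. Enabling Celery's task_acks_late setting means a task is redelivered if its worker dies mid-run, so make tasks safe to execute twice.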
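The dead-letter pattern itself is broker-independent: count attempts per task, and once a task exhausts them, park it on a separate queue along with its error history instead of dropping it or retrying forever. Here is a stdlib sketch with hypothetical names (Job, handle, drain, MAX_ATTEMPTS); with a real broker you would use its own support, such as RabbitMQ's dead-letter exchanges, or a dedicated Celery queue.

```python
import queue
from dataclasses import dataclass, field

MAX_ATTEMPTS = 3


@dataclass
class Job:
    job_id: str
    payload: str
    attempts: int = 0
    errors: list[str] = field(default_factory=list)


def handle(job: Job) -> str:
    """Hypothetical task body: fails permanently on a 'poison' payload."""
    if job.payload == "poison":
        raise ValueError("cannot parse payload")
    return job.payload.upper()


def drain(work: "queue.Queue[Job]", dead_letters: "queue.Queue[Job]") -> dict[str, str]:
    """Process jobs; re-enqueue failures until MAX_ATTEMPTS, then dead-letter them."""
    done: dict[str, str] = {}
    while not work.empty():
        job = work.get()
        job.attempts += 1
        try:
            done[job.job_id] = handle(job)
        except Exception as exc:
            job.errors.append(repr(exc))
            if job.attempts >= MAX_ATTEMPTS:
                dead_letters.put(job)  # keep it, with its errors, for inspection/replay
            else:
                work.put(job)  # retry later (a real system would add a backoff delay)
    return done


work_q: "queue.Queue[Job]" = queue.Queue()
dlq: "queue.Queue[Job]" = queue.Queue()
for i, payload in enumerate(["hello", "poison", "world"]):
    work_q.put(Job(job_id=f"job-{i}", payload=payload))
completed = drain(work_q, dlq)
```

The healthy jobs complete while the poison job lands on the dead-letter queue after three attempts, so one bad request never blocks the rest.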
How do I debug a failed agent in production?
Log request/response at each step. Include session IDs in all logs. Use distributed tracing (OpenTelemetry) to follow requests across services. Replay failed sessions in a staging environment.
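Threading a session ID through every log call by hand is error-prone. One standard-library approach, sketched below, stores the current session ID in a contextvars.ContextVar and uses a logging.Filter to stamp it onto every record, so log lines from helper functions carry it too. The session_id_var and SessionIdFilter names are hypothetical.

```python
import contextvars
import io
import logging

session_id_var = contextvars.ContextVar("session_id", default="-")


class SessionIdFilter(logging.Filter):
    """Copy the current session ID from the context onto each log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.session_id = session_id_var.get()
        return True  # never drop records; only annotate them


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.addFilter(SessionIdFilter())
handler.setFormatter(logging.Formatter("%(session_id)s %(levelname)s %(message)s"))
log = logging.getLogger("agent_system.tracing_demo")
log.setLevel(logging.INFO)
log.addHandler(handler)
log.propagate = False


def call_tool(name: str) -> None:
    log.info("calling tool %s", name)  # no session ID passed explicitly


def handle_request(session_id: str) -> None:
    token = session_id_var.set(session_id)
    try:
        log.info("agent_start")
        call_tool("search_web")
    finally:
        session_id_var.reset(token)  # restore the previous value after the request


handle_request("session_123")
log.info("outside any request")
lines = stream.getvalue().splitlines()
```

Because asyncio tasks copy the current context when they are created, the same pattern carries over to async agent loops.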
Should I use async/await within the agent loop?
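Agents spend most of their time waiting for LLM API responses, which is network I/O. In a standalone service, asyncio with the SDK's AsyncAnthropic client lets one process run many agent loops concurrently while each waits on the API. Celery tasks, however, are ordinary synchronous functions, so inside Celery get the same concurrency from more worker processes or the gevent/eventlet pool.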
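The throughput gain comes from overlapping waits. In the sketch below, asyncio.sleep stands in for an LLM API call (with the real SDK you would await AsyncAnthropic().messages.create(...)), and an asyncio.Semaphore caps how many calls are in flight so a burst does not blow through rate limits. fake_llm_call, run_agent_step, and the latency and limit values are illustrative assumptions.

```python
import asyncio
import time


async def fake_llm_call(prompt: str, latency_s: float = 0.2) -> str:
    """Stand-in for a network-bound API call."""
    await asyncio.sleep(latency_s)
    return f"reply to {prompt}"


async def run_agent_step(prompt: str, limiter: asyncio.Semaphore) -> str:
    async with limiter:  # at most max_in_flight calls at once
        return await fake_llm_call(prompt)


async def main(n_requests: int = 10, max_in_flight: int = 5) -> tuple[list[str], float]:
    limiter = asyncio.Semaphore(max_in_flight)  # created inside the running loop
    start = time.perf_counter()
    replies = await asyncio.gather(
        *(run_agent_step(f"q{i}", limiter) for i in range(n_requests))
    )
    return list(replies), time.perf_counter() - start


replies, elapsed = asyncio.run(main())
# Sequentially this would take about 10 * 0.2 = 2 s; with 5 in flight, about 0.4 s
```

asyncio.gather returns results in submission order, so replies line up with their prompts even though the calls finish in parallel.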
How do I handle users canceling requests?
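Track task IDs. Return HTTP 202 Accepted with the task ID when a request is enqueued, and let users poll for results by that ID. For cancellation, implement an endpoint that revokes the Celery task. Revoking alone only prevents a task that has not started from running; revoking with terminate=True also kills the worker process executing it, which can interrupt a tool call midway.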
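A gentler complement to killing a running task is cooperative cancellation: the cancel endpoint sets a flag keyed by task ID, and the agent loop checks it between steps, so a tool call is never cut off halfway. The in-memory TaskRegistry below is a hypothetical, single-process sketch; across processes the flag would need to live in shared storage such as Redis.

```python
import threading
import time
import uuid


class TaskRegistry:
    """Hypothetical in-memory registry of task status and cancel flags."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._status: dict[str, str] = {}
        self._cancel: dict[str, threading.Event] = {}

    def create(self) -> str:
        task_id = str(uuid.uuid4())
        with self._lock:
            self._status[task_id] = "PENDING"
            self._cancel[task_id] = threading.Event()
        return task_id  # the gateway would return this with HTTP 202

    def cancel(self, task_id: str) -> None:
        self._cancel[task_id].set()

    def is_cancelled(self, task_id: str) -> bool:
        return self._cancel[task_id].is_set()

    def set_status(self, task_id: str, status: str) -> None:
        with self._lock:
            self._status[task_id] = status

    def status(self, task_id: str) -> str:  # what a polling endpoint would read
        with self._lock:
            return self._status[task_id]


def agent_loop(registry: TaskRegistry, task_id: str, max_steps: int = 50) -> None:
    registry.set_status(task_id, "RUNNING")
    for _ in range(max_steps):
        if registry.is_cancelled(task_id):  # checked between steps, never mid-tool-call
            registry.set_status(task_id, "CANCELLED")
            return
        time.sleep(0.01)  # stand-in for one LLM call plus tool execution
    registry.set_status(task_id, "SUCCESS")


registry = TaskRegistry()
task_id = registry.create()
worker = threading.Thread(target=agent_loop, args=(registry, task_id))
worker.start()
time.sleep(0.05)           # let a few steps run
registry.cancel(task_id)   # what the cancel endpoint would do
worker.join()
```

The trade-off is latency: cancellation takes effect only at the next step boundary, which for an agent is usually one model call away.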