Graceful Degradation: Services That Fail Elegantly
Graceful degradation is the art of keeping your service operational and useful even when parts of it fail. Instead of returning a 500 error when a recommendation engine is down, show users cached popular items. When the payment service is unreachable, let them complete their checkout and process the payment in a background job once the service recovers. When a database replica fails, read from the primary and accept slightly higher latency instead of crashing. This article covers fallback strategies, feature flags, read-only modes, and other patterns that keep your API functional and valuable under partial failure.
I worked on an e-commerce platform where a search reranking service regularly became unresponsive due to ML model load. The naive fix was to return 503 when it failed. The graceful fix: if reranking times out, return unranked (but still relevant) results from the base search engine. Conversion rate dropped only 2% during outages instead of the 30% loss we saw with 503 errors. Users didn't even notice the service was degraded.
Graceful Degradation Strategies
Degradation takes many forms:
Fallback to Cache: Return yesterday's data instead of querying a failing service.
Feature Reduction: Disable advanced features (recommendations, sorting) but keep basic functionality (search, list).
Read-Only Mode: Keep serving reads but reject or queue writes to protect a struggling database.
Load Shedding: Reject or defer a share of incoming requests (usually the expensive or low-priority ones) so the rest can still be served.
Async Processing: Accept a request, process later, return a job ID instead of waiting for a failing service.
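Load shedding is the one strategy in this list that the sections below do not show in code. The following is a minimal sketch, not a definitive implementation: the `LoadShedder` class, its parameters, and the two-level priority scheme are assumptions made for illustration and do not come from any library. It caps the number of in-flight requests and starts rejecting low-priority work early, so a reserve remains for critical requests.

```python
import threading


class LoadShedder:
    """Hypothetical in-process load shedder (illustrative, not a library API).

    Tracks in-flight requests and rejects low-priority work before the
    hard limit is reached, keeping a reserve for critical requests.
    """

    def __init__(self, max_in_flight: int = 100, reserve_for_critical: int = 20):
        self.max_in_flight = max_in_flight
        self.low_priority_limit = max_in_flight - reserve_for_critical
        self._in_flight = 0
        self._lock = threading.Lock()

    def try_acquire(self, critical: bool = False) -> bool:
        """Return True if the request may proceed, False if it should be shed."""
        limit = self.max_in_flight if critical else self.low_priority_limit
        with self._lock:
            if self._in_flight >= limit:
                return False
            self._in_flight += 1
            return True

    def release(self) -> None:
        """Call when an admitted request finishes, whether it succeeded or not."""
        with self._lock:
            self._in_flight = max(0, self._in_flight - 1)


shedder = LoadShedder(max_in_flight=3, reserve_for_critical=1)
admitted = [shedder.try_acquire() for _ in range(3)]  # low priority: only 2 fit
critical_ok = shedder.try_acquire(critical=True)      # takes the reserved slot
print(admitted, critical_ok)
```

A request that is shed would typically receive a 503 response with a Retry-After header so well-behaved clients back off instead of retrying immediately.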
Fallback to Cached Data
When a dependency fails, fall back to a cached version of its last good response:
import json
import logging

import redis
from fastapi import FastAPI

logger = logging.getLogger(__name__)
app = FastAPI()
redis_client = redis.Redis()  # Assumes a local Redis; adjust host and port as needed


class APIWithFallback:
    def __init__(self, redis_client, timeout: float = 5):
        self.redis = redis_client
        self.timeout = timeout

    def get_user_preferences(self, user_id: int) -> dict:
        """Fetch user preferences with cached fallback."""
        cache_key = f'user_prefs:{user_id}'
        try:
            # Try to fetch fresh data. fetch_from_preferences_service is the
            # application's own client for the preferences service.
            prefs = fetch_from_preferences_service(user_id, timeout=self.timeout)
        except Exception as e:
            # Preferences service failed
            logger.warning(f"Preferences service failed: {e}")
        else:
            try:
                # Cache it for fallback; a cache failure must not fail the request
                self.redis.setex(cache_key, 3600, json.dumps(prefs))
            except redis.RedisError as e:
                logger.warning(f"Could not cache preferences: {e}")
            return prefs

        # Return cached version
        try:
            cached = self.redis.get(cache_key)
        except redis.RedisError:
            cached = None
        if cached:
            logger.info(f"Returning cached preferences for user {user_id}")
            return json.loads(cached)

        # No cache; return defaults (graceful degradation)
        logger.info(f"No cache for user {user_id}; using defaults")
        return {
            'theme': 'light',
            'language': 'en',
            'notifications': True
        }


# Usage
prefs_api = APIWithFallback(redis_client)


@app.get('/users/{user_id}/preferences')
def get_preferences(user_id: int):
    prefs = prefs_api.get_user_preferences(user_id)
    return prefs
When the preferences service fails, users get their cached preferences or, failing that, sensible defaults. No 500 error, no frozen UI.
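Callers sometimes need to know whether they received live, cached, or default data, for example to show a "may be out of date" hint. One possible way to expose that is sketched below with a plain dict standing in for Redis. The function name `get_with_source` and the `'live'`/`'cache'`/`'default'` labels are illustrative assumptions, not part of the code above.

```python
from typing import Any, Callable, Dict, Tuple

DEFAULT_PREFS = {'theme': 'light', 'language': 'en', 'notifications': True}


def get_with_source(
    fetch: Callable[[], Dict[str, Any]],
    cache: Dict[str, Dict[str, Any]],
    key: str,
) -> Tuple[Dict[str, Any], str]:
    """Return (data, source), where source is 'live', 'cache', or 'default'."""
    try:
        data = fetch()
        cache[key] = data  # remember the last good response
        return data, 'live'
    except Exception:
        if key in cache:
            return cache[key], 'cache'
        return dict(DEFAULT_PREFS), 'default'


def failing_fetch() -> Dict[str, Any]:
    """Stand-in for a dependency that is currently down."""
    raise TimeoutError('preferences service unavailable')


demo_cache: Dict[str, Dict[str, Any]] = {}
live = get_with_source(lambda: {'theme': 'dark'}, demo_cache, 'user_prefs:1')
stale = get_with_source(failing_fetch, demo_cache, 'user_prefs:1')
fallback = get_with_source(failing_fetch, demo_cache, 'user_prefs:2')
print(live[1], stale[1], fallback[1])
```

The source label can also feed a metric, which makes it possible to see how often users are actually served degraded data.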
Feature Flags for Degradation
Feature flags let you disable expensive features when the system is stressed:
import time
from enum import Enum


class FeatureFlag(Enum):
    RECOMMENDATIONS = "recommendations"
    ADVANCED_SEARCH = "advanced_search"
    PERSONALIZATION = "personalization"
    REVIEWS = "reviews"  # Used by the product page example later in this article


class FeatureFlagManager:
    """Simple Redis-backed feature flag system."""

    def __init__(self, redis_client):
        self.redis = redis_client

    def is_enabled(self, flag: FeatureFlag) -> bool:
        """Check if a feature is enabled."""
        key = f'feature_flag:{flag.value}'
        cached = self.redis.get(key)
        if cached is not None:
            # redis-py returns bytes unless decode_responses=True is set
            value = cached.decode() if isinstance(cached, bytes) else cached
            return value == 'true'
        # Default: enabled
        return True

    def disable(self, flag: FeatureFlag, duration_seconds: int = 3600):
        """Temporarily disable a feature; the key expires and re-enables it."""
        key = f'feature_flag:{flag.value}'
        self.redis.setex(key, duration_seconds, 'false')
        logger.warning(f"Feature {flag.value} disabled for {duration_seconds}s")


flags = FeatureFlagManager(redis_client)


@app.get('/search')
def search(query: str, include_recommendations: bool = True):
    """Search with optional recommendations."""
    # basic_search and get_recommendations are the application's own clients
    results = basic_search(query)

    # Only add recommendations if the feature is enabled
    if include_recommendations and flags.is_enabled(FeatureFlag.RECOMMENDATIONS):
        try:
            recommendations = get_recommendations(query, timeout=2)
            results['recommendations'] = recommendations
        except Exception as e:
            logger.warning(f"Recommendations failed: {e}")
            # Gracefully skip recommendations; search still works

    return results
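When the recommendation engine times out, that request skips recommendations and still returns search results. If the timeouts persist, disable the flag so that subsequent requests skip the call entirely instead of each waiting out the 2-second timeout.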
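Disabling the flag does not have to be a manual step. The sketch below shows one way to trip it automatically after several consecutive failures. `FailureTracker`, its threshold, and its cooldown are hypothetical names and values chosen for illustration. The injectable `clock` exists only to make the example deterministic.

```python
import time
from typing import Callable


class FailureTracker:
    """Hypothetical helper: trips after N consecutive failures, then cools down."""

    def __init__(self, threshold: int = 3, cooldown_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.threshold = threshold
        self.cooldown_seconds = cooldown_seconds
        self._clock = clock
        self._consecutive_failures = 0
        self._disabled_until = 0.0

    def allowed(self) -> bool:
        """True if the feature should be attempted right now."""
        return self._clock() >= self._disabled_until

    def record_success(self) -> None:
        """A success resets the failure streak."""
        self._consecutive_failures = 0

    def record_failure(self) -> None:
        """Count a failure; at the threshold, stop calling for the cooldown."""
        self._consecutive_failures += 1
        if self._consecutive_failures >= self.threshold:
            self._disabled_until = self._clock() + self.cooldown_seconds
            self._consecutive_failures = 0


now = [0.0]  # fake clock so the example does not need to sleep
tracker = FailureTracker(threshold=3, cooldown_seconds=60, clock=lambda: now[0])
for _ in range(3):
    tracker.record_failure()
blocked = tracker.allowed()    # still inside the cooldown window
now[0] = 61.0
recovered = tracker.allowed()  # cooldown has elapsed
print(blocked, recovered)
```

In the search endpoint, the same idea would mean calling `flags.disable(FeatureFlag.RECOMMENDATIONS, ...)` at the point where this sketch sets `_disabled_until`, so the decision is shared across all application instances through Redis rather than held in one process.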
Read-Only Mode for Database Failures
When writes are slow or failing, switch to read-only mode:
import threading

from fastapi.responses import JSONResponse


class DatabaseWithReadOnlyMode:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.read_only_key = 'db:read_only'

    def is_read_only(self) -> bool:
        """Check if database is in read-only mode."""
        # EXISTS returns the number of matching keys, so convert it to a bool
        return bool(self.redis.exists(self.read_only_key))

    def set_read_only(self, duration_seconds: int = 300):
        """Enable read-only mode; the key expires on its own as a safety net."""
        self.redis.setex(self.read_only_key, duration_seconds, '1')
        logger.warning(f"Database in read-only mode for {duration_seconds}s")

    def clear_read_only(self):
        """Leave read-only mode."""
        self.redis.delete(self.read_only_key)

    def execute_read(self, query: str, params: dict = None):
        """Execute a read; reads are allowed even in read-only mode."""
        return execute_query(query, params or {})

    def execute_write(self, query: str, params: dict):
        """Execute a write, or queue it if read-only."""
        if self.is_read_only():
            # Queue for later (queue_write is the application's own helper)
            job_id = queue_write(query, params)
            return {
                'status': 'queued',
                'job_id': job_id,
                'message': 'Database temporarily read-only. Your changes will be saved shortly.'
            }
        # Normal write (execute_query is the application's own database helper)
        return execute_query(query, params)


db = DatabaseWithReadOnlyMode(redis_client)


@app.post('/articles/{article_id}')
def update_article(article_id: int, data: dict):
    """Update article; queue if database is struggling."""
    result = db.execute_write(
        "UPDATE articles SET content = %(content)s WHERE id = %(id)s",
        {'content': data['content'], 'id': article_id}
    )
    if result.get('status') == 'queued':
        # FastAPI serializes a returned tuple as a JSON array, so set the
        # status code through an explicit response object instead
        return JSONResponse(content=result, status_code=202)  # Accepted
    return result  # 200 OK by default


# Monitor database latency; enable read-only if too slow
def monitor_database_health():
    while True:
        start = time.time()
        try:
            db.execute_read("SELECT 1")
            latency_ms = (time.time() - start) * 1000
            if latency_ms > 1000:  # Queries taking >1s
                db.set_read_only(duration_seconds=300)
            else:
                db.clear_read_only()
        except Exception:
            db.set_read_only(duration_seconds=60)
        time.sleep(5)


# Run monitor in background thread
threading.Thread(target=monitor_database_health, daemon=True).start()
If database latency exceeds a threshold, new writes are queued for later processing. Users get a clear message, and the database has time to recover without being overwhelmed.
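Queued writes only help if something replays them once the database is healthy again. The following sketch shows one possible drain loop. It uses an in-memory `deque` and injected callables in place of the real queue and database, and `drain_queued_writes` and its parameters are hypothetical names introduced for illustration.

```python
from collections import deque
from typing import Callable, Deque, Dict, Tuple

QueuedWrite = Tuple[str, Dict[str, object]]


def drain_queued_writes(
    queue: Deque[QueuedWrite],
    execute: Callable[[str, Dict[str, object]], None],
    is_read_only: Callable[[], bool],
    batch_size: int = 50,
) -> int:
    """Replay queued writes in FIFO order; return how many were applied.

    Stops early if the database re-enters read-only mode or a write fails,
    leaving the remaining writes queued for the next pass.
    """
    applied = 0
    while queue and applied < batch_size and not is_read_only():
        query, params = queue[0]  # peek; remove only after success
        try:
            execute(query, params)
        except Exception:
            break  # keep the write queued and retry on the next pass
        queue.popleft()
        applied += 1
    return applied


sql = "UPDATE articles SET content = %(content)s WHERE id = %(id)s"
pending: Deque[QueuedWrite] = deque([
    (sql, {'content': 'first draft', 'id': 1}),
    (sql, {'content': 'second draft', 'id': 2}),
])
applied_params = []
count = drain_queued_writes(
    pending,
    execute=lambda query, params: applied_params.append(params),
    is_read_only=lambda: False,
)
print(count, len(pending))
```

Draining in small batches keeps the replay from overwhelming a database that has only just recovered. Replayed writes can also arrive after newer writes to the same row, so a real system needs to decide how such conflicts are resolved.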
Async Processing: Accept, Process Later
For operations that are failing synchronously, accept the request and process asynchronously:
import json
import uuid
from datetime import datetime, timezone
from typing import Optional

import celery
from fastapi import FastAPI, HTTPException

app = FastAPI()
celery_app = celery.Celery('tasks')


class JobQueue:
    """Queue jobs for later processing."""

    def __init__(self, redis_client):
        self.redis = redis_client

    def enqueue_job(self, job_type: str, data: dict) -> str:
        """Queue a job; return job ID."""
        job_id = str(uuid.uuid4())
        self.redis.hset(f'job:{job_id}', mapping={
            'type': job_type,
            'data': json.dumps(data),
            'status': 'pending',
            'created_at': datetime.now(timezone.utc).isoformat()
        })
        return job_id

    def get_job_status(self, job_id: str) -> Optional[dict]:
        """Get job status, or None if the job does not exist."""
        job = self.redis.hgetall(f'job:{job_id}')
        if not job:
            return None
        # redis-py returns bytes unless decode_responses=True is set
        return {
            (k.decode() if isinstance(k, bytes) else k):
            (v.decode() if isinstance(v, bytes) else v)
            for k, v in job.items()
        }


job_queue = JobQueue(redis_client)


@app.post('/reports/generate', status_code=202)  # 202 Accepted
def request_report(report_type: str):
    """
    Generate a report asynchronously.
    Instead of waiting 30 seconds for the report engine to finish,
    return a job ID immediately.
    """
    job_id = job_queue.enqueue_job('report_generation', {
        'type': report_type,
        'user_id': current_user.id  # current_user comes from the app's auth layer
    })
    # Trigger background processing
    process_report.delay(job_id)
    return {
        'status': 'processing',
        'job_id': job_id,
        'check_url': f'/jobs/{job_id}'
    }


@app.get('/jobs/{job_id}')
def get_job_status(job_id: str):
    """Check status of a job."""
    job = job_queue.get_job_status(job_id)
    if not job:
        raise HTTPException(status_code=404, detail='Job not found')
    return job


@celery_app.task
def process_report(job_id: str):
    """Process report asynchronously."""
    job_queue.redis.hset(f'job:{job_id}', 'status', 'running')
    try:
        job = job_queue.get_job_status(job_id)
        # 'data' was stored as a JSON string, so parse it before use
        report_type = json.loads(job['data'])['type']
        # generate_report is the report engine's function, not the route
        # handler above (which is named request_report to avoid a clash)
        report = generate_report(job_type=report_type)
        job_queue.redis.hset(f'job:{job_id}', mapping={
            'status': 'completed',
            'result': json.dumps(report)
        })
    except Exception as e:
        job_queue.redis.hset(f'job:{job_id}', mapping={
            'status': 'failed',
            'error': str(e)
        })
Users get an immediate response with a job ID. They can check progress at /jobs/{job_id} while the report processes in the background. No timeouts, no failed requests.
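On the client side, checking progress usually means polling the status URL. A minimal sketch of a polling loop with capped exponential backoff follows. `wait_for_job` and its parameters are hypothetical, and `get_status` stands in for whatever performs the HTTP GET against /jobs/{job_id}.

```python
import time
from typing import Callable, Dict, Optional


def wait_for_job(
    get_status: Callable[[str], Optional[Dict[str, str]]],
    job_id: str,
    max_attempts: int = 10,
    initial_delay: float = 0.5,
    max_delay: float = 8.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, str]:
    """Poll until the job is 'completed' or 'failed', backing off between polls."""
    delay = initial_delay
    for _ in range(max_attempts):
        job = get_status(job_id)
        if job is None:
            raise KeyError(f'job {job_id} not found')
        if job['status'] in ('completed', 'failed'):
            return job
        sleep(delay)
        delay = min(delay * 2, max_delay)  # exponential backoff, capped
    raise TimeoutError(f'job {job_id} still running after {max_attempts} polls')


# Simulated server responses; a real client would issue HTTP requests here
statuses = iter(['pending', 'running', 'completed'])
delays = []
final = wait_for_job(
    lambda _job_id: {'status': next(statuses)},
    'example-job',
    sleep=delays.append,  # record the delays instead of actually sleeping
)
print(final['status'], delays)
```

Backing off between polls matters here: a degraded backend is exactly the situation in which thousands of clients polling every few hundred milliseconds would make things worse.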
Comprehensive Degradation Example
Here's a larger example that combines cached fallback, feature flags, and read-only mode in one service:
import json
import logging

from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse

app = FastAPI()
logger = logging.getLogger(__name__)


class ResilientServiceOrchestrator:
    """Orchestrate multiple services with graceful degradation."""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.flags = FeatureFlagManager(redis_client)
        self.db = DatabaseWithReadOnlyMode(redis_client)

    def get_product_page(self, product_id: int):
        """Fetch product page with multiple fallback strategies."""
        cache_key = f'product:{product_id}'

        # 1. Get basic product data (critical path)
        try:
            product = fetch_from_catalog_service(product_id, timeout=2)
            # Cache only fresh data, so a stale fallback copy is never
            # re-cached with a renewed TTL
            self.redis.setex(cache_key, 3600, json.dumps(product))
        except Exception as e:
            logger.warning(f"Catalog service failed: {e}")
            # Fallback to cache
            cached = self.redis.get(cache_key)
            if cached:
                product = json.loads(cached)
            else:
                raise HTTPException(status_code=503, detail="Product unavailable")

        # 2. Add recommendations (optional, feature-flagged)
        product['recommendations'] = []
        if self.flags.is_enabled(FeatureFlag.RECOMMENDATIONS):
            try:
                recs = get_recommendations(product_id, timeout=1)
                product['recommendations'] = recs
            except Exception as e:
                logger.warning(f"Recommendations failed: {e}")
                # No recommendations; that's ok

        # 3. Add reviews (optional)
        # Requires FeatureFlag to define a REVIEWS member
        product['reviews'] = []
        if self.flags.is_enabled(FeatureFlag.REVIEWS):
            try:
                reviews = get_reviews(product_id, timeout=1)
                product['reviews'] = reviews
            except Exception as e:
                logger.warning(f"Reviews failed: {e}")
                # No reviews; still useful

        return product


@app.get('/products/{product_id}')
def get_product(product_id: int):
    orchestrator = ResilientServiceOrchestrator(redis_client)
    return orchestrator.get_product_page(product_id)


@app.post('/products/{product_id}')
def update_product(product_id: int, data: dict):
    orchestrator = ResilientServiceOrchestrator(redis_client)
    # Write with potential queueing if database is slow
    result = orchestrator.db.execute_write(
        "UPDATE products SET name = %(name)s WHERE id = %(id)s",
        {'name': data['name'], 'id': product_id}
    )
    if result.get('status') == 'queued':
        # Cache invalidation queued too (invalidate_product_cache is a
        # background task defined elsewhere in the application)
        invalidate_product_cache.delay(product_id)
        return JSONResponse(content=result, status_code=202)  # Accepted
    # Immediate cache invalidation
    redis_client.delete(f'product:{product_id}')
    return result
Key Takeaways
- Graceful degradation keeps users productive even during partial outages. A read-only version of your site is better than a 503 error.
- Use feature flags to disable expensive features under load. Users get reduced functionality, not broken functionality.
- Fall back to cached data when services fail. Always cache successful responses for exactly this scenario.
- Queue writes when the database is slow instead of rejecting them. Process them asynchronously once the database recovers.
- Monitor dependency health and switch modes (read-only, feature flags) automatically before users hit errors.
Frequently Asked Questions
How do I know when to degrade vs. fail?
Degrade when partial functionality is still valuable: search without recommendations, a product page without reviews. Fail when you cannot provide even the basic functionality, for example when the product data itself is missing from both the service and the cache. If you can't compute a fresh answer but have useful cached or default data, return that instead of an error.
Should degradation be automatic or manual?
Automatic when possible. Monitor dependency latency and health, and enable read-only mode or disable features programmatically. Keep a manual override for planned degradation, such as maintenance windows or deliberate feature reduction during a Black Friday overload.
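One simple way to combine the two is to let an operator's override take precedence over the automatic decision. The sketch below assumes a hypothetical `resolve_mode` function and a two-value `Mode` enum. The 1000 ms threshold mirrors the monitor shown earlier but is otherwise arbitrary.

```python
from enum import Enum
from typing import Optional


class Mode(Enum):
    NORMAL = 'normal'
    READ_ONLY = 'read_only'


def resolve_mode(manual_override: Optional[Mode], db_latency_ms: float,
                 threshold_ms: float = 1000.0) -> Mode:
    """Pick the operating mode; an operator's override always wins."""
    if manual_override is not None:
        return manual_override
    return Mode.READ_ONLY if db_latency_ms > threshold_ms else Mode.NORMAL


healthy = resolve_mode(None, 250.0)             # automatic, database is fast
struggling = resolve_mode(None, 1800.0)         # automatic, database is slow
planned = resolve_mode(Mode.READ_ONLY, 250.0)   # maintenance window
print(healthy, struggling, planned)
```

The override works in both directions: an operator can also force `Mode.NORMAL` when the health check is known to be giving a false alarm.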
What is the difference between graceful degradation and circuit breaking?
A circuit breaker stops calling a failing service for a while, which prevents cascading failures. Graceful degradation provides reduced functionality instead of an error. The two work together: the circuit breaker limits the damage, and graceful degradation preserves value for the user.
Can I measure the impact of degradation?
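Yes. Track metrics such as the conversion rate on product pages served with and without recommendations, and user engagement while the site is in read-only mode. Modest degradation often goes unnoticed: in the reranking example at the start of this article, conversion dropped only about 2%.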
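As a rough illustration of that comparison, the sketch below keeps separate request and conversion counts for normal and degraded responses. `DegradationMetrics` is a hypothetical in-memory class and the sample data is made up. A production system would send the same counts to its metrics backend instead.

```python
from collections import Counter


class DegradationMetrics:
    """Hypothetical in-memory counters split by normal vs. degraded responses."""

    def __init__(self) -> None:
        self.requests: Counter = Counter()
        self.conversions: Counter = Counter()

    def record(self, degraded: bool, converted: bool) -> None:
        """Record one request and whether it led to a conversion."""
        bucket = 'degraded' if degraded else 'normal'
        self.requests[bucket] += 1
        if converted:
            self.conversions[bucket] += 1

    def conversion_rate(self, bucket: str) -> float:
        """Conversions divided by requests for a bucket (0.0 if no requests)."""
        total = self.requests[bucket]
        return self.conversions[bucket] / total if total else 0.0


metrics = DegradationMetrics()
# Made-up sample: (served degraded?, converted?)
for degraded, converted in [(False, True), (False, True), (False, False), (False, False),
                            (True, True), (True, False), (True, False), (True, False)]:
    metrics.record(degraded, converted)

normal_rate = metrics.conversion_rate('normal')
degraded_rate = metrics.conversion_rate('degraded')
print(normal_rate, degraded_rate)
```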
How long should I queue writes during read-only mode?
Until the database recovers. Monitor latency and exit read-only mode when P99 latency returns to its normal range. Queue time can stretch to hours when the database is seriously struggling, so bound the queue: once its size exceeds a threshold, shed load by rejecting new writes.
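The size threshold could look like the sketch below, which uses an in-memory queue to keep the example self-contained. `BoundedWriteQueue` and `QueueFullError` are hypothetical names. With a Redis-backed queue, the same check would be made against the queue's length before pushing.

```python
from collections import deque
from typing import Deque, Dict, Tuple


class QueueFullError(Exception):
    """Raised when the write queue is at capacity and the write is shed."""


class BoundedWriteQueue:
    """Hypothetical in-memory write queue with a hard size limit."""

    def __init__(self, max_size: int = 10_000):
        self.max_size = max_size
        self._items: Deque[Tuple[str, Dict[str, object]]] = deque()

    def enqueue(self, query: str, params: Dict[str, object]) -> int:
        """Queue a write and return the queue depth, or raise QueueFullError."""
        if len(self._items) >= self.max_size:
            raise QueueFullError(f'write queue full ({self.max_size} items)')
        self._items.append((query, params))
        return len(self._items)

    def __len__(self) -> int:
        return len(self._items)


sql = "UPDATE articles SET content = %(content)s WHERE id = %(id)s"
write_queue = BoundedWriteQueue(max_size=2)
write_queue.enqueue(sql, {'content': 'a', 'id': 1})
write_queue.enqueue(sql, {'content': 'b', 'id': 2})
try:
    write_queue.enqueue(sql, {'content': 'c', 'id': 3})
    shed = False
except QueueFullError:
    shed = True  # the API layer would answer with an error such as 503 here
print(len(write_queue), shed)
```

Rejecting the write outright is less friendly than queueing it, but it is honest: the user learns immediately that the change was not saved, instead of waiting on a queue that may never drain.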
Further Reading
- Release It! - Michael T. Nygard — Comprehensive guide to building resilient systems.
- AWS Well-Architected Framework - Reliability Pillar — Graceful degradation patterns.
- Resilience4j Patterns — Reference implementation (Java, but patterns apply universally).
- Netflix Hystrix (archived) — Lessons from Netflix's resilience library.