Graceful Degradation: Services That Fail Elegantly

Graceful degradation is the art of keeping your service operational and useful even when parts of it fail. Instead of returning a 500 error when a recommendation engine is down, show users cached popular items. When the payment service is unreachable, let them complete their checkout and process the payment in a background job once the service recovers. When a database replica fails, use the primary with slightly increased latency instead of crashing. This article covers fallback strategies, feature flags, read-only modes, and other patterns that ensure your API remains functional and valuable even under partial failure.

I worked on an e-commerce platform where a search reranking service regularly became unresponsive due to ML model load. The naive fix was to return 503 when it failed. The graceful fix: if reranking times out, return unranked (but still relevant) results from the base search engine. Conversion rate dropped only 2% during outages instead of the 30% loss we saw with 503 errors. Users didn't even notice the service was degraded.
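
The reranking fallback described above can be sketched roughly as follows. This is a minimal illustration, not the platform's actual code: `rerank`, `search_with_fallback`, and the timeout values are hypothetical names and numbers chosen for the example, and the slow reranker is simulated with a sleep.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

# Shared pool so a slow reranker call does not block the request thread
_executor = ThreadPoolExecutor(max_workers=4)

def rerank(results):
    """Hypothetical stand-in for a slow ML reranking call."""
    time.sleep(0.5)
    return sorted(results)

def search_with_fallback(results, rerank_fn=rerank, timeout=0.1):
    """Return (results, degraded). Falls back to the base order on timeout or error."""
    future = _executor.submit(rerank_fn, results)
    try:
        return future.result(timeout=timeout), False
    except FutureTimeout:
        # Best effort only: a call that is already running cannot be interrupted
        future.cancel()
        return results, True
    except Exception:
        # Reranker raised; the base results are still relevant
        return results, True
```

Returning a `degraded` flag alongside the results makes it easy to log or count how often the fallback path is taken.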

Graceful Degradation Strategies

Degradation takes many forms:

Fallback to Cache: Return yesterday's data instead of querying a failing service.

Feature Reduction: Disable advanced features (recommendations, sorting) but keep basic functionality (search, list).

Read-Only Mode: Accept queries but reject writes to protect a struggling database.

Load Shedding: Serve only as many expensive requests as the system can handle, and reject or queue the rest.

Async Processing: Accept a request, process later, return a job ID instead of waiting for a failing service.
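
Of the strategies listed above, load shedding is perhaps the easiest to sketch in isolation. The example below is one possible approach, assuming a threaded server: a semaphore caps how many expensive operations run at once, and anything beyond that cap is shed immediately rather than left waiting. The class name `LoadShedder` and the `retry_after_seconds` hint are illustrative, not part of any library.

```python
import threading

class LoadShedder:
    """Admit at most max_concurrent expensive calls; shed the rest immediately."""

    def __init__(self, max_concurrent: int):
        self._slots = threading.BoundedSemaphore(max_concurrent)

    def try_run(self, fn, *args, **kwargs):
        # Non-blocking acquire: if no slot is free, shed instead of waiting
        if not self._slots.acquire(blocking=False):
            return {'status': 'shed', 'retry_after_seconds': 5}
        try:
            return {'status': 'ok', 'result': fn(*args, **kwargs)}
        finally:
            # Always free the slot, even if fn raised
            self._slots.release()
```

An endpoint would typically translate the `shed` status into a 429 or 503 response with a `Retry-After` header.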

Fallback to Cached Data

When a dependency fails, fall back to a cached version:

import redis
import json
import logging

logger = logging.getLogger(__name__)

class APIWithFallback:
    def __init__(self, redis_client, timeout: float = 5):
        self.redis = redis_client
        self.timeout = timeout

    def get_user_preferences(self, user_id: int) -> dict:
        """Fetch user preferences with cached fallback."""
        cache_key = f'user_prefs:{user_id}'

        try:
            # Try to fetch fresh data
            prefs = fetch_from_preferences_service(user_id, timeout=self.timeout)

            # Cache it for fallback
            self.redis.setex(cache_key, 3600, json.dumps(prefs))
            return prefs

        except Exception as e:
            # Preferences service failed
            logger.warning(f"Preferences service failed: {e}")

            # Return cached version
            cached = self.redis.get(cache_key)
            if cached:
                logger.info(f"Returning cached preferences for user {user_id}")
                return json.loads(cached)

            # No cache; return defaults (graceful degradation)
            logger.info(f"No cache for user {user_id}; using defaults")
            return {
                'theme': 'light',
                'language': 'en',
                'notifications': True
            }

# Usage
prefs_api = APIWithFallback(redis_client)

@app.get('/users/{user_id}/preferences')
def get_preferences(user_id: int):
    prefs = prefs_api.get_user_preferences(user_id)
    return prefs

When the preferences service fails, users get cached preferences or defaults. No 500 error, no frozen UI.

Feature Flags for Degradation

Feature flags let you disable expensive features when the system is stressed:

import logging
import time
from enum import Enum

logger = logging.getLogger(__name__)

class FeatureFlag(Enum):
    RECOMMENDATIONS = "recommendations"
    ADVANCED_SEARCH = "advanced_search"
    PERSONALIZATION = "personalization"
    REVIEWS = "reviews"  # used by the product-page example later in this article

class FeatureFlagManager:
    """Simple Redis-backed feature flag system."""

    def __init__(self, redis_client):
        self.redis = redis_client

    def is_enabled(self, flag: FeatureFlag) -> bool:
        """Check if a feature is enabled."""
        key = f'feature_flag:{flag.value}'
        cached = self.redis.get(key)

        if cached is not None:
            # redis-py returns bytes unless the client uses decode_responses=True
            value = cached.decode() if isinstance(cached, bytes) else cached
            return value == 'true'

        # Default: enabled
        return True

    def disable(self, flag: FeatureFlag, duration_seconds: int = 3600):
        """Temporarily disable a feature; it re-enables when the key expires."""
        key = f'feature_flag:{flag.value}'
        self.redis.setex(key, duration_seconds, 'false')
        logger.warning(f"Feature {flag.value} disabled for {duration_seconds}s")

flags = FeatureFlagManager(redis_client)

@app.get('/search')
def search(query: str, include_recommendations: bool = True):
    """Search with optional recommendations."""
    results = basic_search(query)

    # Only add recommendations if feature is enabled and service is healthy
    if include_recommendations and flags.is_enabled(FeatureFlag.RECOMMENDATIONS):
        try:
            recommendations = get_recommendations(query, timeout=2)
            results['recommendations'] = recommendations
        except Exception as e:
            logger.warning(f"Recommendations failed: {e}")
            # Gracefully skip recommendations; search still works.
            # Disable the flag briefly so later requests don't wait on the timeout.
            flags.disable(FeatureFlag.RECOMMENDATIONS, duration_seconds=60)

    return results

When the recommendation engine times out, disable the flag so that subsequent requests skip recommendations instead of waiting on the same timeout. Users still get search results.

Read-Only Mode for Database Failures

When writes are slow or failing, switch to read-only mode:

import logging
import threading
import time

from fastapi.responses import JSONResponse

logger = logging.getLogger(__name__)

class DatabaseWithReadOnlyMode:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.read_only_key = 'db:read_only'

    def is_read_only(self) -> bool:
        """Check if database is in read-only mode."""
        # EXISTS returns a count of matching keys, so coerce it to bool
        return bool(self.redis.exists(self.read_only_key))

    def set_read_only(self, duration_seconds: int = 300):
        """Enable read-only mode."""
        self.redis.setex(self.read_only_key, duration_seconds, '1')
        logger.warning(f"Database in read-only mode for {duration_seconds}s")

    def clear_read_only(self):
        """Leave read-only mode."""
        self.redis.delete(self.read_only_key)

    def execute_read(self, query: str, params: dict = None):
        """Execute a read; reads are allowed in either mode."""
        # execute_query is the application's own database helper (not shown)
        return execute_query(query, params or {})

    def execute_write(self, query: str, params: dict):
        """Execute a write, or queue it if read-only."""
        if self.is_read_only():
            # Queue for later (queue_write is the application's own helper, not shown)
            job_id = queue_write(query, params)
            return {
                'status': 'queued',
                'job_id': job_id,
                'message': 'Database temporarily read-only. Your changes will be saved shortly.'
            }

        # Normal write
        return execute_query(query, params)

db = DatabaseWithReadOnlyMode(redis_client)

@app.post('/articles/{article_id}')
def update_article(article_id: int, data: dict):
    """Update article; queue if database is struggling."""
    result = db.execute_write(
        "UPDATE articles SET content = %(content)s WHERE id = %(id)s",
        {'content': data['content'], 'id': article_id}
    )

    if result.get('status') == 'queued':
        # FastAPI does not treat a (body, status) tuple as a status code,
        # so set 202 Accepted explicitly
        return JSONResponse(status_code=202, content=result)
    return result

# Monitor database latency; enable read-only if too slow
def monitor_database_health():
    while True:
        start = time.monotonic()
        try:
            db.execute_read("SELECT 1")
            latency_ms = (time.monotonic() - start) * 1000

            if latency_ms > 1000:  # Queries taking >1s
                db.set_read_only(duration_seconds=300)
            else:
                db.clear_read_only()
        except Exception:
            db.set_read_only(duration_seconds=60)

        time.sleep(5)

# Run monitor in background thread
threading.Thread(target=monitor_database_health, daemon=True).start()

If database latency exceeds a threshold, new writes are queued for later processing. Users get a clear message, and the database has time to recover without being overwhelmed.

Async Processing: Accept, Process Later

For operations that are too slow or too unreliable to complete within a single request, accept the request and process it asynchronously:

import json
import uuid
from datetime import datetime, timezone

import celery
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse

app = FastAPI()
celery_app = celery.Celery('tasks')

def _text(value):
    """Decode bytes from Redis; a no-op when the client uses decode_responses=True."""
    return value.decode() if isinstance(value, bytes) else value

class JobQueue:
    """Queue jobs for later processing."""

    def __init__(self, redis_client):
        self.redis = redis_client

    def enqueue_job(self, job_type: str, data: dict) -> str:
        """Queue a job; return job ID."""
        job_id = str(uuid.uuid4())
        self.redis.hset(f'job:{job_id}', mapping={
            'type': job_type,
            'data': json.dumps(data),
            'status': 'pending',
            'created_at': datetime.now(timezone.utc).isoformat()
        })
        return job_id

    def get_job_status(self, job_id: str):
        """Get job status, or None if the job does not exist."""
        job = self.redis.hgetall(f'job:{job_id}')
        if not job:
            return None
        return {_text(k): _text(v) for k, v in job.items()}

job_queue = JobQueue(redis_client)

@app.post('/reports/generate')
def request_report(report_type: str):
    """
    Generate a report asynchronously.
    Instead of waiting 30 seconds for the report engine to finish,
    return a job ID immediately.
    """
    job_id = job_queue.enqueue_job('report_generation', {
        'type': report_type,
        'user_id': current_user.id  # current_user comes from the app's auth layer (not shown)
    })

    # Trigger background processing
    process_report.delay(job_id)

    return JSONResponse(status_code=202, content={  # Accepted
        'status': 'processing',
        'job_id': job_id,
        'check_url': f'/jobs/{job_id}'
    })

@app.get('/jobs/{job_id}')
def get_job(job_id: str):
    """Check status of a job."""
    job = job_queue.get_job_status(job_id)

    if not job:
        raise HTTPException(status_code=404, detail='Job not found')

    return job

@celery_app.task
def process_report(job_id: str):
    """Process report asynchronously."""
    job_queue.redis.hset(f'job:{job_id}', 'status', 'running')

    try:
        job = job_queue.get_job_status(job_id)
        params = json.loads(job['data'])  # 'data' was stored as a JSON string
        # build_report stands in for the application's report engine (not shown);
        # it must not share a name with the endpoint function above
        report = build_report(params['type'])
        job_queue.redis.hset(f'job:{job_id}', mapping={
            'status': 'completed',
            'result': json.dumps(report)
        })
    except Exception as e:
        job_queue.redis.hset(f'job:{job_id}', mapping={
            'status': 'failed',
            'error': str(e)
        })

Users get an immediate response with a job ID. They can check progress at /jobs/{job_id} while the report processes in the background. No timeouts, no failed requests.

Comprehensive Degradation Example

Here's a fuller example combining several of these strategies (cached fallback, feature flags, and queued writes):

import json
import logging

from fastapi import FastAPI, HTTPException

app = FastAPI()
logger = logging.getLogger(__name__)

class ResilientServiceOrchestrator:
    """Orchestrate multiple services with graceful degradation."""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.flags = FeatureFlagManager(redis_client)
        self.db = DatabaseWithReadOnlyMode(redis_client)

    def get_product_page(self, product_id: int):
        """Fetch product page with multiple fallback strategies."""

        # 1. Get basic product data (critical path)
        try:
            product = fetch_from_catalog_service(product_id, timeout=2)
        except Exception as e:
            logger.warning(f"Catalog service failed: {e}")
            # Fallback to cache
            cached = self.redis.get(f'product:{product_id}')
            if cached:
                product = json.loads(cached)
            else:
                # Nothing useful to show: this is the case where failing is correct
                raise HTTPException(status_code=503, detail="Product unavailable")

        # 2. Add recommendations (optional, feature-flagged)
        product['recommendations'] = []
        if self.flags.is_enabled(FeatureFlag.RECOMMENDATIONS):
            try:
                recs = get_recommendations(product_id, timeout=1)
                product['recommendations'] = recs
            except Exception as e:
                logger.warning(f"Recommendations failed: {e}")
                # No recommendations; that's ok

        # 3. Add reviews (optional)
        # Requires a REVIEWS member on the FeatureFlag enum
        product['reviews'] = []
        if self.flags.is_enabled(FeatureFlag.REVIEWS):
            try:
                reviews = get_reviews(product_id, timeout=1)
                product['reviews'] = reviews
            except Exception as e:
                logger.warning(f"Reviews failed: {e}")
                # No reviews; still useful

        # 4. Cache for next request
        self.redis.setex(f'product:{product_id}', 3600, json.dumps(product))

        return product

@app.get('/products/{product_id}')
def get_product(product_id: int):
    orchestrator = ResilientServiceOrchestrator(redis_client)
    return orchestrator.get_product_page(product_id)

@app.post('/products/{product_id}')
def update_product(product_id: int, data: dict):
    orchestrator = ResilientServiceOrchestrator(redis_client)

    # Write with potential queueing if database is slow
    result = orchestrator.db.execute_write(
        "UPDATE products SET name = %(name)s WHERE id = %(id)s",
        {'name': data['name'], 'id': product_id}
    )

    if result.get('status') == 'queued':
        # Cache invalidation queued too
        # (invalidate_product_cache is a background task defined elsewhere)
        invalidate_product_cache.delay(product_id)
    else:
        # Immediate cache invalidation
        redis_client.delete(f'product:{product_id}')

    return result

Key Takeaways

  • Graceful degradation keeps users productive even during partial outages. A read-only version of your site is better than a 503 error.
  • Use feature flags to disable expensive features under load. Users get reduced functionality, not broken functionality.
  • Fall back to cached data when services fail. Always cache successful responses for exactly this scenario.
  • Queue writes when the database is slow instead of rejecting them. Process them asynchronously once the database recovers.
  • Monitor dependency health and switch modes (read-only, feature flags) automatically before users hit errors.

Frequently Asked Questions

How do I know when to degrade vs. fail?

Degrade when partial functionality is still valuable: search without recommendations, a product page without reviews. Fail when you can't provide even basic functionality, such as a product page with no product data. In between, when you can't compute a fresh answer but cached or default data is still useful, return that instead of an error.

Should degradation be automatic or manual?

Automatic when possible. Monitor dependency latency and health, then enable read-only mode or disable features programmatically. Keep a manual override for planned degradation: maintenance windows, or deliberate feature reduction during Black Friday overload.

What is the difference between graceful degradation and circuit breaking?

A circuit breaker stops calling a failing service so that failures don't cascade. Graceful degradation provides reduced functionality instead of an error. Both are essential: the circuit breaker prevents damage, and graceful degradation maintains user value.
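
To make the relationship concrete, here is a minimal sketch of the two working together: the breaker decides whether to call the dependency at all, and the fallback supplies the degraded response either way. `CircuitBreaker`, its parameters, and the injectable `clock` are illustrative choices for this example, and the sketch is single-threaded; a production breaker would also need locking.

```python
import time

class CircuitBreaker:
    """Skip a failing dependency for a while and serve a fallback instead."""

    def __init__(self, failure_threshold=3, reset_after=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the circuit is closed

    def _is_open(self):
        if self._opened_at is None:
            return False
        # After the cool-down, let a trial call through (half-open state)
        return self._clock() - self._opened_at < self.reset_after

    def call(self, fn, fallback):
        if self._is_open():
            return fallback()  # degrade without touching the dependency
        try:
            result = fn()
        except Exception:
            self._failures += 1
            if self._failures >= self.failure_threshold:
                self._opened_at = self._clock()
            return fallback()  # degrade on this failure too
        # Success closes the circuit and resets the failure count
        self._failures = 0
        self._opened_at = None
        return result
```

A call such as `breaker.call(lambda: get_recommendations(product_id, timeout=1), lambda: [])` would return an empty recommendation list both while the circuit is open and when an individual call fails.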

Can I measure the impact of degradation?

Yes. Track metrics such as conversion rate on product pages served with recommendations versus without, and user engagement while features are read-only. Most users don't notice modest degradation (<5% difference).
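
One way to collect such numbers is to tag every response as full or degraded and compare conversion rates between the two groups. The sketch below keeps counts in memory purely for illustration; `DegradationStats` and its method names are hypothetical, and a real system would more likely emit these counts to its existing metrics pipeline.

```python
from collections import Counter

class DegradationStats:
    """Count views and conversions separately for full and degraded responses."""

    def __init__(self):
        self._counts = Counter()

    def record(self, feature: str, degraded: bool, converted: bool):
        mode = 'degraded' if degraded else 'full'
        self._counts[(feature, mode, 'views')] += 1
        if converted:
            self._counts[(feature, mode, 'conversions')] += 1

    def conversion_rate(self, feature: str, degraded: bool):
        """Return conversions / views for the given mode, or None with no data."""
        mode = 'degraded' if degraded else 'full'
        views = self._counts[(feature, mode, 'views')]
        if views == 0:
            return None
        return self._counts[(feature, mode, 'conversions')] / views
```

Comparing `conversion_rate('product_page', degraded=True)` against the `degraded=False` figure gives a direct estimate of what the degraded mode costs.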

How long should I queue writes during read-only mode?

Until the database recovers. Monitor latency and exit read-only mode when P99 latency returns to its normal range. Queue time can stretch to hours if the database is seriously struggling. If the queue backs up, implement load shedding: reject new writes once the queue size exceeds a threshold.
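
The threshold rule can be sketched with a bounded in-memory queue. This is an assumption-laden illustration: `BoundedWriteQueue` is a hypothetical name, the suggested HTTP statuses are one reasonable choice, and a real deployment would use a durable queue so that pending writes survive a restart.

```python
import queue

class BoundedWriteQueue:
    """Hold writes during read-only mode; reject new ones once the queue is full."""

    def __init__(self, max_pending: int):
        self._pending = queue.Queue(maxsize=max_pending)

    def submit(self, query: str, params: dict) -> dict:
        try:
            self._pending.put_nowait((query, params))
        except queue.Full:
            # Shed load: tell the client to retry later instead of queueing forever
            return {'status': 'rejected', 'http_status': 503}
        return {'status': 'queued', 'http_status': 202}

    def drain(self, execute) -> int:
        """Replay queued writes in FIFO order; returns how many were replayed."""
        replayed = 0
        while True:
            try:
                query, params = self._pending.get_nowait()
            except queue.Empty:
                return replayed
            # Note: if execute raises, this write has already left the queue
            execute(query, params)
            replayed += 1
```

Calling `drain` with the real write function once the health monitor leaves read-only mode replays the backlog in the order it arrived.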

Further Reading