Celery Task Retries and Error Handling

Distributed tasks fail for many reasons: network timeouts, database unavailability, external API errors, and resource exhaustion. Celery provides built-in retry mechanisms (the explicit self.retry() call and the declarative autoretry_for task option) that re-execute failed tasks, optionally with exponential backoff. Combining retries with proper error classification, monitoring, and fallback logic creates a resilient task system that recovers automatically from transient failures.

Understanding Task Failures

When a task raises an exception, Celery captures it. The task is marked as failed and, if a result backend is configured, the exception is stored there. Without a retry strategy, the work is never re-attempted. Retries allow Celery to re-execute the task after a delay, increasing the odds of success.

Failures are either transient (temporary network glitch, database temporarily unavailable) or permanent (invalid input, authorization failure). Retries help with transient failures but waste resources on permanent ones. Good error handling distinguishes between the two.
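As a minimal sketch of that distinction, the helper below sorts exceptions into the two groups. The name is_transient and the exception groupings are illustrative assumptions, not something Celery defines; a real project would list its own exception types.

```python
# Hypothetical classifier: these groupings are illustrative assumptions,
# not rules defined by Celery.
TRANSIENT_ERRORS = (TimeoutError, ConnectionError)
PERMANENT_ERRORS = (ValueError, PermissionError, KeyError)


def is_transient(exc: BaseException) -> bool:
    """Return True when retrying has a realistic chance of succeeding."""
    if isinstance(exc, PERMANENT_ERRORS):
        return False
    # Unknown exception types fall through to False, so they fail fast.
    return isinstance(exc, TRANSIENT_ERRORS)


print(is_transient(TimeoutError("upstream timed out")))
print(is_transient(ValueError("malformed order id")))
```

Treating unknown exceptions as permanent is a deliberate choice in this sketch: an unexpected error is more likely a bug than a network blip, and retrying a bug only delays the alert.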

Built-In Retry Mechanism

The simplest approach uses Celery's retry() method within a bound task:

import logging

import requests

logger = logging.getLogger(__name__)

# `app` is the project's Celery application; `get_user` is an application helper.
@app.task(bind=True, max_retries=3)
def call_external_api(self, user_id):
    """Call an external API with automatic retries."""
    try:
        user = get_user(user_id)
        response = requests.get(f'https://api.example.com/users/{user.external_id}', timeout=5)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.Timeout as exc:
        # Transient error: retry with exponential backoff
        logger.warning(f'API timeout for user {user_id}, retrying (attempt {self.request.retries + 1})')
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
    except requests.exceptions.HTTPError as exc:
        # Classify the error by status code
        if 500 <= exc.response.status_code < 600:
            # Server error: likely transient, retry
            logger.warning(f'API returned {exc.response.status_code}, retrying')
            raise self.retry(exc=exc, countdown=60)
        else:
            # Client error: likely permanent, don't retry
            logger.error(f'Permanent API error: {exc.response.status_code}')
            raise
    except Exception as exc:
        # Anything else is unexpected: log it and let the task fail
        logger.error(f'Unexpected error: {exc}')
        raise

In this example:

  • Timeout exceptions trigger a retry with exponential backoff: 2 ** retries seconds, which gives delays of 1, 2, and 4 seconds across the three allowed retries.
  • HTTP 5xx errors are assumed transient and retried after 60 seconds.
  • HTTP 4xx errors are assumed permanent and raised (task fails, no retry).
  • Other exceptions are logged and raised.
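The decision rules in the list above can be pulled into a pure function, which makes them easy to read and to test without a worker or a network. This is a sketch only: retry_countdown and its parameters are hypothetical names introduced here, and the delays simply mirror the example.

```python
from typing import Optional


def retry_countdown(retries: int, status_code: Optional[int] = None,
                    timed_out: bool = False) -> Optional[int]:
    """Return the delay in seconds before the next retry, or None to fail."""
    if timed_out:
        return 2 ** retries      # exponential backoff: 1, 2, 4, ...
    if status_code is not None and 500 <= status_code < 600:
        return 60                # server error: fixed one-minute delay
    return None                  # 4xx and anything else: do not retry


print(retry_countdown(2, timed_out=True))
print(retry_countdown(0, status_code=503))
print(retry_countdown(0, status_code=404))
```

Inside a task, a None result would translate to re-raising the exception, and any other value to raise self.retry(exc=exc, countdown=value).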

Exponential Backoff

Exponential backoff spaces retries over increasing intervals, reducing load on the target service during an outage and giving it time to recover:

@app.task(bind=True, max_retries=5)
def fetch_resource(self, resource_id):
    """Fetch a resource with exponential backoff."""
    try:
        # Set a timeout so a hung connection raises instead of blocking the worker
        response = requests.get(f'https://api.example.com/{resource_id}', timeout=5)
        response.raise_for_status()  # turn HTTP error statuses into exceptions
        return response.json()
    except requests.RequestException as exc:
        # Exponential backoff: 1s, 2s, 4s, 8s, 16s
        countdown = 2 ** self.request.retries
        logger.warning(f'Fetch failed, retrying in {countdown}s (attempt {self.request.retries + 1}/{self.max_retries})')
        raise self.retry(exc=exc, countdown=countdown)

With max_retries=5, the retry delays are 1, 2, 4, 8, and 16 seconds, roughly 31 seconds of waiting in total (plus execution time) before the task fails permanently. Add jitter to avoid a thundering herd, where many tasks retry at the same moment:

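import random

@app.task(bind=True, max_retries=5)
def fetch_with_jitter(self, resource_id):
    """Fetch a resource with exponential backoff plus random jitter."""
    try:
        response = requests.get(f'https://api.example.com/{resource_id}', timeout=5)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as exc:
        backoff = 2 ** self.request.retries
        # Jitter scales with the backoff so later retries spread out more
        countdown = backoff + random.randint(0, backoff)
        raise self.retry(exc=exc, countdown=countdown)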
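Unbounded exponential growth becomes impractical after a handful of retries, so a cap is a common addition. The sketch below combines a cap with a randomized delay drawn from the whole interval (sometimes called full jitter). The function name backoff_delay, the base of 1 second, and the 60-second cap are assumptions chosen for illustration.

```python
import random
from typing import Optional


def backoff_delay(retries: int, base: float = 1.0, cap: float = 60.0,
                  rng: Optional[random.Random] = None) -> float:
    """Capped exponential backoff with a delay drawn uniformly from [0, ceiling]."""
    rng = rng or random.Random()
    ceiling = min(cap, base * (2 ** retries))
    return rng.uniform(0, ceiling)


# Upper bounds of the first five delays, before jitter is applied.
schedule = [min(60.0, 1.0 * 2 ** n) for n in range(5)]
print(schedule, sum(schedule))

# A seeded generator makes the jittered values reproducible in tests.
seeded = random.Random(0)
print([round(backoff_delay(n, rng=seeded), 2) for n in range(5)])
```

For tasks that use autoretry_for, Celery's retry_backoff, retry_backoff_max, and retry_jitter task options provide comparable behaviour declaratively.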

Handling Different Error Types

Classify errors at the source and handle each category appropriately:

from celery.exceptions import SoftTimeLimitExceeded

# Order, charge_credit_card, cleanup_partial_payment, PaymentGatewayTimeout,
# and PaymentDeclined are application-defined.
@app.task(bind=True, max_retries=3, soft_time_limit=30)
def process_order(self, order_id):
    """Process an order with differentiated error handling."""
    try:
        order = Order.get(order_id)
        if not order.is_valid():
            # Validation error: permanent, don't retry
            raise ValueError(f'Order {order_id} is invalid')

        # Attempt payment
        charge_credit_card(order.payment_info)
        return {'status': 'success', 'order_id': order_id}

    except ValueError as e:
        # Input validation: permanent failure
        logger.error(f'Validation error for order {order_id}: {e}')
        raise

    except PaymentGatewayTimeout as e:
        # Payment gateway timeout: likely transient
        logger.warning(f'Payment timeout for order {order_id}, retrying')
        raise self.retry(exc=e, countdown=60)

    except PaymentDeclined as e:
        # Payment declined: usually permanent for this card, but worth one
        # delayed retry in case the customer has updated their payment details
        logger.warning(f'Payment declined for order {order_id}')
        raise self.retry(exc=e, countdown=3600, max_retries=1)  # Retry once, after 1 hour

    except SoftTimeLimitExceeded:
        # Task took too long: clean up and retry
        logger.warning(f'Task timeout for order {order_id}, retrying')
        cleanup_partial_payment(order_id)
        raise self.retry(countdown=120)

    except Exception:
        # Unexpected error: log with traceback and escalate
        logger.exception(f'Unexpected error processing order {order_id}')
        raise

Circuit Breaker Pattern

A circuit breaker prevents repeated requests to a failing service by temporarily blocking calls:

import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def is_circuit_open(service_name, threshold=5):
    """Check if a service's circuit is open."""
    key = f'circuit_breaker:{service_name}'
    failure_count = int(redis_client.get(key) or 0)
    if failure_count >= threshold:
        logger.warning(f'Circuit breaker open for {service_name}')
        return True
    return False

def record_failure(service_name, timeout=60):
    """Record a service failure and increment the counter."""
    key = f'circuit_breaker:{service_name}'
    redis_client.incr(key)
    # Each failure refreshes the TTL, so the counter expires
    # `timeout` seconds after the most recent failure.
    redis_client.expire(key, timeout)

def reset_circuit(service_name):
    """Reset the circuit breaker for a service."""
    key = f'circuit_breaker:{service_name}'
    redis_client.delete(key)

@app.task(bind=True, max_retries=2)
def call_flaky_api(self, data):
    """Call an external API with circuit breaker protection."""
    service_name = 'external_api'

    if is_circuit_open(service_name):
        logger.error(f'Circuit breaker open for {service_name}, failing fast')
        raise RuntimeError(f'{service_name} is unavailable')

    try:
        response = requests.get('https://api.example.com/endpoint', json=data, timeout=5)
        response.raise_for_status()
        reset_circuit(service_name)
        return response.json()
    except requests.RequestException as exc:
        # Only request failures count against the service
        record_failure(service_name)
        logger.warning('API call failed, circuit breaker updated')
        raise self.retry(exc=exc, countdown=30)

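Once 5 failures accumulate with no 60-second gap between consecutive failures (each failure refreshes the counter's expiry), the circuit opens and subsequent calls fail immediately without attempting a request, saving resources and reducing cascading failures. A successful call before the threshold is reached resets the counter, and an open circuit closes again when the counter expires, 60 seconds after the last recorded failure.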
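To see the open and close transitions without a Redis server, the same counting logic can be modelled in memory with an injectable clock. This is a simplified sketch: the class InMemoryCircuitBreaker is hypothetical, it is not shared between worker processes the way a Redis key is, and it is meant for reasoning about and testing the behaviour rather than for production use.

```python
import time


class InMemoryCircuitBreaker:
    """Counts failures; the count expires `timeout` seconds after the last one."""

    def __init__(self, threshold=5, timeout=60, clock=time.monotonic):
        self.threshold = threshold
        self.timeout = timeout
        self.clock = clock
        self.failures = 0
        self.expires_at = 0.0

    def _expire_if_due(self):
        # Mimics a key with a TTL: once expired, the counter is gone.
        if self.failures and self.clock() >= self.expires_at:
            self.failures = 0

    def is_open(self):
        self._expire_if_due()
        return self.failures >= self.threshold

    def record_failure(self):
        self._expire_if_due()
        self.failures += 1
        self.expires_at = self.clock() + self.timeout  # increment, then refresh TTL

    def reset(self):
        self.failures = 0


# Drive the breaker with a fake clock instead of sleeping.
now = [0.0]
breaker = InMemoryCircuitBreaker(threshold=5, timeout=60, clock=lambda: now[0])
for _ in range(5):
    breaker.record_failure()
    now[0] += 1                        # one second between failures
opened = breaker.is_open()             # threshold reached, counter still live
now[0] += 60                           # a quiet minute passes
closed_again = not breaker.is_open()   # counter expired, circuit closed
print(opened, closed_again)
```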

Dead Letter Queue (DLQ)

When a task exhausts all retries, it permanently fails. Capture these for later investigation:

@app.task(bind=True, max_retries=3)
def unreliable_operation(self, data):
    try:
        return perform_operation(data)
    except Exception as exc:
        if self.request.retries < self.max_retries:
            raise self.retry(exc=exc, countdown=60)
        else:
            # Final failure: route to DLQ
            logger.error(f'Task {self.request.id} permanently failed, sending to DLQ')
            dlq_task.delay(task_id=self.request.id, data=data, error=str(exc))
            raise

@app.task
def dlq_task(task_id, data, error):
    """Log failed task for manual inspection."""
    # FailedTask is an application-defined model (Django ORM style)
    FailedTask.objects.create(
        celery_task_id=task_id,
        data=data,
        error=error,
    )
    logger.info(f'DLQ: Logged failed task {task_id}')
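The retry counting in this pattern is easy to get off by one, so it can help to trace it in plain Python. The sketch below is a stand-in for the Celery flow, not Celery code: run_with_dead_letter is a hypothetical helper, the loop replaces self.retry(), and a list replaces the DLQ task. It shows that max_retries=3 means four attempts in total before the failure is recorded.

```python
def run_with_dead_letter(func, payload, max_retries, dead_letters):
    """Call func(payload); after the last allowed retry, record the failure."""
    retries = 0
    while True:
        try:
            return func(payload)
        except Exception as exc:
            if retries < max_retries:
                retries += 1          # stand-in for raise self.retry(...)
                continue
            # Retries exhausted: capture enough context to investigate later.
            dead_letters.append(
                {"payload": payload, "error": str(exc), "attempts": retries + 1}
            )
            raise


calls = []
dead_letters = []


def always_fails(payload):
    calls.append(payload)
    raise ConnectionError("service unreachable")


try:
    run_with_dead_letter(always_fails, {"id": 7}, max_retries=3,
                         dead_letters=dead_letters)
except ConnectionError:
    pass

print(len(calls), dead_letters)
```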

Key Takeaways

  • Use self.retry() in bound tasks to automatically re-execute on failure
  • Classify errors: transient (retry) vs. permanent (fail immediately)
  • Implement exponential backoff to space retries and reduce load: countdown = 2 ** retries
  • Add jitter to prevent thundering herd when many tasks retry simultaneously
  • Use circuit breakers to fail fast when a service is down
  • Capture permanently failed tasks in a dead letter queue for investigation
  • Combine soft and hard time limits to handle timeouts gracefully

Frequently Asked Questions

What's the difference between soft and hard time limits?

A soft time limit (e.g., 30 seconds) raises SoftTimeLimitExceeded inside the task, allowing cleanup. A hard time limit (e.g., 35 seconds) kills the worker process executing the task, which is then replaced, so no cleanup code runs. Always set hard > soft, and catch SoftTimeLimitExceeded to abort gracefully.
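As a configuration sketch, both limits can be set globally in the Celery configuration module. The values below are the illustrative ones from the answer above, and the file name celeryconfig.py is an assumption about project layout; per-task overrides use the soft_time_limit and time_limit task options.

```python
# celeryconfig.py (illustrative values)
task_soft_time_limit = 30  # seconds; SoftTimeLimitExceeded is raised inside the task
task_time_limit = 35       # seconds; the process running the task is killed

# Leave a gap so the cleanup code has time to run before the hard limit.
cleanup_window = task_time_limit - task_soft_time_limit
```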

Can I retry a task manually from the result backend?

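Not directly. self.retry() works only inside a running task, because it relies on the current request context. To re-run a failed task from elsewhere, send it again with its original arguments, for example with my_task.apply_async(args=(...)) or celery_app.send_task('task_name', args=(...)). The result backend does not store task arguments by default (enabling the result_extended setting adds them), so keep the arguments somewhere you can retrieve them, such as the DLQ table above. In production, prefer Celery's built-in retry mechanism over manual requeueing.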

How long does a task stay in the result backend?

By default, results expire after 1 day. Configure with result_expires in Celery config. For long-term tracking of failed tasks, use a database (like the DLQ pattern above).
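A minimal configuration sketch, assuming a celeryconfig.py module and a seven-day retention period chosen purely for illustration:

```python
from datetime import timedelta

# celeryconfig.py (illustrative value)
# result_expires accepts a number of seconds or a timedelta.
result_expires = timedelta(days=7)
```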

Should I retry on every exception?

No. Retry only transient errors (network, timeout, 5xx). Don't retry validation errors, authorization failures, or bugs—these will fail every time. Log and escalate permanent errors.

How do I test retry logic?

Mock the external service to raise exceptions. Use @patch to inject failures and verify that self.retry() is called with correct countdown values. Test circuit breaker state transitions separately.
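One way to do this with only the standard library is to write the task body against a task-like object and substitute a mock for it. The sketch below is an assumption-laden illustration rather than Celery's own testing approach: fetch is a hypothetical task body, and the local Retry class stands in for celery.exceptions.Retry, which self.retry() raises in a real worker.

```python
from unittest import mock


class Retry(Exception):
    """Stand-in for the exception that self.retry() raises."""


def fetch(task, http_get):
    """Task body written against a task-like object so it can be tested alone."""
    try:
        return http_get()
    except TimeoutError as exc:
        raise task.retry(exc=exc, countdown=2 ** task.request.retries)


# Simulate the third attempt (two retries already used) hitting a timeout.
task = mock.Mock()
task.request.retries = 2
task.retry.side_effect = Retry()
failing_get = mock.Mock(side_effect=TimeoutError("no response"))

try:
    fetch(task, failing_get)
    retried = False
except Retry:
    retried = True

# The backoff for retries == 2 should be 2 ** 2 == 4 seconds.
task.retry.assert_called_once_with(exc=mock.ANY, countdown=4)
print(retried)
```

Passing the HTTP call in as a parameter keeps the example free of patching; with a module-level requests.get call, unittest.mock.patch would serve the same purpose.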

Further Reading