Celery in Production: Configuration, Security, and Best Practices

Production Celery requires hardening beyond basic setup: securing the broker, managing secrets, configuring logging, implementing fault tolerance, and automating deployment. A production-ready system handles failures gracefully, scales with demand, and provides visibility into task execution and system health.

Configuration Management

Use environment-based configuration for secrets and deployment-specific settings. Never hardcode credentials:

import os
from kombu import Queue, Exchange

class CeleryConfig:
    """Production-safe Celery configuration."""

    # Broker and backend from environment
    broker_url = os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
    result_backend = os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1')

    # Serialization (JSON is safer than pickle)
    task_serializer = 'json'
    result_serializer = 'json'
    accept_content = ['json']

    # Timezone and time
    timezone = os.getenv('TZ', 'UTC')
    enable_utc = True

    # Task execution limits
    task_track_started = True
    task_send_sent_event = True  # Emit a task-sent event for monitors
    task_acks_late = True  # Ack after task completes, not when received
    task_reject_on_worker_lost = True  # Requeue the task if its worker process dies
    worker_max_tasks_per_child = int(os.getenv('CELERY_MAX_TASKS_PER_CHILD', 1000))

    # Timeouts (in seconds)
    task_soft_time_limit = int(os.getenv('CELERY_SOFT_TIME_LIMIT', 600))  # 10 minutes
    task_time_limit = int(os.getenv('CELERY_TIME_LIMIT', 900))  # 15 minutes

    # Queues
    task_queues = (
        Queue('default', Exchange('default'), routing_key='default'),
        Queue('critical', Exchange('critical'), routing_key='critical', priority=10),
        Queue('batch', Exchange('batch'), routing_key='batch', priority=1),
    )
    task_default_queue = 'default'

    # Routing by task name (glob patterns are allowed)
    task_routes = {
        'tasks.send_email': {'queue': 'critical', 'priority': 9},
        'tasks.process_payment': {'queue': 'critical', 'priority': 10},
        'tasks.batch_*': {'queue': 'batch', 'priority': 1},
    }

    # Result expiry
    result_expires = 3600  # Expire results after 1 hour

    # Broker connection pooling
    broker_pool_limit = 10
    broker_connection_retry = True
    broker_connection_retry_on_startup = True
    broker_connection_max_retries = 10

from celery import Celery

app = Celery('myapp')
app.config_from_object(CeleryConfig)
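
The defaults in CeleryConfig fall back to localhost when a variable is missing, which can hide a misconfigured deployment. One way to fail fast instead is to validate the environment at startup. The following is a minimal sketch; `require_env` and `env_int` are hypothetical helper names, not part of Celery:

```python
import os


def require_env(name, environ=None):
    """Return a required variable or raise a clear error at startup."""
    environ = os.environ if environ is None else environ
    value = environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


def env_int(name, default, environ=None):
    """Read an integer setting, rejecting non-numeric values early."""
    environ = os.environ if environ is None else environ
    raw = environ.get(name)
    if raw is None or raw == "":
        return default
    try:
        return int(raw)
    except ValueError:
        raise RuntimeError(f"{name} must be an integer, got {raw!r}") from None
```

In a production settings class you could then write `broker_url = require_env('CELERY_BROKER_URL')` so that a missing variable stops the worker at import time rather than pointing it at a local Redis.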

Securing the Broker

For production, secure the broker against unauthorized access:

Redis Security:

# Set a password and bind to localhost only
redis-server --requirepass mysecurepassword --bind 127.0.0.1

Use a connection string with password:

broker_url = 'redis://:mysecurepassword@localhost:6379/0'

Or use Redis Sentinel for high availability:

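# List every Sentinel node (example hosts); Celery asks them for the current master
broker_url = 'sentinel://localhost:26379;sentinel://localhost:26380;sentinel://localhost:26381'
broker_transport_options = {'master_name': 'mymaster'}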

RabbitMQ Security:

# Create a restricted user (not 'guest')
sudo rabbitmqctl add_user celery_user securepassword
sudo rabbitmqctl set_permissions -p / celery_user ".*" ".*" ".*"

Use AMQP URL with credentials:

broker_url = 'amqp://celery_user:securepassword@rabbitmq-host:5672//'

Enable TLS for encrypted broker connections:

import ssl

broker_url = 'amqps://celery_user:securepassword@rabbitmq-host:5671//'
broker_use_ssl = {
    'keyfile': '/etc/ssl/private/key.pem',
    'certfile': '/etc/ssl/certs/cert.pem',
    'ca_certs': '/etc/ssl/certs/ca.pem',
    'cert_reqs': ssl.CERT_REQUIRED,  # Reject brokers without a valid certificate
}
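
Passwords that contain characters such as `@`, `/`, or `:` must be percent-encoded before they go into a broker URL, and the full URL should not appear in logs. The sketch below shows one way to handle both with the standard library; `build_broker_url` and `mask_password` are hypothetical helper names:

```python
from urllib.parse import quote, urlsplit, urlunsplit


def build_broker_url(scheme, user, password, host, port, path=""):
    """Assemble a broker URL, percent-encoding the credentials and path."""
    userinfo = f"{quote(user, safe='')}:{quote(password, safe='')}"
    return f"{scheme}://{userinfo}@{host}:{port}/{quote(path, safe='')}"


def mask_password(url):
    """Return the URL with the password replaced, safe for log output."""
    parts = urlsplit(url)
    if parts.password is None:
        return url
    user = parts.username or ""
    host = parts.hostname or ""
    netloc = f"{user}:***@{host}"
    if parts.port is not None:
        netloc += f":{parts.port}"
    return urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))
```

Here `path` is the Redis database number or the RabbitMQ virtual host. The credentials themselves would still come from the environment or a secret store, as described under Configuration Management.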

Task Result Security

Store task results securely, especially for sensitive data:

# Compression saves space, but it is not encryption
result_backend = 'redis://localhost:6379/1'
result_compression = 'gzip'
result_expires = 3600  # Auto-expire after 1 hour

# For sensitive data, avoid keeping results in persistent storage.
# Alternative: disable the result backend entirely if nothing reads results,
# and store the data yourself in a database that encrypts at rest
result_backend = None

# If you keep a backend, restrict it to JSON and decide per deployment
# whether results are stored at all. None of these settings encrypt results.
app.conf.update(
    result_serializer='json',
    result_compression='gzip',
    task_ignore_result=False,  # Set True if you don't need results
)
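
If a task must return a result that includes sensitive fields, one option is to strip those fields before the value reaches the result backend. This is a minimal sketch, assuming results are JSON-style dicts and lists; the `redact` helper and the field names in `SENSITIVE_KEYS` are hypothetical:

```python
SENSITIVE_KEYS = {"password", "token", "card_number", "ssn"}  # hypothetical field names


def redact(value):
    """Recursively replace sensitive fields so they never reach the result backend."""
    if isinstance(value, dict):
        return {
            key: "[REDACTED]" if str(key).lower() in SENSITIVE_KEYS else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, (list, tuple)):
        return [redact(item) for item in value]
    return value
```

A task would then end with `return redact(result)` instead of returning the raw value.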

Logging and Observability

Configure structured logging for production:

import logging
import logging.config
import os

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '[{levelname}] {asctime} {name} {message}',
            'style': '{',
        },
        # Produces invalid JSON if the message contains quotes;
        # a formatter that calls json.dumps is more robust
        'json': {
            'format': '{"timestamp": "%(asctime)s", "level": "%(levelname)s", "message": "%(message)s"}',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'verbose',
        },
        'file': {
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/celery/worker.log',
            'maxBytes': 10485760,  # 10 MB
            'backupCount': 10,
            'formatter': 'verbose',
        },
    },
    'loggers': {
        'celery': {
            'handlers': ['console', 'file'],
            'level': os.getenv('LOG_LEVEL', 'INFO'),
            'propagate': False,
        },
        'tasks': {
            'handlers': ['console', 'file'],
            'level': os.getenv('LOG_LEVEL', 'INFO'),
        },
    },
}

logging.config.dictConfig(LOGGING_CONFIG)
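
A format string cannot escape quotes or newlines inside a log message, so a JSON formatter built that way can emit invalid JSON. A small formatter class that calls `json.dumps` avoids this. The sketch below uses only the standard library; the class name `JsonFormatter` is an illustrative choice:

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Format each record as one JSON object per line."""

    def format(self, record):
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            # Include the traceback as a single escaped string
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)
```

To use it from LOGGING_CONFIG, a formatter entry such as `'json': {'()': 'myapp.logging_utils.JsonFormatter'}` should work, where the module path is hypothetical and depends on where you place the class.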

Deployment with Systemd

Create a systemd service file for Celery workers:

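# /etc/systemd/system/celery-worker.service
[Unit]
Description=Celery Worker
After=network.target redis.service

[Service]
# The worker runs in the foreground here, so use Type=simple rather than forking
Type=simple
User=celery
Group=celery
WorkingDirectory=/opt/myapp
# Creates /run/celery (also reachable as /var/run/celery) owned by the service user
RuntimeDirectory=celery
Environment="CELERY_BROKER_URL=redis://localhost:6379/0"
Environment="CELERY_RESULT_BACKEND=redis://localhost:6379/1"
# %% passes a literal % to Celery; a single % would be expanded by systemd
ExecStart=/usr/local/bin/celery -A celery_app worker \
    -l info \
    --pidfile=/var/run/celery/%%n.pid \
    --logfile=/var/log/celery/%%n%%I.log

# Restart policy
Restart=on-failure
RestartSec=10

[Install]
WantedBy=multi-user.target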

Enable and start:

sudo systemctl enable celery-worker.service
sudo systemctl start celery-worker.service
sudo systemctl status celery-worker.service

Docker Deployment

Create a Docker image for Celery workers:

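FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Run as an unprivileged user; Celery warns when workers run as root
RUN useradd --create-home celery
USER celery

CMD ["celery", "-A", "celery_app", "worker", "-l", "info"]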

Deploy with Docker Compose:

version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes

  celery_worker:
    build: .
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
      CELERY_RESULT_BACKEND: redis://redis:6379/1
    depends_on:
      - redis
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '1'
          memory: 512M

  celery_beat:
    build: .
    command: celery -A celery_app beat -l info
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
    depends_on:
      - redis

Monitoring and Alerting

Use Flower for real-time monitoring, and connect Celery signals to your alerting channel:

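import os

import requests
from celery import signals

# Read the webhook from the environment; the default is only a placeholder
WEBHOOK_URL = os.getenv('SLACK_WEBHOOK_URL', 'https://hooks.slack.com/services/YOUR/WEBHOOK/URL')

@signals.task_failure.connect
def task_failed(sender=None, task_id=None, exception=None, **kwargs):
    """Alert on task failure (the signal passes the error as `exception`)."""
    message = f'Task {sender.name} ({task_id}) failed: {exception}'
    # A timeout keeps a slow webhook from blocking the worker
    requests.post(WEBHOOK_URL, json={'text': message}, timeout=5)

@signals.worker_shutdown.connect
def worker_shutdown(sender=None, **kwargs):
    """Alert when a worker shuts down.

    This fires on any orderly shutdown. A worker that is killed outright
    sends no signal, so also monitor worker liveness externally.
    """
    message = f'Celery worker {sender} is shutting down'
    requests.post(WEBHOOK_URL, json={'text': message}, timeout=5)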

Or use Prometheus metrics:

pip install celery-prometheus-exporter
celery-prometheus-exporter --broker redis://localhost:6379/0

Configure Prometheus scrape:

scrape_configs:
  - job_name: 'celery'
    static_configs:
      - targets: ['localhost:8888']

High Availability and Failover

For mission-critical tasks, implement redundancy:

Broker Failover (Redis Sentinel):

# Let Celery talk to Sentinel directly. Resolving the master once at import
# time and building a redis:// URL from it would not follow a later failover.
broker_url = (
    'sentinel://sentinel-1:26379;'
    'sentinel://sentinel-2:26379;'
    'sentinel://sentinel-3:26379'
)
broker_transport_options = {'master_name': 'mymaster'}

Worker Redundancy: Run multiple workers across different machines. If one crashes, others pick up the work. Use a process supervisor (systemd, supervisord) to restart crashed workers.
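
When another worker picks up a task after a crash, the task may run a second time, because late acknowledgment redelivers anything that was not acked. Tasks with side effects therefore benefit from an idempotency check. The sketch below records completed keys and skips repeats; `CompletedKeys` is an in-memory stand-in for a shared store such as a database table, and both names are hypothetical:

```python
class CompletedKeys:
    """In-memory stand-in for a shared store such as a database table."""

    def __init__(self):
        self._done = set()

    def is_done(self, key):
        return key in self._done

    def mark_done(self, key):
        self._done.add(key)


def run_idempotent(store, key, func, *args, **kwargs):
    """Skip func if an earlier delivery already completed the same key."""
    if store.is_done(key):
        return False  # duplicate delivery, nothing to do
    func(*args, **kwargs)
    store.mark_done(key)  # only reached when func did not raise
    return True
```

This guards against a redelivery that arrives after the work finished but before the message was acknowledged. It does not protect against a crash in the middle of `func`, so the function itself should still be safe to repeat.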

Dead Letter Queue: Capture permanently failed tasks for manual inspection:

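import logging

logger = logging.getLogger(__name__)

@app.task(bind=True, max_retries=3)
def critical_task(self, data):
    try:
        return process(data)  # process() is the application's own function
    except Exception as exc:
        if self.request.retries < self.max_retries:
            raise self.retry(exc=exc, countdown=60)
        # Retries exhausted: record the failure in the DLQ table.
        # FailedTask is assumed to be a Django model defined by the application.
        FailedTask.objects.create(
            task_name=self.name,
            task_id=self.request.id,
            args=self.request.args,
            kwargs=self.request.kwargs,
            error=str(exc),
        )
        logger.error('Task %s sent to DLQ', self.request.id)
        raise  # Re-raise so the task is marked FAILURE rather than SUCCESS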

Production Checklist

  • ✓ Use environment variables for broker URL, secrets, and configuration
  • ✓ Secure the broker: password, TLS, restricted access
  • ✓ Configure task timeouts and limits to prevent runaway tasks
  • ✓ Set task_acks_late=True so unacknowledged tasks are redelivered after a worker crash, and make those tasks idempotent
  • ✓ Enable compression and appropriate serialization
  • ✓ Configure structured logging to a persistent file
  • ✓ Use systemd or Docker to manage worker lifecycle
  • ✓ Run Flower for real-time monitoring and alerting
  • ✓ Set up alerts for task failures and worker outages
  • ✓ Implement a dead letter queue for permanently failed tasks
  • ✓ Use a persistent scheduler (django-celery-beat) for scheduled tasks
  • ✓ Test failover: broker and worker restart scenarios
  • ✓ Monitor broker disk space and clean up old results
  • ✓ Rate-limit or circuit-break flaky external APIs
  • ✓ Document runbooks for common failure scenarios
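
The checklist item on circuit-breaking flaky external APIs can be sketched as a small state machine that stops calling a dependency after repeated failures and tries again after a cooldown. This is a minimal illustration, not a Celery feature; the class name and the thresholds are assumptions, and the state lives in a single process, so each worker process keeps its own breaker:

```python
import time


class CircuitBreaker:
    """Stop calling a failing dependency until a cooldown has passed."""

    def __init__(self, max_failures=3, reset_after=30.0, clock=time.monotonic):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def allow(self):
        """Return True if a call may be attempted right now."""
        if self.opened_at is None:
            return True
        # After the cooldown, allow trial calls again; a failure re-opens the circuit.
        return self.clock() - self.opened_at >= self.reset_after

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.max_failures:
            self.opened_at = self.clock()
```

Inside a task you would check `allow()` before calling the API, retry the task later if it returns False, and call `record_success()` or `record_failure()` depending on the outcome.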

Key Takeaways

  • Externalize configuration: use environment variables for secrets, broker URLs, and timeouts
  • Secure the broker with passwords, TLS, and network isolation
  • Enable late acknowledgment (task_acks_late) and set task time limits for reliability
  • Use structured logging and route logs to persistent storage
  • Deploy workers with systemd or Docker for automatic restart on failure
  • Monitor with Flower and set up alerts for failures
  • Implement dead letter queues for failed task investigation
  • Run multiple worker instances for redundancy and load balancing
  • Test failover scenarios: broker restart, worker crash, network partition

Frequently Asked Questions

How do I rotate Celery logs?

Use RotatingFileHandler in Python logging or systemd's journal with rotation. Docker logs are rotated by the Docker daemon (configure in /etc/docker/daemon.json).

Should I enable task_ignore_result?

Set to True if you don't need task results (fire-and-forget tasks). This saves storage and improves performance. Keep False for tasks where you check status or retrieve results.

How do I handle database connection failures in tasks?

Use retries with exponential backoff. Wrap database operations with try-except and retry on connection errors. Ensure your ORM uses connection pooling.
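
Exponential backoff doubles the wait after each attempt, and random jitter keeps many failed tasks from retrying at the same moment. A minimal sketch of the delay calculation follows; `backoff_delay` and its default values are illustrative assumptions:

```python
import random


def backoff_delay(retries, base=2.0, cap=600.0, jitter=True, rng=random):
    """Seconds to wait before the next attempt, given retries so far (0-based)."""
    delay = min(cap, base * (2 ** retries))
    if jitter:
        # "Full jitter": pick uniformly in [0, delay] to spread out retries
        delay = rng.uniform(0, delay)
    return delay
```

In a bound task this could be used as `raise self.retry(exc=exc, countdown=backoff_delay(self.request.retries))`. Celery also offers the task options `autoretry_for`, `retry_backoff`, `retry_backoff_max`, and `retry_jitter`, which cover the same pattern declaratively.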

Can I run Celery and Beat on the same instance in production?

Yes, but run them as separate systemd services or processes so each can be restarted independently. If one crashes, you can restart it without affecting the other. Run only one Beat instance per schedule; a second one would enqueue every scheduled task twice.

How do I debug a task that's hanging?

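Set task_soft_time_limit so Celery raises SoftTimeLimitExceeded inside the task, which gives it a chance to clean up before the hard limit kills it. Log the traceback and check for deadlocks or infinite loops. Use the worker logs, Flower, or `celery -A celery_app inspect active` to identify which task is hanging.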
