Monitoring Celery with Flower: Real-Time Insights
Flower is a web-based real-time monitoring tool for Celery. It displays task execution, worker status, queue lengths, latency, and failure rates in a user-friendly dashboard. In production, Flower is essential for diagnosing bottlenecks, spotting failures, and understanding system health at a glance.
Installing and Starting Flower
Install Flower via pip:
pip install flower
Start Flower, pointing it to your Celery app:
celery -A celery_app flower
Flower listens on port 5555 by default. Open http://localhost:5555 in a browser and you'll see a live dashboard with worker statuses, active tasks, and queue metrics.
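For production, run Flower on a separate server with authentication enabled, and serve it over HTTPS (see Production Deployment). In Flower 1.0 and later, Celery options such as --broker go before the flower sub-command and Flower's own options go after it:
celery -A celery_app --broker=redis://localhost:6379/0 flower \
    --port=5555 \
    --basic_auth=admin:password123 \
    --log_file_prefix=/var/log/flower.log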
Dashboard Overview
The Flower dashboard has several main pages:
Workers Tab: Shows all registered workers, their status (online/offline), load average, and task counts (active, processed, failed, succeeded, retried). A red status indicates an offline worker; green indicates a healthy one.
Tasks Tab: Lists all tasks executed in the current session: task name, state (success/failure/pending), execution time, worker assigned, and arguments. Filter by state or task name.
Queues Tab: Displays queue names, their lengths, and consumption rate (tasks per second).
Events Tab: A real-time log of Celery events: task execution, worker status changes, errors. Useful for debugging.
Broker Tab: Broker statistics (connection count, memory, uptime) and queue depth.
Programmatic Monitoring
Beyond the UI, Flower exposes monitoring data via HTTP API. Celery also provides an inspect API for runtime monitoring:
from celery_app import app

# Build an inspector that broadcasts to all workers.
# Each call below returns None when no worker replies in time.
inspect = app.control.inspect(timeout=2.0)

# Get active tasks on all workers
active_tasks = inspect.active() or {}
for worker_name, tasks in active_tasks.items():
    print(f'{worker_name}: {len(tasks)} active')
    for task in tasks:
        # time_start is a timestamp reported by the worker, not an elapsed duration
        print(f'  - {task["name"]} (time_start={task["time_start"]})')

# Get registered tasks per worker
registered_tasks = inspect.registered() or {}
for worker_name, tasks in registered_tasks.items():
    print(f'{worker_name}: {len(tasks)} registered tasks')

# Get worker statistics
stats = inspect.stats() or {}
for worker_name, stat in stats.items():
    print(f'{worker_name}: pool={stat["pool"]["max-concurrency"]} workers')

# Check which workers respond; workers that are down are absent from the reply
ping = inspect.ping() or {}
for worker_name, result in ping.items():
    print(f'{worker_name}: {"alive" if result.get("ok") == "pong" else "unexpected reply"}')
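The HTTP API mentioned at the start of this section can be queried with the standard library alone. The sketch below assumes Flower's /api/tasks endpoint returns a JSON object keyed by task id, and reuses the basic-auth credentials from the earlier command; the helper names and base URL are illustrative, and the exact response fields may differ between Flower versions.

```python
import base64
import json
import urllib.request
from collections import Counter


def build_flower_request(base_url, path, user, password):
    """Build a GET request for a Flower API path with HTTP basic auth."""
    token = base64.b64encode(f'{user}:{password}'.encode()).decode()
    request = urllib.request.Request(f'{base_url.rstrip("/")}{path}')
    request.add_header('Authorization', f'Basic {token}')
    return request


def count_tasks_by_state(tasks):
    """Count tasks per state from a /api/tasks style mapping (assumed shape)."""
    return Counter(info.get('state', 'UNKNOWN') for info in tasks.values())


def fetch_task_counts(base_url, user, password):
    """Fetch the task list from Flower and summarize it by state."""
    request = build_flower_request(base_url, '/api/tasks', user, password)
    with urllib.request.urlopen(request, timeout=5) as response:
        return count_tasks_by_state(json.load(response))
```

With Flower running locally, fetch_task_counts('http://localhost:5555', 'admin', 'password123') would return something like a Counter of SUCCESS, FAILURE, and STARTED counts that a cron job or health endpoint can act on.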
Setting Up Alerts
Monitor metrics and trigger alerts when thresholds are exceeded. Celery metrics can be fed into external monitoring tools, and Flower 1.0 and later also serves Prometheus metrics itself at /metrics:
Prometheus Integration: Expose Celery metrics in Prometheus format:
pip install celery-prometheus-exporter
celery-prometheus-exporter --broker redis://localhost:6379/0
Then scrape the exporter in your Prometheus config:
scrape_configs:
  - job_name: celery
    static_configs:
      - targets: ['localhost:8888']
Custom Monitoring Script: Poll Celery periodically and alert on anomalies:
import time
from celery_app import app


def send_alert(message):
    """Send alert via email, Slack, etc."""
    print(f'ALERT: {message}')
    # Integration with monitoring tools (e.g., Slack, PagerDuty)


def monitor_celery_health():
    """Monitor Celery health and alert on issues."""
    inspect = app.control.inspect()
    while True:
        # Both calls return None when no worker replies
        stats = inspect.stats() or {}
        active = inspect.active() or {}

        # Alert if no workers are alive
        if not stats:
            print('CRITICAL: No workers are alive!')
            send_alert('Celery workers down')

        # Alert if a worker reports an unusually large number of running tasks
        for worker, task_list in active.items():
            if len(task_list) > 100:
                print(f'WARNING: {worker} has {len(task_list)} active tasks (backlog)')
                send_alert(f'Task backlog on {worker}')

        # Warn if a worker has not processed any task since it started;
        # stat['total'] maps task names to counts and is empty until then
        for worker, stat in stats.items():
            if not stat.get('total'):
                print(f'WARNING: {worker} has never processed a task')

        time.sleep(60)
Run this in a background thread or separate process for continuous monitoring.
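The alert rules are easier to test when they are separated from the polling loop. A minimal sketch, assuming the reply shapes of inspect.stats() and inspect.active() used above (dicts keyed by worker name, or None when no worker answers); evaluate_health and the max_active threshold are hypothetical names chosen for this example.

```python
def evaluate_health(stats, active, max_active=100):
    """Return the alert messages for one polling cycle.

    stats and active are the replies of inspect.stats() and inspect.active();
    either may be None when no worker answered.
    """
    stats = stats or {}
    active = active or {}
    alerts = []

    # Without any stats reply there is nothing else worth checking
    if not stats:
        alerts.append('CRITICAL: no workers replied')
        return alerts

    # Too many running tasks on one worker
    for worker, task_list in sorted(active.items()):
        if len(task_list) > max_active:
            alerts.append(f'WARNING: {worker} has {len(task_list)} active tasks')

    # A worker whose per-task counters are still empty
    for worker, stat in sorted(stats.items()):
        if not stat.get('total'):
            alerts.append(f'WARNING: {worker} has not processed any task yet')

    return alerts
```

The polling loop then reduces to calling evaluate_health(inspect.stats(), inspect.active()) once a minute and passing each returned message to send_alert.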
Tracking Task Latency
Monitor how long tasks take from enqueue to completion:
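import time
from celery import signals
from celery_app import app

# Process-local store: a lookup only succeeds when the publish and postrun
# handlers run in the same process.
task_start_times = {}


@signals.before_task_publish.connect
def record_enqueue_time(sender=None, headers=None, **kwargs):
    """Record when a task is enqueued."""
    # With task protocol 2 (the default since Celery 4) the id is in the headers
    task_start_times[headers['id']] = time.time()


@signals.task_postrun.connect
def report_latency(sender=None, task_id=None, **kwargs):
    """Calculate total latency (enqueue to completion)."""
    # pop() removes the entry and tolerates tasks published by another process
    enqueued_at = task_start_times.pop(task_id, None)
    if enqueued_at is not None:
        elapsed = time.time() - enqueued_at
        print(f'{sender.name}: Completed in {elapsed:.2f}s')


@app.task
def slow_task(data):
    """Example task with latency tracking."""
    time.sleep(2)
    return data
This tracks end-to-end latency, accounting for queue wait time plus execution time. Because the dictionary lives in a single process, the lookup only succeeds when the task is published and executed by the same process; when producers and workers run separately, keep the enqueue timestamps in a shared store such as Redis.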
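Individual latencies are noisy, so it usually helps to summarize them before alerting. A minimal sketch, assuming the elapsed values from the handler above are collected into a list; percentile and summarize_latencies are hypothetical helpers, and the nearest-rank definition of a percentile is one choice among several.

```python
def percentile(sorted_values, pct):
    """Nearest-rank percentile of an ascending list; pct is an integer 1..100."""
    if not sorted_values:
        raise ValueError('no values to summarize')
    # Integer ceiling of pct * n / 100 avoids floating-point rounding surprises
    rank = (pct * len(sorted_values) + 99) // 100
    return sorted_values[max(rank, 1) - 1]


def summarize_latencies(latencies):
    """Return count, median, 95th percentile, and maximum of the latencies."""
    ordered = sorted(latencies)
    return {
        'count': len(ordered),
        'p50': percentile(ordered, 50),
        'p95': percentile(ordered, 95),
        'max': ordered[-1],
    }
```

A p95 that climbs while p50 stays flat usually points at occasional queue backups rather than uniformly slow tasks.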
Identifying Performance Bottlenecks
Use Flower to spot where time is spent:
Queue Wait Time: If a task is enqueued but sits in the queue, the queue is backed up. Increase worker concurrency or add more workers.
Execution Time: If tasks take long, profile them:
import cProfile

@app.task
def slow_computation(data):
    """Profile this task to identify bottlenecks."""
    profiler = cProfile.Profile()
    profiler.enable()
    # Actual work; compute() stands in for your own function
    result = compute(data)
    profiler.disable()
    profiler.print_stats()  # Prints to stdout, which ends up in the worker logs
    return result
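Worker Utilization: If CPU and memory usage are low but latency is high, the workers are not saturated; raise concurrency or add workers. If CPU is already saturated, raising concurrency will not help; add hardware or make the tasks cheaper.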
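For the execution-time case, raw profiler output is long and unsorted. A small sketch using only the standard library (cProfile, pstats, io) that returns the result together with a report limited to the most expensive calls; profile_call is a hypothetical helper, not part of Celery or Flower.

```python
import cProfile
import io
import pstats


def profile_call(func, *args, top=10, **kwargs):
    """Run func under cProfile; return (result, text report of the top entries)."""
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        result = func(*args, **kwargs)
    finally:
        # Always stop profiling, even if func raises
        profiler.disable()

    # Render the stats into a string instead of printing to stdout
    buffer = io.StringIO()
    stats = pstats.Stats(profiler, stream=buffer)
    stats.sort_stats('cumulative').print_stats(top)
    return result, buffer.getvalue()
```

Inside a task you could call result, report = profile_call(compute, data) and send report to the task logger, which keeps profiles out of stdout and next to the task's other log lines.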
Production Deployment
In production, run Flower with persistence and security:
# Persist task state across restarts and serve over HTTPS with basic auth
celery -A celery_app flower \
    --persistent=True \
    --db=/var/lib/flower/flower.db \
    --max_tasks=100000 \
    --basic_auth=admin:secure_password \
    --certfile=/etc/ssl/certs/cert.pem \
    --keyfile=/etc/ssl/private/key.pem \
    --url_prefix=/monitoring
Or use a reverse proxy (Nginx) to serve Flower:
server {
    listen 443 ssl;
    server_name monitoring.example.com;

    ssl_certificate /etc/ssl/certs/cert.pem;
    ssl_certificate_key /etc/ssl/private/key.pem;

    auth_basic "Restricted";
    auth_basic_user_file /etc/nginx/.htpasswd;

    location / {
        proxy_pass http://localhost:5555;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }
}
Persisting Flower Data
By default, Flower stores data in memory and loses it on restart. For long-term analysis, persist events:
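import json
from celery_app import app

def persist_events(path='/var/log/celery_events.jsonl'):
    """Append every Celery event to a JSON Lines file."""
    with open(path, 'a') as f, app.connection() as connection:
        def on_event(event):
            f.write(json.dumps(event, default=str) + '\n')
            f.flush()
        # '*' registers a catch-all handler for every event type
        receiver = app.events.Receiver(connection, handlers={'*': on_event})
        receiver.capture(limit=None, timeout=None, wakeup=True)

# Workers only emit task events when this is set in their configuration
# (or when they are started with -E)
app.conf.worker_send_task_events = True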
Or use Flower's built-in database backend:
celery -A celery_app flower \
    --persistent=True \
    --db=/var/lib/flower.db \
    --max_tasks=100000
Key Takeaways
- Flower is a web UI for real-time Celery monitoring: tasks, workers, queues, and performance
- Start Flower with celery -A celery_app flower and visit http://localhost:5555
- Use the inspect API programmatically to monitor and alert on Celery health
- Track task latency from enqueue to completion to identify queue backups
- Identify performance bottlenecks: queue wait, slow tasks, or underutilized workers
- Secure and persist Flower in production using database backends and authentication
Frequently Asked Questions
Can Flower monitor multiple Celery apps?
Yes. Use the --broker option to point Flower to a specific broker, or set up multiple Flower instances for different apps. Flower doesn't distinguish between apps; it monitors all tasks in the broker.
Does Flower slow down my tasks?
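Minimally. Flower listens to Celery events, which workers emit only when events are enabled. If the overhead matters, you can turn task events off by setting worker_send_task_events = False in config, but Flower then stops receiving task-level data, so do this only on workers you do not need to watch and re-enable it for debugging.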
How do I export Celery metrics for long-term analysis?
Use the celery-prometheus-exporter to expose metrics to Prometheus, then query with Grafana for dashboards and alerts. Alternatively, log events to a database or data warehouse for post-analysis.
Can I alert when a task fails?
Yes. Configure signal handlers in your Celery app:
from celery import signals

@signals.task_failure.connect
def task_failed(sender=None, task_id=None, exc=None, **kwargs):
    # send_alert is the helper from the custom monitoring script
    send_alert(f'Task {sender.name} failed: {exc}')
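The send_alert helper is left as a stub in this article. One way to fill it in, sketched here on the assumption of a chat-style incoming webhook that accepts a JSON body with a text field; the URL is a placeholder and build_alert_request is an illustrative name.

```python
import json
import urllib.request

# Placeholder URL (assumption): replace with your own webhook endpoint
WEBHOOK_URL = 'https://hooks.example.com/services/REPLACE_ME'


def build_alert_request(message, url=WEBHOOK_URL):
    """Build a JSON POST request carrying the alert text."""
    payload = json.dumps({'text': f'[celery] {message}'}).encode('utf-8')
    return urllib.request.Request(
        url,
        data=payload,
        headers={'Content-Type': 'application/json'},
        method='POST',
    )


def send_alert(message, url=WEBHOOK_URL):
    """Deliver the alert; return False instead of raising on delivery errors."""
    try:
        request = build_alert_request(message, url)
        with urllib.request.urlopen(request, timeout=5) as response:
            return 200 <= response.status < 300
    except OSError as exc:  # URLError and socket timeouts are OSError subclasses
        print(f'ALERT (delivery failed: {exc}): {message}')
        return False
```

Swallowing delivery errors is deliberate: the handler runs inside the worker, and a webhook outage should not turn into a second failure there.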
What's the difference between active and reserved tasks in Flower?
Active tasks are currently executing. Reserved tasks are pulled by the worker but not yet executing (waiting for a free worker slot). High reserved count indicates queue congestion.