Monitoring Celery with Flower: Real-Time Insights

Flower is a web-based real-time monitoring tool for Celery. It displays task execution, worker status, queue lengths, latency, and failure rates in a user-friendly dashboard. In production, Flower is essential for diagnosing bottlenecks, spotting failures, and understanding system health at a glance.

Installing and Starting Flower

Install Flower via pip:

pip install flower

Start Flower, pointing it to your Celery app:

celery -A celery_app flower

Flower listens on port 5555 by default. Open http://localhost:5555 in a browser and you'll see a live dashboard with worker statuses, active tasks, and queue metrics.

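For production, run Flower on a separate server with authentication and file logging (HTTPS is covered under Production Deployment below). Celery options such as --broker go before the flower subcommand:

celery -A celery_app --broker=redis://localhost:6379/0 flower \
    --port=5555 \
    --basic_auth=admin:password123 \
    --log_file_prefix=/var/log/flower.log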

Dashboard Overview

The Flower dashboard has several main pages (the exact set of tabs varies between Flower versions):

Workers Tab: Shows all registered workers, their status (online/offline), CPU/memory usage, and task counts (active, reserved, processed). A red status indicates a dead worker; green indicates healthy.

Tasks Tab: Lists all tasks executed in the current session: task name, state (success/failure/pending), execution time, worker assigned, and arguments. Filter by state or task name.

Queues Tab: Displays queue names, their lengths, and consumption rate (tasks per second).

Events Tab: A real-time log of Celery events: task execution, worker status changes, errors. Useful for debugging.

Broker Tab: Broker statistics (connection count, memory, uptime) and queue depth.

Programmatic Monitoring

Beyond the UI, Flower exposes monitoring data via HTTP API. Celery also provides an inspect API for runtime monitoring:

from celery_app import app  # the Celery app module used elsewhere in this guide

inspect = app.control.inspect()

# Each inspect call returns None when no worker replies, hence the `or {}` fallback.

# Get active tasks on all workers
active_tasks = inspect.active() or {}
for worker_name, tasks in active_tasks.items():
    print(f'{worker_name}: {len(tasks)} active')
    for task in tasks:
        # time_start is a timestamp, not an elapsed duration
        print(f'  - {task["name"]} (time_start={task["time_start"]})')

# Get registered tasks per worker
registered_tasks = inspect.registered() or {}
for worker_name, tasks in registered_tasks.items():
    print(f'{worker_name}: {len(tasks)} registered tasks')

# Get worker statistics
stats = inspect.stats() or {}
for worker_name, stat in stats.items():
    print(f'{worker_name}: pool={stat["pool"]["max-concurrency"]} workers')

# Check if workers are alive; only workers that reply appear in the result
ping = inspect.ping() or {}
for worker_name, result in ping.items():
    print(f'{worker_name}: {"alive" if result else "dead"}')
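The HTTP API mentioned above can be queried from a script as well. The following is a minimal sketch using only the standard library. It assumes Flower is reachable at http://localhost:5555, that its /api/tasks endpoint is enabled, and that basic auth is in use. The helper names flower_get and summarize_task_states are illustrative and not part of Flower.

```python
import json
from base64 import b64encode
from urllib.request import Request, urlopen


def flower_get(path, base_url="http://localhost:5555", auth=None, opener=urlopen):
    """GET a Flower API path and return the decoded JSON body.

    `auth` is an optional (username, password) tuple for HTTP basic auth.
    `opener` is injectable so the function can be exercised without a server.
    """
    request = Request(base_url.rstrip("/") + path)
    if auth:
        token = b64encode(f"{auth[0]}:{auth[1]}".encode()).decode()
        request.add_header("Authorization", f"Basic {token}")
    with opener(request, timeout=5) as response:
        return json.load(response)


def summarize_task_states(tasks):
    """Count tasks per state, assuming a dict keyed by task id."""
    counts = {}
    for info in tasks.values():
        state = info.get("state", "UNKNOWN")
        counts[state] = counts.get(state, 0) + 1
    return counts
```

A call such as summarize_task_states(flower_get("/api/tasks", auth=("admin", "secret"))) would then give a quick per-state count that a cron job or health check could act on.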

Setting Up Alerts

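Monitor metrics and trigger alerts when thresholds are exceeded, using either an external monitoring stack or a custom script:

Prometheus Integration: Expose Celery metrics in Prometheus format with the third-party celery-prometheus-exporter package (Flower 1.0 and later also serves Prometheus metrics itself at /metrics):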

pip install celery-prometheus-exporter
celery-prometheus-exporter --broker redis://localhost:6379/0

Then scrape the exporter in your Prometheus config:

scrape_configs:
  - job_name: celery
    static_configs:
      - targets: ['localhost:8888']

Custom Monitoring Script: Poll Celery periodically and alert on anomalies:

import time
from celery_app import app

def monitor_celery_health():
    """Monitor Celery health and alert on issues."""
    inspect = app.control.inspect()

    while True:
        # Each inspect call returns None when no worker replies
        stats = inspect.stats() or {}
        active = inspect.active() or {}

        # Alert if no workers are alive
        if not stats:
            print('CRITICAL: No workers are alive!')
            send_alert('Celery workers down')

        # Alert if a worker is running unusually many tasks.
        # Active tasks are capped by concurrency, so tune the threshold to your pool size.
        for worker, task_list in active.items():
            if len(task_list) > 100:
                print(f'WARNING: {worker} has {len(task_list)} active tasks (backlog)')
                send_alert(f'Task backlog on {worker}')

        # Warn if a worker has not processed any task since it started
        for worker, stat in stats.items():
            if not stat.get('total'):  # 'total' maps task name -> processed count
                print(f'WARNING: {worker} has never processed a task')

        time.sleep(60)

def send_alert(message):
    """Send alert via email, Slack, etc."""
    print(f'ALERT: {message}')
    # Integration with monitoring tools (e.g., Slack, PagerDuty)

Run this in a background thread or separate process for continuous monitoring.
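A polling loop like this re-sends the same alert on every pass while a problem persists. One way to limit the noise is a per-alert cooldown. The sketch below is an assumption about how you might structure it: AlertThrottle and its parameters are hypothetical names, not part of Celery or Flower.

```python
import time


class AlertThrottle:
    """Suppress repeats of the same alert key within a cooldown window."""

    def __init__(self, cooldown_seconds=900, clock=time.monotonic):
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock  # injectable so the logic can be tested without waiting
        self._last_sent = {}

    def should_send(self, key):
        """Return True if no alert with this key was sent during the cooldown."""
        now = self.clock()
        last = self._last_sent.get(key)
        if last is not None and now - last < self.cooldown_seconds:
            return False
        self._last_sent[key] = now
        return True
```

In the monitoring loop, the call to the alert function would be guarded with something like `if throttle.should_send(f'backlog:{worker}')`, so each worker gets its own cooldown.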

Tracking Task Latency

Monitor how long tasks take from enqueue to completion:

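import time
from celery import signals
from celery_app import app

# In-process store: works only when publisher and worker share a process.
task_start_times = {}

@signals.before_task_publish.connect
def record_publish_time(sender=None, headers=None, **kwargs):
    """Record when a task is enqueued."""
    # With task protocol 2 (the default), the task id lives in the message headers
    task_start_times[headers['id']] = time.time()

@signals.task_postrun.connect
def report_latency(sender=None, task_id=None, **kwargs):
    """Calculate total latency (enqueue to completion)."""
    started = task_start_times.pop(task_id, None)
    if started is not None:
        elapsed = time.time() - started
        print(f'{sender.name}: Completed in {elapsed:.2f}s')

@app.task
def slow_task(data):
    """Example task with latency tracking."""
    time.sleep(2)
    return data

This measures end-to-end latency (queue wait plus execution time), but only when both handlers run in the same process. before_task_publish fires in the process that sends the task and task_postrun fires in the worker, so in a typical deployment the publish timestamp has to live in a shared store such as Redis rather than a module-level dictionary.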
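Individual latency prints are hard to act on, so it usually helps to aggregate them. The following sketch summarizes a batch of collected latencies with nearest-rank percentiles. The function names are illustrative, and it assumes you have gathered the elapsed values (in seconds) into a list yourself.

```python
import math


def percentile(sorted_values, pct):
    """Nearest-rank percentile of an already sorted, non-empty list."""
    rank = max(1, math.ceil(pct * len(sorted_values) / 100))
    return sorted_values[rank - 1]


def summarize_latencies(latencies):
    """Return count, median, 95th percentile, and max for a list of durations."""
    if not latencies:
        return {"count": 0}
    ordered = sorted(latencies)
    return {
        "count": len(ordered),
        "p50": percentile(ordered, 50),
        "p95": percentile(ordered, 95),
        "max": ordered[-1],
    }
```

A p95 that drifts upward while p50 stays flat often points to intermittent queue backups rather than uniformly slow tasks.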

Identifying Performance Bottlenecks

Use Flower to spot where time is spent:

Queue Wait Time: If a task is enqueued but sits in the queue, the queue is backed up. Increase worker concurrency or add more workers.

Execution Time: If tasks take long, profile them:

import cProfile

@app.task
def slow_computation(data):
    """Profile this task to identify bottlenecks."""
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        # Actual work (compute is a placeholder for your own function)
        result = compute(data)
    finally:
        profiler.disable()
        profiler.print_stats()  # Logs to stdout/worker logs
    return result

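Worker Utilization: If CPU and memory usage are low but latency is high, tasks are probably waiting on I/O or on a free pool slot, so raise concurrency or add workers. If CPU is already saturated, more concurrency on the same machine will not help; optimize the tasks or add hardware.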
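Pool utilization can also be estimated from the inspect API rather than read off the dashboard. This sketch takes dictionaries shaped like the results of inspect.active() and inspect.stats() and reports the fraction of each worker's pool that is busy. worker_utilization is a hypothetical helper, and the calculation is a rough approximation rather than a CPU measurement.

```python
def worker_utilization(active, stats):
    """Return {worker_name: busy fraction of the pool}.

    `active` maps worker name -> list of running tasks.
    `stats` maps worker name -> stats dict containing pool["max-concurrency"].
    Either argument may be None, which is what inspect returns when no
    worker replies.
    """
    utilization = {}
    for worker, stat in (stats or {}).items():
        capacity = stat.get("pool", {}).get("max-concurrency") or 0
        busy = len((active or {}).get(worker, []))
        utilization[worker] = busy / capacity if capacity else 0.0
    return utilization
```

Values that stay near 1.0 while the queue keeps growing suggest the pool is the bottleneck. Low values combined with high latency point at the broker, routing, or prefetch settings instead.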

Production Deployment

In production, run Flower with persistence and security:

# Persist task state across restarts, require a login, and serve over TLS
celery -A celery_app flower \
    --persistent=True \
    --db=/var/lib/flower/flower.db \
    --max_tasks=100000 \
    --basic_auth=admin:secure_password \
    --keyfile=/etc/ssl/private/key.pem \
    --certfile=/etc/ssl/certs/cert.pem \
    --url_prefix=/monitoring

Or use a reverse proxy (Nginx) to serve Flower:

server {
    listen 443 ssl;
    server_name monitoring.example.com;

    ssl_certificate /etc/ssl/certs/cert.pem;
    ssl_certificate_key /etc/ssl/private/key.pem;

    auth_basic "Restricted";
    auth_basic_user_file /etc/nginx/.htpasswd;

    location / {
        proxy_pass http://localhost:5555;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }
}

Persisting Flower Data

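By default, Flower stores data in memory and loses it on restart. For long-term analysis, you can persist events yourself with a small consumer built on Celery's event receiver, which runs independently of Flower:

import json
from celery_app import app

# Workers must emit task events (equivalent to starting them with -E)
app.conf.worker_send_task_events = True

def persist_events(path='/var/log/celery_events.jsonl'):
    """Append every Celery event to a JSON Lines file."""
    def write_event(event):
        with open(path, 'a') as f:
            f.write(json.dumps(event, default=str) + '\n')

    with app.connection() as connection:
        receiver = app.events.Receiver(connection, handlers={'*': write_event})
        receiver.capture(limit=None, timeout=None, wakeup=True)  # blocks forever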

Or use Flower's built-in persistence, which saves its state to a local database file when --persistent is enabled:

celery -A celery_app flower \
    --persistent=True \
    --db=/var/lib/flower.db \
    --max_tasks=100000

Key Takeaways

  • Flower is a web UI for real-time Celery monitoring: tasks, workers, queues, and performance
  • Start Flower with celery -A celery_app flower and visit http://localhost:5555
  • Use the inspect API programmatically to monitor and alert on Celery health
  • Track task latency from enqueue to completion to identify queue backups
  • Identify performance bottlenecks: queue wait, slow tasks, or underutilized workers
  • Secure and persist Flower in production using database backends and authentication

Frequently Asked Questions

Can Flower monitor multiple Celery apps?

Yes. Point Flower at a specific broker with Celery's --broker option (given before the flower subcommand), or run multiple Flower instances for different apps. Flower doesn't distinguish between apps; it monitors every task that passes through the broker it is connected to.

Does Flower slow down my tasks?

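Minimally. Flower listens to Celery events, which workers emit only when event sending is enabled (for example with the -E flag or worker_send_task_events = True). Disabling events with worker_send_task_events = False reduces overhead, but Flower's task views depend on those events, so turn them off only on workers you do not need to observe.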

How do I export Celery metrics for long-term analysis?

Use the celery-prometheus-exporter to expose metrics to Prometheus, then query with Grafana for dashboards and alerts. Alternatively, log events to a database or data warehouse for post-analysis.
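As a rough illustration of the "log events to a database" option, the sketch below stores Celery event dictionaries in SQLite using only the standard library. The table name, column layout, and function names are assumptions made for this example. It relies on the type, uuid, and timestamp fields that Celery task events carry.

```python
import json
import sqlite3


def open_event_store(path=":memory:"):
    """Open (and create if needed) a SQLite database for Celery events."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS celery_events ("
        "timestamp REAL, type TEXT, task_id TEXT, payload TEXT)"
    )
    return conn


def record_event(conn, event):
    """Insert one event dict, keeping the full payload as JSON."""
    conn.execute(
        "INSERT INTO celery_events VALUES (?, ?, ?, ?)",
        (
            event.get("timestamp"),
            event.get("type"),
            event.get("uuid"),
            json.dumps(event, default=str),
        ),
    )
    conn.commit()


def count_events_by_type(conn):
    """Return {event_type: count} across all stored events."""
    rows = conn.execute(
        "SELECT type, COUNT(*) FROM celery_events GROUP BY type"
    ).fetchall()
    return dict(rows)
```

record_event could be registered as the handler of a Celery event consumer, and a file path passed to open_event_store makes the data survive restarts. For high event volumes, batching the commits would likely be necessary.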

Can I alert when a task fails?

Yes. Configure signal handlers in your Celery app:

from celery import signals

@signals.task_failure.connect
def task_failed(sender=None, task_id=None, exc=None, **kwargs):
    send_alert(f'Task {sender.name} failed: {exc}')

What's the difference between active and reserved tasks in Flower?

Active tasks are currently executing. Reserved tasks are pulled by the worker but not yet executing (waiting for a free worker slot). High reserved count indicates queue congestion.

Further Reading