
Celery Basics: Setting Up Your First Task Queue

Celery is a distributed task queue system that enables Python applications to process work asynchronously. A task is a Python function decorated with @app.task that can be executed by background workers, decoupled from your main application thread. Installing Celery and running your first task takes minutes, and once set up, you can offload expensive operations to a fleet of workers.

What Is Celery and Why Use It?

Celery is an open-source asynchronous task queue/job queue library built on message passing. It lets you define tasks in Python, send them to a message broker (like Redis or RabbitMQ), and have distributed worker processes execute them. The core benefit is non-blocking execution: instead of waiting for a slow operation like sending an email, your application enqueues the task and returns instantly. Workers pick up the task and process it in the background, freeing your web server to handle more requests.

The architecture has three parts: a producer (your app) that creates tasks, a message broker (Redis/RabbitMQ) that queues tasks, and one or more workers that consume and execute them. This separation allows you to scale the task processing tier independently from your application tier.
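To make the three roles concrete, here is a toy, in-process model that uses only the standard library. It is a sketch of the pattern, not of how Celery is implemented: a queue.Queue stands in for the broker, a dict stands in for the result backend, and threads stand in for workers. All names here are hypothetical.

```python
import queue
import threading

# Toy model: queue.Queue plays the broker, a dict plays the result backend.
# Real Celery sends serialized messages over the network to separate processes.
broker = queue.Queue()
results = {}
results_lock = threading.Lock()

def add(x, y):
    return x + y

TASKS = {'add': add}  # task registry: name -> function

def produce(task_id, name, *args):
    """Producer: enqueue a task message and return immediately."""
    broker.put({'id': task_id, 'task': name, 'args': args})

def worker():
    """Worker: consume messages until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        func = TASKS[message['task']]
        value = func(*message['args'])
        with results_lock:
            results[message['id']] = value  # "store in the backend"

workers = [threading.Thread(target=worker) for _ in range(2)]
for w in workers:
    w.start()

for i in range(5):
    produce(f'task-{i}', 'add', i, 10)

for _ in workers:
    broker.put(None)  # one stop sentinel per worker, queued after all tasks
for w in workers:
    w.join()

print(results)
```

Adding more worker threads increases processing capacity without touching the producer, which is the same independence the paragraph above describes for the real worker tier.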

Installation and Initial Setup

Start by installing Celery and Redis (we'll use Redis as the broker for simplicity):

pip install celery redis

Next, download and run Redis. On macOS with Homebrew:

brew install redis
redis-server

On Linux (Ubuntu/Debian):

sudo apt-get install redis-server
sudo systemctl start redis-server

On Windows, use WSL (recommended) or the Redis Windows port from https://github.com/microsoftarchive/redis/releases. Note that this port is archived, no longer maintained, and ships an old Redis version.

Once Redis is running on localhost:6379, verify the connection:

redis-cli ping
# Output: PONG
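If redis-cli isn't available (for example, when Redis runs in a container), you can run the same check from Python. This is a minimal standard-library sketch: it opens a TCP connection and sends PING in Redis's inline-command form, expecting a +PONG reply. The function name redis_ping is hypothetical; with the redis package installed earlier, redis.Redis(host='localhost', port=6379).ping() performs the check through the official client.

```python
import socket

def redis_ping(host='localhost', port=6379, timeout=2.0):
    """Return True if a Redis server at host:port answers PING with PONG.

    Hypothetical helper: sends the inline command "PING\r\n" over a raw
    TCP socket, so it needs no client library. A server that requires
    authentication replies with an error instead, which counts as False.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(b'PING\r\n')
            reply = sock.recv(64)
    except OSError:
        return False  # connection refused, unreachable, or timed out
    return reply.startswith(b'+PONG')

if __name__ == '__main__':
    print('Redis reachable:', redis_ping())
```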

Creating Your First Celery Application

Create a file called celery_app.py:

from celery import Celery

# Initialize Celery with a name and broker/backend URL
app = Celery(
    'myapp',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

# Configure Celery (optional but recommended)
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

@app.task
def add(x, y):
    """Simple task that adds two numbers."""
    return x + y

@app.task
def send_email(email, subject):
    """Simulate an email task (would call an SMTP service in production)."""
    print(f'Sending email to {email}: {subject}')
    return f'Email sent to {email}'

In this example, the broker parameter points to Redis (the message queue), and the backend points to where results are stored. We've created two simple tasks: add performs arithmetic, and send_email simulates sending an email.

Running Your First Task

In one terminal, start a Celery worker listening for tasks:

celery -A celery_app worker --loglevel=info

The flag -A celery_app tells Celery to load tasks from the celery_app.py module. You should see output like:

[tasks]
. celery_app.add
. celery_app.send_email

In a second terminal, open a Python shell and enqueue a task:

from celery_app import add, send_email

# Option 1: Fire-and-forget: enqueue and keep only the task ID
result = add.delay(4, 6)
print(f'Task ID: {result.id}')

# Option 2: Use the configured result backend to check status and fetch the return value
result = send_email.delay('user@example.com', 'Hello!')
print(f'Task status: {result.status}')  # often still PENDING right after enqueueing
print(f'Task result: {result.get(timeout=10)}')  # Block up to 10 seconds for the result

Watch the worker terminal: you'll see the tasks execute in real time. The task ID lets you track execution, and result.get() blocks until the return value is available or the timeout expires.
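In a web application you usually don't want to block on result.get(). A common pattern is to return the task ID to the client and check on it later. Below is a minimal sketch of a non-blocking status helper; task_status is a hypothetical name, and it relies only on attributes that celery.result.AsyncResult provides (.id, .state, .ready(), .successful(), .result).

```python
def task_status(result):
    """Summarize an AsyncResult-like object without blocking.

    Hypothetical helper: on success .result holds the return value,
    on failure it holds the raised exception instance.
    """
    status = {'id': result.id, 'state': result.state}
    if result.ready():
        if result.successful():
            status['result'] = result.result
        else:
            status['error'] = repr(result.result)
    return status
```

To look up a task from a stored ID, rebuild the handle with app.AsyncResult(task_id) and pass it to the helper. Note that an unknown ID also reports PENDING, because the backend has no record of it.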

Understanding the Architecture

Your application enqueues tasks into Redis (the broker). Each Celery worker process listens to Redis for new task messages. When a message arrives, the worker deserializes it, executes the task function, and stores the result in Redis (the backend). Your app can check the result later by querying the backend with the task ID.

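This architecture is resilient: if a worker crashes, tasks still waiting in the queue are unaffected and another worker picks them up. A task that was mid-execution is, by default, not redelivered, because Celery acknowledges a message just before running it; enabling late acknowledgment (acks_late=True) changes this, at the cost of possible duplicate runs. If your application crashes, tasks it already enqueued stay in the broker, although with Redis their durability across a Redis restart depends on Redis's persistence settings.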

Configuration Best Practices

In production, load configuration from environment variables or a dedicated config module rather than hardcoding it. Create a celery_config.py:

import os

class CeleryConfig:
    broker_url = os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
    result_backend = os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1')
    task_serializer = 'json'
    result_serializer = 'json'
    accept_content = ['json']
    timezone = 'UTC'
    enable_utc = True
    task_track_started = True  # Report a STARTED state when a worker begins the task
    task_time_limit = 30 * 60  # Hard time limit in seconds (30 minutes)

Then load it in celery_app.py:

from celery import Celery
from celery_config import CeleryConfig

app = Celery('myapp')
app.config_from_object(CeleryConfig)
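One subtlety of the class-based config is that os.getenv runs once, when celery_config is first imported, so environment variables must be set before that import. A sketch of an alternative that reads settings at call time and is easy to test with a plain dict; the function name celery_settings and the CELERY_TASK_TIME_LIMIT variable are hypothetical.

```python
import os

def celery_settings(environ=os.environ):
    """Build Celery settings from an environment mapping (hypothetical helper).

    Reading inside a function picks up values when it is called, and tests
    can pass a plain dict instead of patching os.environ.
    """
    return {
        'broker_url': environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
        'result_backend': environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1'),
        'task_serializer': 'json',
        'result_serializer': 'json',
        'accept_content': ['json'],
        'timezone': 'UTC',
        'enable_utc': True,
        'task_track_started': True,
        # Hypothetical override; defaults to 30 minutes, in seconds
        'task_time_limit': int(environ.get('CELERY_TASK_TIME_LIMIT', 30 * 60)),
    }
```

In celery_app.py you would then call app.conf.update(**celery_settings()) in place of app.config_from_object(CeleryConfig).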

Key Takeaways

  • Celery is a distributed task queue that executes Python functions asynchronously via background workers
  • A broker (Redis/RabbitMQ) queues tasks; workers consume and execute them
  • Install Celery and Redis, define tasks with @app.task, start a worker, then enqueue tasks with .delay() or .apply_async()
  • Use a result backend to store and retrieve task results by ID
  • Configuration should be externalized and environment-driven in production

Frequently Asked Questions

What's the difference between .delay() and .apply_async()?

.delay(*args, **kwargs) is shorthand for .apply_async(args=args, kwargs=kwargs). Both enqueue the task and return an AsyncResult. Use .apply_async() when you need execution options such as countdown (delay execution by a number of seconds), eta, expires, or queue/routing_key. For simple cases, .delay() is cleaner.

Can I run Celery without Redis?

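Yes. Celery supports RabbitMQ, Amazon SQS, and other brokers. Redis is popular because it's fast, in-memory, and easy to set up locally. RabbitMQ is Celery's default broker; the Celery documentation describes it as feature-complete and durable, and notes that Redis is more susceptible to data loss on abrupt termination or power failure, so RabbitMQ is a common choice when message reliability matters most.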

How do workers know what tasks are available?

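When you start a worker with -A celery_app, Celery imports that module. Importing it runs each @app.task decorator, which registers the function in the app's task registry under a name such as celery_app.add. The worker then consumes messages from its queues (by default, a queue named celery) and uses the task name in each message to look up the function to run.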

Is Celery only for web frameworks?

No. Celery is framework-agnostic. You can use it in Flask, Django, FastAPI, or standalone Python scripts. Any code that can import your task module can enqueue tasks.

How do I ensure a task doesn't run twice?

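Make the task idempotent, for example with an idempotency key: before executing, check a database or cache for a record of the operation, and record it atomically. Duplicate runs can happen, for instance when a message is redelivered after a worker crash with late acknowledgment enabled, and Celery does not deduplicate tasks for you, so idempotence must be designed into the task itself.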
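A minimal sketch of the idempotency-key approach. The in-memory store is a stand-in that only works within one process; across workers you would need a shared, atomic store, for example Redis's SET with the NX and EX options (r.set(key, 1, nx=True, ex=3600) in redis-py) or a database unique constraint. The names InMemoryIdempotencyStore and send_email_once are hypothetical; in celery_app.py the function body would live inside an @app.task function.

```python
import threading

class InMemoryIdempotencyStore:
    """Single-process stand-in for a shared, atomic key store."""

    def __init__(self):
        self._seen = set()
        self._lock = threading.Lock()

    def claim(self, key):
        """Atomically record key; return False if it was already claimed."""
        with self._lock:
            if key in self._seen:
                return False
            self._seen.add(key)
            return True

store = InMemoryIdempotencyStore()
sent = []  # records side effects so the behavior is observable

def send_email_once(email, subject, idempotency_key):
    """Hypothetical task body: skip the work if this key was already claimed."""
    if not store.claim(idempotency_key):
        return 'duplicate: skipped'
    sent.append((email, subject))  # the real SMTP call would go here
    return f'Email sent to {email}'
```

Claiming the key before doing the work guarantees at most one send, but a crash between the claim and the send means the email is never sent. Recording the key only after success flips the trade-off toward at-least-once delivery; which is right depends on whether a duplicate or a missed operation is worse for your task.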

Further Reading