Skip to main content

Task Routing in Celery: Direct Tasks to Specific Workers

Task routing directs specific tasks to dedicated workers or queues. This allows you to isolate workloads: critical payment tasks run on fast hardware, image processing runs on GPU workers, and cleanup tasks run on low-priority workers. Routing is essential for building production systems where resource constraints and SLAs vary by task type.

Understanding Queues and Routing

Celery's default queue is called celery. When you enqueue a task without specifying a queue, it lands in this default. Workers listen to one or more queues. By creating multiple queues and routing tasks to them, you achieve several benefits: parallel processing (different queue = different worker), isolation (failure in one queue doesn't affect others), and resource optimization (GPU workers for heavy tasks, shared workers for light ones).

The routing mechanism is simple: a task specifies a queue, and workers bound to that queue pick it up.

Basic Routing: Queue Assignment

Define queues in your Celery config:

from kombu import Queue, Exchange

app.conf.task_queues = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('priority', Exchange('priority'), routing_key='priority'),
Queue('batch', Exchange('batch'), routing_key='batch'),
)

# Default queue if not specified
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'

Enqueue a task to a specific queue:

from tasks import send_email, process_image, cleanup_old_files

# Route to different queues
send_email.apply_async(
args=('[email protected]', 'Hello'),
queue='priority',
)

process_image.apply_async(
args=(image_id,),
queue='batch',
)

cleanup_old_files.apply_async(
queue='default',
)

Start workers listening to specific queues:

# Worker for priority email tasks
celery -A celery_app worker -Q priority --concurrency=4 --loglevel=info

# Worker for batch image processing
celery -A celery_app worker -Q batch --concurrency=2 --loglevel=info

# Worker for default and other queues
celery -A celery_app worker -Q default --concurrency=8 --loglevel=info

Now priority emails execute immediately on a dedicated worker, batch jobs run in parallel on separate workers, and default tasks don't block each other.

Pattern-Based Routing with task_routes

For large systems, hard-code routing by task name using task_routes:

app.conf.task_routes = {
'tasks.send_email': {'queue': 'priority', 'priority': 9},
'tasks.send_sms': {'queue': 'priority', 'priority': 9},
'tasks.process_image': {'queue': 'batch'},
'tasks.process_video': {'queue': 'batch', 'time_limit': 600},
'tasks.cleanup_*': {'queue': 'default'}, # Wildcard patterns
'tasks.report_*': {'queue': 'default', 'priority': 1}, # Low priority
}

@app.task
def send_email(to, subject, body):
"""Automatically routed to 'priority' queue."""
pass

@app.task
def send_sms(phone, message):
"""Also routed to 'priority' queue."""
pass

@app.task
def process_image(image_id):
"""Routed to 'batch' queue."""
pass

@app.task
def cleanup_old_files():
"""Matches wildcard 'cleanup_*'."""
pass

With task_routes, enqueuing tasks is simple—Celery automatically applies the routing:

send_email.delay('[email protected]', 'Subject', 'Body')  # Automatically routed to 'priority'
process_image.delay(123) # Automatically routed to 'batch'

Priority Queues and SLAs

Combine routing with priority levels to honor service-level agreements:

from celery.schedules import crontab

app.conf.task_routes = {
'tasks.charge_payment': {
'queue': 'critical',
'priority': 10, # Highest priority
'time_limit': 30,
'soft_time_limit': 25,
},
'tasks.send_notification': {
'queue': 'general',
'priority': 5,
},
'tasks.generate_report': {
'queue': 'background',
'priority': 1, # Lowest priority
'time_limit': 3600,
},
}

# Deploy workers with matching resources
# Critical queue: 2 workers, 8 cores each, SSD storage
# General queue: 4 workers, 4 cores each
# Background queue: 1 worker, 2 cores, HDD storage

Workers can also be configured to respect priority within a queue:

# Worker with worker_prefetch_multiplier=1 for fair scheduling
celery -A celery_app worker -Q critical -c 8 --prefetch-multiplier=1

Routing Logic: Custom Router

For dynamic routing (e.g., routing based on task arguments), implement a custom router function:

def route_payment_task(name, args, kwargs, options, broker, **rest):
"""Route payment tasks based on amount."""
if name == 'tasks.charge_payment':
amount = kwargs.get('amount', args[0] if args else 0)
if amount > 10000:
return {
'queue': 'critical',
'priority': 10,
'routing_key': 'critical',
}
return {'queue': 'general', 'routing_key': 'general'}

app.conf.task_routes = (route_payment_task,)

@app.task
def charge_payment(user_id, amount):
"""Charge a user; high amounts are routed to critical queue."""
return process_payment(user_id, amount)

# Large payments are automatically routed to critical queue
charge_payment.delay(user_id=123, amount=50000)

Multi-Region Routing

Route tasks to workers in different geographic regions:

app.conf.task_routes = {
'tasks.send_email_us': {'queue': 'regional-us'},
'tasks.send_email_eu': {'queue': 'regional-eu'},
'tasks.send_email_asia': {'queue': 'regional-asia'},
}

def send_regional_email(user_id):
"""Send email to user, routed to their region."""
user = get_user(user_id)
task_map = {
'US': 'tasks.send_email_us',
'EU': 'tasks.send_email_eu',
'ASIA': 'tasks.send_email_asia',
}
task_name = task_map.get(user.region, 'tasks.send_email_us')
app.send_task(task_name, args=(user.email, 'Subject', 'Body'))

Deploy regional workers:

# US worker
celery -A celery_app worker -Q regional-us --hostname=worker-us@%h

# EU worker
celery -A celery_app worker -Q regional-eu --hostname=worker-eu@%h

# Asia worker
celery -A celery_app worker -Q regional-asia --hostname=worker-asia@%h

Monitoring Routed Tasks

Check queue lengths and worker status:

# List all active queues and their messages
celery -A celery_app inspect active_queues

# Get queue lengths
celery -A celery_app inspect reserved

# Check which worker is handling which queue
celery -A celery_app inspect active

Or programmatically:

from celery import current_app

inspect = current_app.control.inspect()
active_tasks = inspect.active()
for worker, tasks in active_tasks.items():
print(f'{worker}: {len(tasks)} active tasks')

reserved_tasks = inspect.reserved()
for worker, tasks in reserved_tasks.items():
print(f'{worker}: {len(tasks)} reserved tasks')

Key Takeaways

  • Routing directs tasks to specific queues; workers listen to queues and process tasks
  • Define queues in config, then route tasks via apply_async(queue='...') or task_routes
  • Use task_routes with patterns for declarative, automatic routing
  • Combine routing with priority levels to honor SLAs for critical tasks
  • Implement custom router functions for dynamic, argument-based routing
  • Monitor queue lengths and worker status to balance load across queues

Frequently Asked Questions

Can a single worker listen to multiple queues?

Yes. Use -Q queue1,queue2,queue3 to listen to multiple queues. The worker picks up tasks in order; use priority to ensure important tasks are processed first.

What happens if I enqueue to a queue with no listening workers?

The task sits in the queue until a worker starts listening. It's not lost, just delayed. This is useful for scaling: enqueue tasks during high load, start workers when capacity increases.

How do I balance load across multiple workers on the same queue?

Celery uses fair dispatching by default: workers pull tasks from the queue one at a time. Use worker_prefetch_multiplier=1 for strict fairness (one task per worker at a time) or increase it for batch efficiency.

Can I change routing without restarting workers?

No. Routing is determined at enqueue time and stored with the task. Workers are already listening to specific queues. To change routing globally, restart workers and update the config.

How do I ensure a high-priority task runs immediately, not queued?

Route to a dedicated high-priority queue with dedicated workers. Queuing is inherent in Celery; tasks always enter the broker first. To bypass queuing, call the task function directly (not recommended for async).

Further Reading