Celery Task Chaining, Signatures, and Workflows
Celery task chaining allows you to compose complex workflows from simple tasks. A signature is a serialized task call that can be passed between functions, enabling task composition at runtime. Using chains, groups, chords, and maps, you build data pipelines, parallel processing, and conditional workflows without writing a monolithic function.
Task Signatures Basics
A signature is a serialized task call. It captures the task name, args, kwargs, and options, allowing you to execute the task later or pass it to other tasks:
```python
from celery import signature

from celery_app import app  # your Celery application instance


@app.task
def add(x, y):
    return x + y


# Method 1: the .s() shorthand
sig = add.s(2, 2)  # signature for add(2, 2)

# Method 2: signature() by task name (assumes the task is registered as 'tasks.add')
sig = signature('tasks.add', args=(2, 2))

# Execute the signature; .get() needs a configured result backend
result = sig.delay()
print(result.get())  # 4
```
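Because a signature is plain data, it can be serialized and shipped to a worker. In Celery the Signature class is a dict subclass holding the task name, args, kwargs and options. The sketch below is a toy, stdlib-only model of that idea, not Celery code. REGISTRY, task, make_signature and run are hypothetical helpers. The model builds a signature dict, round-trips it through JSON, and executes it by looking the task up by name.

```python
import json

# Toy task registry: maps a task name to a plain Python function.
REGISTRY = {}


def task(name):
    """Register a function under a task name (hypothetical stand-in for @app.task)."""
    def decorator(fn):
        REGISTRY[name] = fn
        return fn
    return decorator


@task('tasks.add')
def add(x, y):
    return x + y


def make_signature(name, args=(), kwargs=None, options=None):
    """Build a signature as a plain dict: task name, args, kwargs, options."""
    return {'task': name, 'args': list(args),
            'kwargs': kwargs or {}, 'options': options or {}}


def run(sig):
    """Look the task up by name and call it, roughly as a worker would."""
    fn = REGISTRY[sig['task']]
    return fn(*sig['args'], **sig['kwargs'])


sig = make_signature('tasks.add', args=(2, 2))
wire = json.dumps(sig)        # serialized form (toy stand-in for a broker message)
received = json.loads(wire)   # what the receiving side decodes
print(run(received))          # 4
```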
Signatures can also be partial. Arguments supplied when you call a partial signature are prepended to the ones it already stores:
```python
# Partial signature: one argument now, the rest later
partial_add = add.s(2)          # add(?, 2)
result = partial_add.delay(3)   # new args are prepended: add(3, 2)
print(result.get())             # 5
```
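Addition hides argument order, so the toy model below uses subtraction to make it visible. It mimics Celery's merge rule for partial signatures in plain Python: call-time positional arguments go in front of the stored ones, and call-time keyword arguments override stored ones. PartialSig is a hypothetical class, not Celery's Signature.

```python
class PartialSig:
    """Toy partial signature: a function plus stored args and kwargs."""

    def __init__(self, fn, *args, **kwargs):
        self.fn = fn
        self.args = tuple(args)
        self.kwargs = dict(kwargs)

    def merged(self, args=(), kwargs=None):
        """Call-time args go in front of the stored ones; call-time kwargs win."""
        return tuple(args) + self.args, {**self.kwargs, **(kwargs or {})}

    def __call__(self, *args, **kwargs):
        call_args, call_kwargs = self.merged(args, kwargs)
        return self.fn(*call_args, **call_kwargs)


def sub(x, y):
    return x - y


def power(base, exp=2):
    return base ** exp


partial_sub = PartialSig(sub, 10)   # like sub.s(10)
print(partial_sub(3))               # sub(3, 10) -> -7

square = PartialSig(power, exp=2)   # like power.s(exp=2)
print(square(3))                    # 9
print(square(3, exp=3))             # call-time kwarg overrides: 27
```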
Task Chaining: Sequential Execution
Chain tasks together to run them sequentially, passing the output of each task as input to the next:
```python
from celery import chain


@app.task
def fetch_data(user_id):
    return {'user_id': user_id, 'data': 'raw_data'}


@app.task
def process_data(data):
    data['processed'] = True
    return data


@app.task
def save_result(data):
    # Save to the database (Result is a Django model defined elsewhere, not shown)
    result = Result.objects.create(data=data)
    return str(result.id)


# Chain: fetch -> process -> save
workflow = chain(
    fetch_data.s(user_id=42),
    process_data.s(),
    save_result.s(),
)
result = workflow.delay()
final_id = result.get()  # database ID returned by save_result
```
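The chain runs its tasks one after another. Each task's return value is passed to the next task as its first positional argument, ahead of any arguments stored in that task's signature. workflow.delay() returns immediately with an AsyncResult for the last task. Calling .get() on it blocks until the whole chain has finished, which requires a result backend.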
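To make the data flow concrete, here is a toy, in-process model of a chain; run_chain is a hypothetical helper, not Celery code. The previous result becomes the first positional argument of the next step, ahead of that step's stored arguments. In Celery each step runs as a separate task message, possibly on a different worker; this sketch only shows how the arguments are threaded. save_result returns a fake ID instead of writing to a database.

```python
def run_chain(steps, *args, **kwargs):
    """Run (fn, stored_args, stored_kwargs) steps in order.

    The first step gets the call-time args; every later step receives the
    previous return value as its first positional argument.
    """
    result = None
    for index, (fn, stored_args, stored_kwargs) in enumerate(steps):
        if index == 0:
            call_args = tuple(args) + tuple(stored_args)
            call_kwargs = {**stored_kwargs, **kwargs}
        else:
            call_args = (result,) + tuple(stored_args)
            call_kwargs = dict(stored_kwargs)
        result = fn(*call_args, **call_kwargs)
    return result


def fetch_data(user_id):
    return {'user_id': user_id, 'data': 'raw_data'}


def process_data(data):
    data['processed'] = True
    return data


def save_result(data):
    # Hypothetical stand-in for a database write: return a fake ID
    return f"result-{data['user_id']}"


workflow = [
    (fetch_data, (), {'user_id': 42}),
    (process_data, (), {}),
    (save_result, (), {}),
]
print(run_chain(workflow))  # result-42
```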
Parallel Execution with Groups
A group executes multiple tasks in parallel, collecting all results:
```python
from celery import group


@app.task
def process_image(image_id):
    # Expensive image processing
    return f'processed_image_{image_id}'


# Process 5 images in parallel
job = group(
    process_image.s(i) for i in range(1, 6)
)
results = job.delay()
processed = results.get()  # Wait for all to complete
print(processed)  # ['processed_image_1', 'processed_image_2', ...]
```
Groups are ideal for embarrassingly parallel work: processing multiple items independently.
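The list a group's result returns follows the order in which the tasks were defined, not the order in which they finished. The toy model below shows that property with concurrent.futures. run_group is a hypothetical helper, and threads stand in for Celery workers. Images with higher IDs sleep less and finish first, yet the results still come back in definition order.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def process_image(image_id):
    # Later images finish first, to show that order is still preserved
    time.sleep(0.05 * (6 - image_id))
    return f'processed_image_{image_id}'


def run_group(fn, items, max_workers=5):
    """Run fn on every item concurrently; return results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(fn, item) for item in items]
        return [future.result() for future in futures]


print(run_group(process_image, range(1, 6)))
# ['processed_image_1', 'processed_image_2', 'processed_image_3', 'processed_image_4', 'processed_image_5']
```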
Chords: Parallel with Aggregation
A chord runs parallel tasks, then executes a callback task with all results:
```python
from celery import chord


@app.task
def fetch_user_data(user_id):
    return {'user_id': user_id, 'posts': 10}


@app.task
def aggregate_stats(results):
    # Receives the list of all fetch_user_data results
    total_users = len(results)
    total_posts = sum(r['posts'] for r in results)
    return {'total_users': total_users, 'total_posts': total_posts}


# Fetch data for 5 users in parallel, then aggregate.
# chord(header, body) builds the workflow without sending it;
# chord(header)(body) would send it immediately and return its result.
workflow = chord(
    [fetch_user_data.s(i) for i in range(1, 6)],
    aggregate_stats.s(),
)
result = workflow.delay()
stats = result.get()  # {'total_users': 5, 'total_posts': 50}
```
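A common chord pattern is to fetch data in parallel and then aggregate or summarize the results. Chords rely on the result backend to collect the header results, so a backend must be configured and the header tasks must not ignore their results.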
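A chord is a group whose collected results are passed, as a single list, to one callback. The toy model below reproduces this section's aggregation so you can check the expected output locally. run_chord is a hypothetical helper built on threads, not Celery workers.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_user_data(user_id):
    return {'user_id': user_id, 'posts': 10}


def aggregate_stats(results):
    return {'total_users': len(results),
            'total_posts': sum(r['posts'] for r in results)}


def run_chord(header_fn, items, callback):
    """Run header_fn over items in parallel, then call callback(list_of_results)."""
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(header_fn, items))
    return callback(results)


print(run_chord(fetch_user_data, range(1, 6), aggregate_stats))
# {'total_users': 5, 'total_posts': 50}
```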
Complex Workflows: Nesting Chains, Groups, and Chords
Combine chains and groups for complex pipelines:
```python
from celery import chain, chord

# Scenario: batch process a dataset
# 1. Fetch raw data from 2 sources in parallel
# 2. Process each source's batch
# 3. Aggregate the processed results


@app.task
def fetch_source_a():
    return 'data_from_source_a'


@app.task
def fetch_source_b():
    return 'data_from_source_b'


@app.task
def process_batch(data):
    return f'processed_{data}'


@app.task
def combine_results(results):
    return f'combined_{results}'


# Header: two fetch -> process chains that run in parallel.
# Body: combine_results receives both processed values as a list.
workflow = chord(
    [
        chain(fetch_source_a.s(), process_batch.s()),
        chain(fetch_source_b.s(), process_batch.s()),
    ],
    combine_results.s(),
)
result = workflow.delay()
final = result.get()
```
This workflow fetches from the two sources in parallel, processes each batch in its own chain, and combines the processed results in a single callback.
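Nesting works because a chain placed inside a chord header behaves like one parallel branch. Its final value is what the callback collects. The toy model below composes the same structure from plain functions so the final value is easy to inspect. chain_of and chord_of are hypothetical helpers, and threads stand in for workers.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def fetch_source_a():
    return 'data_from_source_a'


def fetch_source_b():
    return 'data_from_source_b'


def process_batch(data):
    return f'processed_{data}'


def combine_results(results):
    return f'combined_{results}'


def chain_of(*fns):
    """Compose fns so each output feeds the next (the first takes no args)."""
    def run():
        return reduce(lambda acc, fn: fn(acc), fns[1:], fns[0]())
    return run


def chord_of(branches, callback):
    """Run zero-argument branches in parallel, then pass all results to callback."""
    def run():
        with ThreadPoolExecutor() as pool:
            futures = [pool.submit(branch) for branch in branches]
            return callback([future.result() for future in futures])
    return run


workflow = chord_of(
    [chain_of(fetch_source_a, process_batch),
     chain_of(fetch_source_b, process_batch)],
    combine_results,
)
print(workflow())
# combined_['processed_data_from_source_a', 'processed_data_from_source_b']
```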
Conditional Execution and Error Handling
Attach error and success callbacks to react to the outcome of a workflow:
```python
from celery import chain
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


@app.task
def validate_input(data):
    if not data.get('user_id'):
        raise ValueError('Missing user_id')
    return data


@app.task
def process_valid(data):
    return f'processed_{data["user_id"]}'


@app.task
def handle_error(request, exc, traceback):
    # An errback declared with (request, exc, traceback) receives the failed
    # task's request, the exception and the traceback
    logger.error(f'Task {request.id} failed: {exc!r}')


# Chain with error handling
workflow = chain(
    validate_input.s({'user_id': 42}),
    process_valid.s(),
)
# Add an error callback; link_error on a chain is applied to its tasks
workflow.apply_async(
    link_error=handle_error.s(),
)
```
You can also combine link, which is called with the final result on success, and link_error, which is called on failure:
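```python
@app.task
def send_success_notification(result):
    # Called with the return value of the last task in the chain
    logger.info(f'Workflow succeeded: {result}')


@app.task
def send_failure_notification(request, exc, traceback):
    # A one-argument errback would receive only the failed task's ID,
    # so accept (request, exc, traceback) to get the exception itself
    logger.error(f'Workflow failed in task {request.id}: {exc!r}')


workflow = chain(
    fetch_data.s(42),
    process_data.s(),
)
result = workflow.apply_async(
    link=send_success_notification.s(),
    link_error=send_failure_notification.s(),
)
```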
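When you pass link_error to a chain, Celery propagates the errback to the tasks in the chain. Whichever step fails triggers it, and the remaining steps do not run. The toy model below mimics that control flow; run_chain_with_callbacks and FakeRequest are hypothetical, not Celery classes. On success the link callback gets the final result. On the first failure the errback gets (request, exc, traceback) and nothing after the failing step runs.

```python
import traceback as tb_module
from dataclasses import dataclass


@dataclass
class FakeRequest:
    """Minimal stand-in for the request object an errback receives."""
    id: str
    task: str


def validate_input(data):
    if not data.get('user_id'):
        raise ValueError('Missing user_id')
    return data


def process_valid(data):
    return f'processed_{data["user_id"]}'


def run_chain_with_callbacks(steps, first_arg, link=None, link_error=None):
    """Run steps in order; call link(result) on success or
    link_error(request, exc, traceback) on the first failure."""
    value = first_arg
    for index, step in enumerate(steps):
        try:
            value = step(value)
        except Exception as exc:
            if link_error is not None:
                request = FakeRequest(id=f'step-{index}', task=step.__name__)
                link_error(request, exc, tb_module.format_exc())
            return None  # later steps never run
    if link is not None:
        link(value)
    return value


events = []
on_success = lambda result: events.append(('success', result))
on_failure = lambda req, exc, tb: events.append(('failure', req.task, str(exc)))

ok = run_chain_with_callbacks([validate_input, process_valid], {'user_id': 42},
                              link=on_success, link_error=on_failure)
bad = run_chain_with_callbacks([validate_input, process_valid], {},
                               link=on_success, link_error=on_failure)
print(events)
# [('success', 'processed_42'), ('failure', 'validate_input', 'Missing user_id')]
```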
Canvas: Programmatic Workflow Building
Because signatures are ordinary objects, you can assemble a canvas at runtime from data that is only known then:
```python
from celery import chord


def build_workflow(user_ids):
    """Dynamically build a workflow based on input."""
    tasks = [fetch_and_process.s(uid) for uid in user_ids]
    # Return the chord without sending it; the caller decides when to run it
    return chord(tasks, aggregate_results.s())


@app.task
def fetch_and_process(user_id):
    # get_user and process_user_data are placeholders for your own code
    user = get_user(user_id)
    return process_user_data(user)


@app.task
def aggregate_results(results):
    return len(results)


# Build and run at runtime
user_list = [1, 2, 3, 4, 5]
workflow = build_workflow(user_list)
result = workflow.delay()
count = result.get()  # 5
```
Immutable Signatures
Use an immutable signature when a task in a chain should not receive the previous task's result:
```python
from celery import chain


@app.task
def task_a():
    return 10


@app.task
def task_b(x):
    return x * 2


# Normal chaining: task_b receives task_a's result
sig = chain(task_a.s(), task_b.s())
# Executes: task_b(10)

# Immutable: task_b ignores task_a's result and uses only its own args
sig = chain(task_a.s(), task_b.si(5))  # .si() = immutable signature
# Executes: task_b(5), regardless of task_a's return value
```
Immutable signatures are useful when tasks must run in a fixed order but do not consume each other's results.
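The only difference between .s() and .si() is whether the parent's result is merged into the arguments. The toy model below makes that one flag explicit. Step and run_chain are hypothetical helpers, not Celery API.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Step:
    """Toy signature: a function, its stored args, and an immutability flag."""
    fn: Callable[..., Any]
    args: tuple = ()
    immutable: bool = False  # True plays the role of .si()


def run_chain(steps):
    """Run steps in order, threading results like a chain does."""
    result = None
    for index, step in enumerate(steps):
        if index == 0 or step.immutable:
            call_args = step.args               # parent's result is ignored
        else:
            call_args = (result,) + step.args   # parent's result goes first
        result = step.fn(*call_args)
    return result


def task_a():
    return 10


def task_b(x):
    return x * 2


normal = run_chain([Step(task_a), Step(task_b)])
immutable = run_chain([Step(task_a), Step(task_b, (5,), immutable=True)])
print(normal, immutable)  # 20 10
```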
Monitoring Workflow Progress
Track progress through multi-step workflows:
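```python
import time

from celery import chord


def monitor_workflow(task_id):
    """Print the state and progress metadata of one task."""
    result = app.AsyncResult(task_id)
    print(f'Status: {result.status}')
    # For state='PROGRESS', .info holds the meta dict passed to update_state()
    print(f'Progress: {result.info}')


@app.task(bind=True)
def long_step(self, data):
    """Task that reports progress while it runs."""
    for i in range(10):
        self.update_state(state='PROGRESS', meta={'step': i, 'total': 10})
        time.sleep(1)
    return data


@app.task
def aggregate(results):
    return results


workflow = chord([long_step.s(i) for i in range(3)], aggregate.s())
task = workflow.delay()
print(task.id)  # ID of the chord callback (aggregate)

# Check progress periodically. The callback stays PENDING until every
# header task has finished, so inspect the header tasks for PROGRESS.
time.sleep(5)
monitor_workflow(task.id)
for header_task in task.parent.results:
    monitor_workflow(header_task.id)
```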
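update_state() stores a custom state and meta dict in the result backend, and AsyncResult.status and .info read them back. The toy model below swaps the backend for an in-memory dict guarded by a lock and runs the task in a thread. InMemoryBackend, long_step and poll here are hypothetical helpers, not Celery API. This is enough to see a poller observe PROGRESS updates and finally SUCCESS.

```python
import threading
import time


class InMemoryBackend:
    """Toy result backend: task_id -> (state, info), plus a history log."""

    def __init__(self):
        self._lock = threading.Lock()
        self._store = {}
        self.history = []

    def update_state(self, task_id, state, meta=None):
        with self._lock:
            self._store[task_id] = (state, meta)
            self.history.append((task_id, state, meta))

    def get(self, task_id):
        with self._lock:
            return self._store.get(task_id, ('PENDING', None))


def long_step(backend, task_id, data, total=5):
    for i in range(total):
        backend.update_state(task_id, 'PROGRESS', {'step': i, 'total': total})
        time.sleep(0.01)
    backend.update_state(task_id, 'SUCCESS', data)
    return data


def poll(backend, task_id, interval=0.005, timeout=2.0):
    """Print state/info until the task leaves PENDING/PROGRESS or times out."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state, info = backend.get(task_id)
        print(f'Status: {state}  Progress: {info}')
        if state not in ('PENDING', 'PROGRESS'):
            return state, info
        time.sleep(interval)
    raise TimeoutError(task_id)


backend = InMemoryBackend()
worker = threading.Thread(target=long_step, args=(backend, 'task-1', 'done'))
worker.start()
final = poll(backend, 'task-1')
worker.join()
```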
Key Takeaways
- Signatures are serialized task calls; use .s() to create them
- Chain tasks for sequential execution: the output of one becomes the input of the next
- Groups execute multiple tasks in parallel, collecting all results
- Chords combine parallel execution with a callback task for aggregation
- Build complex workflows by nesting chains, groups, and chords
- Use immutable signatures (.si()) when a task should ignore the previous task's result
- Handle errors with link_error callbacks
- Monitor progress with update_state() and periodic status checks
Frequently Asked Questions
What's the difference between a chain and a group?
A chain executes tasks sequentially: task1 -> task2 -> task3. A group executes tasks in parallel: [task1, task2, task3] all at once. Chains pass data forward; groups collect all results.
Can I conditionally branch a workflow?
Yes, by deciding in your own code which canvas to build before launching it:
```python
if condition:
    workflow = chain(task_a.s(), task_b.s())
else:
    workflow = chain(task_a.s(), task_c.s())
```
For complex branching, consider a state machine or workflow engine (not native to Celery).
How do I handle failures in a chain?
Use link_error on the chain:
```python
# handle_failure: an errback task taking (request, exc, traceback)
workflow = chain(task_a.s(), task_b.s()).apply_async(
    link_error=handle_failure.s(),
)
```
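Alternatively, handle the exception inside the task itself. For example, retry with self.retry() in a bound task, or declare autoretry_for on the task so that Celery retries it automatically.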
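With retry_backoff=True, Celery delays automatic retries by 1, 2, 4, 8... seconds; a number instead of True is used as the factor. The delay is capped at retry_backoff_max, which defaults to 600 seconds. With retry_jitter enabled, also the default, Celery picks a random delay between zero and that value. The stdlib sketch below models that schedule with hypothetical backoff_delay and call_with_retries helpers. It takes an injectable sleep function so the example runs instantly. In a real task you would rely on self.retry() or autoretry_for instead.

```python
import random
import time


def backoff_delay(retries, factor=1, maximum=600, jitter=True):
    """Whole seconds to wait before retry number `retries` (0-based)."""
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        delay = random.randrange(delay + 1)  # full jitter: anywhere in 0..delay
    return delay


def call_with_retries(fn, retry_on=(Exception,), max_retries=3,
                      sleep=time.sleep, **backoff):
    """Call fn(); on an exception listed in retry_on, wait and try again."""
    retries = 0
    while True:
        try:
            return fn()
        except retry_on:
            if retries >= max_retries:
                raise  # give up: re-raise the last exception
            sleep(backoff_delay(retries, **backoff))
            retries += 1


attempts = {'count': 0}


def flaky():
    # Fails twice, then succeeds
    attempts['count'] += 1
    if attempts['count'] < 3:
        raise ConnectionError('service unavailable')
    return 'ok'


waits = []
outcome = call_with_retries(flaky, retry_on=(ConnectionError,),
                            sleep=waits.append, jitter=False)
print(outcome, waits)  # ok [1, 2]
```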
Can I cancel a running workflow?
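Partially. Calling revoke() on a task's result, or app.control.revoke(task_id), prevents that task from running if no worker has started it yet. revoke(terminate=True) also kills a task that is already executing, and is meant as a last resort. For a chain, the AsyncResult you hold refers to the last task, so follow its parent attribute to reach earlier steps. Expect race conditions: a step may finish and trigger the next one before the revoke reaches the workers.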
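For a chain, the AsyncResult returned by apply_async() refers to the last task, and its parent attribute links back toward the first. The toy model below shows the walk you would do to gather every ID before revoking them. ToyResult and collect_chain_ids are hypothetical stand-ins for AsyncResult objects.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyResult:
    """Stand-in for an AsyncResult: an ID plus a link to the previous step."""
    id: str
    parent: Optional['ToyResult'] = None


def collect_chain_ids(result):
    """Follow .parent links from the last task back to the first."""
    ids = []
    node = result
    while node is not None:
        ids.append(node.id)
        node = node.parent
    return list(reversed(ids))  # first task first


first = ToyResult('fetch')
second = ToyResult('process', parent=first)
last = ToyResult('save', parent=second)
print(collect_chain_ids(last))  # ['fetch', 'process', 'save']
```

With real results you would call revoke() on each collected result, bearing in mind that earlier steps may already have finished.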
How do I make a reusable workflow template?
Define the canvas structure as a function:
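```python
from celery import chain


def my_workflow(data, user_id):
    # validate, process and save are your own tasks
    return chain(
        validate.s(data),
        process.s(user_id),  # called as process(validated_data, user_id)
        save.s(),
    )


# Use it:
result = my_workflow({'key': 'value'}, 42).delay()
```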