
Writing and Executing Celery Tasks: A Complete Guide

Writing effective Celery tasks requires understanding task signatures, execution modes, and the context available within a task function. A well-designed task is idempotent, includes proper logging, and handles its own errors gracefully. This guide covers the patterns and APIs you need to build robust, scalable tasks.

Task Basics: Definition and Decoration

A Celery task is a Python function decorated with @app.task. The simplest form:

from celery_app import app

@app.task
def send_notification(user_id, message):
    """Send a notification to a user."""
    user = get_user(user_id)  # Hypothetical function
    email_service.send(user.email, message)
    return f'Notification sent to {user.email}'

When you decorate a function with @app.task, Celery wraps it and makes it enqueueable. The function itself still executes normally if called directly, but calling send_notification.delay(1, 'Hello') enqueues it for asynchronous execution.

Execution Modes

Tasks can be executed in several ways:

Synchronous (blocking):

result = send_notification(user_id=42, message='Hello')
# The function runs immediately; execution blocks

Asynchronous (fire-and-forget):

task = send_notification.delay(user_id=42, message='Hello')
# Task is enqueued; .delay() returns an AsyncResult immediately
print(f'Task ID: {task.id}')

With options:

task = send_notification.apply_async(
    args=(42, 'Hello'),
    countdown=60,  # Delay execution by 60 seconds
    expires=300,   # Task expires if not executed within 5 minutes
)

Use .delay() for simple cases and .apply_async() when you need fine-grained control.

Bound Tasks and Self Context

A bound task receives the task instance as its first argument, giving access to request metadata (task ID, retry count) and to methods such as retry():

@app.task(bind=True)
def process_data(self, data_id):
    """Bound task with access to self (task instance)."""
    try:
        result = heavy_computation(data_id)
        return {'status': 'success', 'result': result}
    except Exception as exc:
        # Retry up to 3 times, 60 seconds apart. retry() raises a Retry
        # exception, so re-raise it to make the control flow explicit.
        raise self.retry(exc=exc, countdown=60, max_retries=3)

The self parameter provides:

  • self.request: Metadata about the task execution (ID, args, retries, time limits)
  • self.retry(): Retry the task with optional delays
  • self.update_state(): Update task status during long-running operations

Use bound tasks when you need control over retries or want to track progress.
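
The fixed countdown=60 in the example above retries at a constant interval. A common alternative is exponential backoff, where each retry waits longer than the last. The sketch below is plain Python rather than a Celery API; the helper name backoff_delay and its defaults (60-second base, one-hour cap) are assumptions chosen for illustration.

```python
import random

def backoff_delay(retries, base=60, cap=3600, jitter=False):
    """Return the seconds to wait before the next retry.

    `retries` is the number of retries already made (self.request.retries
    in a bound task). The delay doubles each time and is capped at `cap`.
    With jitter=True, a random value in [0, delay] is returned so that many
    failing tasks do not all retry at the same moment.
    """
    delay = min(cap, base * (2 ** retries))
    if jitter:
        return random.uniform(0, delay)
    return delay

# Inside a bound task this could be used roughly as:
#     raise self.retry(exc=exc, countdown=backoff_delay(self.request.retries))
print([backoff_delay(n) for n in range(8)])
# [60, 120, 240, 480, 960, 1920, 3600, 3600]
```

Recent Celery versions also provide built-in backoff through the autoretry_for and retry_backoff task options; check the documentation for the version you run before relying on them.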

Logging Within Tasks

Always log task execution for debugging and monitoring:

from celery.utils.log import get_task_logger
from celery_app import app

# get_task_logger attaches the logger to Celery's task logging hierarchy,
# so the task name and ID are available to the task log format
logger = get_task_logger(__name__)

@app.task(bind=True)
def crawl_website(self, url):
    """Crawl a website and extract data."""
    logger.info(f'Task {self.request.id} starting crawl of {url}')
    try:
        data = scrape_page(url)
        logger.info(f'Task {self.request.id}: Successfully crawled {url}')
        return data
    except Exception as e:
        logger.error(f'Task {self.request.id}: Error crawling {url}: {e}', exc_info=True)
        raise

Configure logging in your Celery config:

app.conf.worker_log_format = '[%(levelname)s/%(processName)s] %(message)s'
app.conf.worker_task_log_format = '[%(levelname)s/%(processName)s] [%(task_name)s(%(task_id)s)] %(message)s'

Task Options and Metadata

Control task behavior with options passed at definition or enqueue time:

@app.task(
    name='tasks.send_email',  # Explicit task name (by default, 'module.function')
    bind=True,                # Receive self as first argument
    max_retries=5,            # Maximum retry attempts
    default_retry_delay=60,   # Delay between retries (seconds)
    time_limit=300,           # Hard time limit (300 seconds, hard stop)
    soft_time_limit=250,      # Soft time limit (task can catch SoftTimeLimitExceeded)
    rate_limit='100/m',       # Rate limit: max 100 tasks per minute, per worker
)
def send_email(self, to_address, subject, body):
    """Send email with explicit options."""
    return email_service.send(to_address, subject, body)

Or set options at enqueue time:

send_email.apply_async(
    args=('user@example.com', 'Subject', 'Body'),
    countdown=300,     # Wait 5 minutes before executing
    expires=86400,     # Task expires after 1 day
    priority=9,        # Priority 0-9; which end is "high" depends on the broker
    queue='critical',  # Route to specific queue
)

Idempotency: Designing Tasks to Be Retryable

A task may execute more than once (retries, duplicate messages, etc.). Ensure tasks are idempotent: executing them twice produces the same result as executing once.

Bad (non-idempotent):

@app.task
def increment_counter(user_id):
    user = User.get(user_id)
    user.counter += 1  # If task runs twice, counter increments twice!
    user.save()
    return user.counter

Good (idempotent):

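from django.db import transaction
from django.db.models import F

@app.task
def increment_counter(user_id, event_id):
    # F() makes the increment atomic, but atomic is not idempotent: a duplicate
    # run would still add 1. Record each event under a unique constraint first.
    # CounterEvent is a hypothetical model with a unique event_id column.
    with transaction.atomic():
        _, created = CounterEvent.objects.get_or_create(event_id=event_id)
        if created:
            # Django ORM: atomic increment performed in the database
            User.objects.filter(pk=user_id).update(counter=F('counter') + 1)
    return created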

Or use deduplication:

@app.task
def process_order(order_id):
    # Check if already processed
    if ProcessedOrder.objects.filter(order_id=order_id).exists():
        logger.info(f'Order {order_id} already processed')
        return None
    # Process and record. A unique constraint on order_id is still needed:
    # two workers can both pass the check above before either records the order.
    order = Order.get(order_id)
    result = fulfill_order(order)
    ProcessedOrder.objects.create(order_id=order_id)
    return result
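
The check-then-create pattern above leaves a small window in which two workers can both see the order as unprocessed. One way to close it is to let the database enforce uniqueness and treat a rejected insert as "already handled". The following is a minimal sketch that uses the standard-library sqlite3 module in place of the ORM; the table name processed_orders and the helper claim_order are assumptions made for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_orders (order_id INTEGER PRIMARY KEY)")

def claim_order(conn, order_id):
    """Return True only for the first caller that claims order_id."""
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed_orders (order_id) VALUES (?)",
            (order_id,),
        )
    # rowcount is 1 when the row was inserted, 0 when the insert was ignored
    return cur.rowcount == 1

fulfilled = []

def process_order(order_id):
    if not claim_order(conn, order_id):
        return None  # duplicate delivery: nothing to do
    fulfilled.append(order_id)  # stand-in for fulfill_order(order)
    return order_id

process_order(7)
process_order(7)  # second delivery of the same message is a no-op
print(fulfilled)  # [7]
```

Claiming before fulfilling trades one risk for another: if the worker crashes after the claim, the order is marked but not fulfilled. Whether to claim first or record last depends on which failure is cheaper to recover from in your system.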

Task State and Result Storage

Celery tracks task state: PENDING, STARTED, SUCCESS, FAILURE, RETRY, REVOKED (STARTED is reported only when task_track_started is enabled). The result backend stores outcomes:

import time

task = send_email.apply_async(args=('user@example.com', 'Test', 'Body'))

# Poll for completion
while not task.ready():
    print(f'Task status: {task.status}')
    time.sleep(1)

if task.successful():
    print(f'Result: {task.result}')
elif task.failed():
    print(f'Task failed: {task.result}')  # On failure, result holds the exception

For long-running tasks, update state periodically:

@app.task(bind=True)
def process_large_dataset(self, dataset_id):
    dataset = Dataset.get(dataset_id)
    total_items = dataset.count()
    # Update progress roughly every 10%. max() keeps the step at 1 or more,
    # avoiding a ZeroDivisionError when there are fewer than 10 items.
    step = max(1, total_items // 10)
    for i, item in enumerate(dataset):
        process_item(item)
        if i % step == 0:
            self.update_state(
                state='PROGRESS',
                meta={'current': i, 'total': total_items}
            )
    return {'status': 'complete', 'processed': total_items}

Your frontend can poll the task ID and retrieve progress:

// Client-side (e.g., JavaScript)
async function getTaskProgress(taskId) {
    const response = await fetch(`/api/tasks/${taskId}`);
    const data = await response.json();
    console.log(`Progress: ${data.current}/${data.total}`);
}
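
The client code above expects a JSON body with current and total fields. On the server, that body has to be built from the task's state and metadata. The sketch below shows one possible mapping as a plain function; the name progress_payload and the response shape are assumptions, and the web framework and route wiring are left out.

```python
def progress_payload(state, info):
    """Build a JSON-serializable dict for a task status endpoint.

    `state` and `info` correspond to AsyncResult.state and AsyncResult.info:
    info is the meta dict during PROGRESS, the return value on SUCCESS,
    and the exception instance on FAILURE.
    """
    if state == "PROGRESS" and isinstance(info, dict):
        return {
            "state": state,
            "current": info.get("current", 0),
            "total": info.get("total", 0),
        }
    if state == "SUCCESS":
        # The task above returns {'status': 'complete', 'processed': total_items}
        total = info.get("processed", 0) if isinstance(info, dict) else 0
        return {"state": state, "current": total, "total": total}
    if state == "FAILURE":
        return {"state": state, "current": 0, "total": 0, "error": str(info)}
    # PENDING, STARTED, RETRY, ...: no progress reported yet
    return {"state": state, "current": 0, "total": 0}

# In a view this would be fed from a real result object, roughly:
#     from celery.result import AsyncResult
#     res = AsyncResult(task_id, app=app)
#     payload = progress_payload(res.state, res.info)
print(progress_payload("PROGRESS", {"current": 30, "total": 100}))
```

Returning zeros for states without progress keeps the response shape stable, so the client does not need to special-case missing fields.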

Key Takeaways

  • Tasks are functions decorated with @app.task; use .delay() for async, .apply_async() for options
  • Bound tasks (bind=True) give access to task metadata and retry control via self
  • Always log within tasks for debugging and use proper log formats
  • Design tasks to be idempotent (safe to execute multiple times)
  • Use task options (time limits, rate limits, retries) to control execution behavior
  • Update task state during long operations to enable progress tracking

Frequently Asked Questions

What's the difference between task.get() and task.result?

task.get(timeout=None) blocks until the task finishes and returns its result, re-raising the task's exception if it failed. task.result does not block: it holds the return value (or the exception instance) once the task has finished. Use .get() when you need to wait; check task.ready() first if you want to read .result without blocking.

How do I pass complex objects to tasks?

Celery serializes task arguments using JSON (or pickle, depending on config). Keep arguments simple: integers, strings, lists, dicts. If you need to pass a complex object, serialize it to JSON or store it in a database and pass the ID.
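
As a rough illustration of the "serialize it to JSON" option, the sketch below converts a dataclass to a plain dict before it would be passed to a task and rebuilds it on the other side. The Notification class and the helper names are hypothetical; json.dumps and json.loads stand in for what a JSON serializer would do to the arguments in transit.

```python
import json
from dataclasses import dataclass, asdict

@dataclass
class Notification:
    user_id: int
    message: str
    channels: list

def to_task_payload(notification):
    """Convert the object to JSON-friendly primitives (dicts, lists, str, int)."""
    return asdict(notification)

def from_task_payload(payload):
    """Rebuild the object inside the task from the received dict."""
    return Notification(**payload)

original = Notification(user_id=42, message="Hello", channels=["email", "sms"])

# Round trip through JSON, as the arguments would travel through the broker
wire = json.dumps(to_task_payload(original))
restored = from_task_payload(json.loads(wire))

print(restored == original)  # True
```

Passing only an ID and loading the object inside the task is often the better choice for database-backed records, since the task then sees the current state rather than a snapshot taken at enqueue time.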

Can I update a task's result after it completes?

Technically yes, by writing to the result backend directly (for example, app.backend.set() on key-value backends), but don't. Treat a result as immutable once the task completes. If you need to update state, store metadata separately and update that instead.

How do I cancel a running task?

Call task.revoke() to prevent a task that has not started yet from executing. Pass terminate=True to also terminate a task that is already running. Note: termination is forceful; the task has no cleanup opportunity. Use soft time limits for graceful cancellation.

Can tasks call other tasks?

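Yes. A task can enqueue other tasks asynchronously, or call them as plain functions, which runs them inline in the same worker. Avoid blocking on a subtask's result from inside a task: waiting workers can exhaust the pool and deadlock. Asynchronous task chaining is the standard pattern; see the chaining article for details.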

Further Reading