
Background Jobs with Celery

Not every operation can complete in an HTTP request. Sending an email can take 0.5–5 seconds, generating a report can take minutes, and webhook payloads must be processed durably even if something fails midway. Celery is a distributed task queue for Python that offloads this work to background workers: a user triggers a job (e.g., "export my data") over HTTP, the API queues it and returns immediately, and a Celery worker executes it asynchronously. This guide sets up Celery with Redis, configures retries, tracks task status, and applies it to real SaaS scenarios: email, PDF generation, and scheduled tasks.

Why Celery Over Synchronous Processing

Making a user wait 30 seconds for a PDF to generate, with the request at risk of hitting a 60-second timeout, is a poor experience. Celery decouples job submission from execution: the request handler puts the job on a queue in Redis and returns, and a separate worker process picks it up and runs it. Workers scale independently, so you can add more when jobs start to back up. Celery can also retry failed tasks, enforce time limits, and, with a result backend configured, keep task states and results available for status checks and debugging.

Setting Up Celery and Redis

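Install Celery with its Redis extra, which also installs the redis client library (the quotes stop shells such as zsh from expanding the brackets):

pip install "celery[redis]"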

Configure Celery:

# celery_config.py
import os
from celery import Celery

REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")

app = Celery(
    "saas_backend",
    broker=REDIS_URL,
    backend=REDIS_URL,  # Result backend; optional, but needed for status polling
    include=["tasks"],  # Modules the worker imports so their tasks get registered
)

# Celery configuration
app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,  # Report a STARTED state when a worker begins a task
    task_time_limit=30 * 60,  # Hard limit: kill tasks running >30 min
    task_soft_time_limit=25 * 60,  # Soft limit: raise SoftTimeLimitExceeded inside the task at 25 min
    worker_prefetch_multiplier=4,  # Each worker process reserves up to 4 tasks at a time
    result_expires=3600,  # Results kept for 1 hour
    beat_scheduler="celery.beat:PersistentScheduler",  # Default; stores Beat state in a local file
)

Defining Tasks

A Celery task is a Python function decorated with @app.task:

# tasks.py
from celery_config import app
from datetime import datetime
import smtplib
from email.mime.text import MIMEText

@app.task(bind=True, max_retries=3)
def send_email(self, to_email: str, subject: str, body: str):
    """
    Send an email asynchronously.
    If it fails, Celery retries up to 3 times with exponential backoff.
    """
    try:
        # Simulated SMTP (use smtplib or a service like SendGrid in production)
        print(f"Sending email to {to_email}: {subject}")

        # msg = MIMEText(body)
        # msg["Subject"] = subject
        # msg["From"] = "noreply@example.com"  # Placeholder sender address
        # msg["To"] = to_email
        # SMTP_SERVER = "smtp.gmail.com"
        # with smtplib.SMTP_SSL(SMTP_SERVER, 465) as server:
        #     server.login(EMAIL_USER, EMAIL_PASSWORD)  # Credentials from env/config
        #     server.send_message(msg)

        return {"status": "sent", "to": to_email}

    except Exception as exc:
        # Retry with exponential backoff: 60s, 120s, 240s
        raise self.retry(exc=exc, countdown=60 * 2 ** self.request.retries)

@app.task(bind=True)
def generate_report(self, tenant_id: int, report_type: str):
    """
    Generate a PDF report (slow operation).
    Stores result in database for later retrieval.
    """
    import time
    from sqlalchemy.orm import Session
    from app.models import Report
    from app.database import engine

    try:
        # The context manager closes the session even if an exception is raised
        with Session(engine) as session:
            # Simulate slow report generation
            time.sleep(5)

            now = datetime.utcnow()
            report = Report(
                tenant_id=tenant_id,
                type=report_type,
                file_path=f"/storage/{tenant_id}/report_{report_type}_{now.timestamp()}.pdf",
                status="completed",
                created_at=now,
            )
            session.add(report)
            session.commit()

            return {"report_id": report.id, "status": "completed"}

    except Exception as exc:
        raise self.retry(exc=exc, countdown=60, max_retries=5)

@app.task
def process_webhook(webhook_id: int, payload: dict):
    """
    Process a webhook asynchronously.
    Keeps HTTP endpoint fast; actual work happens in background.
    """
    print(f"Processing webhook {webhook_id}: {payload}")
    # Simulate work
    return {"webhook_id": webhook_id, "processed": True}
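To sanity-check a backoff formula, it helps to list the delays it actually produces. The helper below is a small standalone sketch (the name retry_countdowns is ours, not Celery's) that evaluates countdown = base * 2 ** retries for each retry, where retries mirrors self.request.retries (0 when the first attempt fails):

```python
from typing import List, Optional


def retry_countdowns(base: int, max_retries: int, cap: Optional[int] = None) -> List[int]:
    """Return the delay, in seconds, before each retry for countdown = base * 2 ** retries.

    `retries` plays the role of self.request.retries: 0 when the first attempt fails,
    1 when the first retry fails, and so on. `cap` optionally limits any single delay.
    """
    delays = []
    for retries in range(max_retries):
        delay = base * 2 ** retries
        delays.append(delay if cap is None else min(delay, cap))
    return delays


print(retry_countdowns(60, 3))           # [60, 120, 240]
print(retry_countdowns(60, 5, cap=600))  # [60, 120, 240, 480, 600]
```

With a base of 1, as in a bare 2 ** self.request.retries, the delays are only 1, 2, and 4 seconds, which is usually too short for a flaky SMTP server to recover.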

Queuing Tasks from FastAPI

Queue a task from an endpoint without waiting for it to complete:

from typing import Annotated

from celery.result import AsyncResult
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from sqlalchemy.orm import Session

from celery_config import app as celery_app  # Celery app, kept distinct from the FastAPI app
from tasks import send_email, generate_report, process_webhook

# get_db, get_current_user, and TokenData are assumed to come from your
# existing database and auth modules (not shown in this guide)

app = FastAPI()

class SendEmailRequest(BaseModel):
    to_email: str
    subject: str
    body: str

@app.post("/send-email")
async def send_email_endpoint(
    request: SendEmailRequest,
    session: Annotated[Session, Depends(get_db)],
    current_user: Annotated[TokenData, Depends(get_current_user)],
):
    """
    Queue an email to be sent asynchronously.
    Returns immediately with a task ID.
    """
    # Queue the task
    task = send_email.delay(
        to_email=request.to_email,
        subject=request.subject,
        body=request.body,
    )

    return {
        "task_id": task.id,
        "status": "queued",
        "message": "Email will be sent shortly",
    }

@app.post("/reports/generate")
async def generate_report_endpoint(
    report_type: str,
    session: Annotated[Session, Depends(get_db)],
    current_user: Annotated[TokenData, Depends(get_current_user)],
):
    """
    Queue a report generation job.
    User polls the status endpoint to check progress.
    """
    task = generate_report.delay(
        tenant_id=current_user.tenant_id,
        report_type=report_type,
    )

    return {
        "task_id": task.id,
        "status": "generating",
        "check_status_at": f"/tasks/{task.id}",
    }

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    """
    Poll the status of a background task.
    Returns the state (PENDING, STARTED, PROGRESS, SUCCESS, FAILURE) and any result.
    Celery also reports PENDING for task IDs it has never seen.
    """
    # Query the Celery result backend (not the FastAPI app)
    task = AsyncResult(task_id, app=celery_app)

    response = {
        "task_id": task_id,
        "status": task.state,
        "result": None,
    }

    if task.state == "SUCCESS":
        response["result"] = task.result
    elif task.state == "PROGRESS":
        response["result"] = task.info  # Meta dict passed to update_state()
    elif task.state == "FAILURE":
        response["error"] = str(task.info)

    return response
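On the client side, polling means calling the status endpoint until the task reaches a terminal state. A minimal sketch, assuming a fetch_status callable that returns the endpoint's JSON as a dict (for example, a hypothetical lambda tid: httpx.get(f"{BASE_URL}/tasks/{tid}").json()); wait_for_task and its parameters are illustrative names, not part of Celery or FastAPI:

```python
import time
from typing import Callable, Dict, Optional

# Celery states after which a task will not change again
TERMINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def wait_for_task(
    fetch_status: Callable[[str], Dict],
    task_id: str,
    interval: float = 2.0,
    timeout: float = 300.0,
    on_progress: Optional[Callable[[Dict], None]] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict:
    """Poll fetch_status(task_id) until the task reaches a terminal state.

    fetch_status returns the /tasks/{task_id} JSON as a dict with a "status" key.
    Raises TimeoutError if the task is still unfinished after `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status(task_id)
        if status["status"] in TERMINAL_STATES:
            return status
        if status["status"] == "PROGRESS" and on_progress is not None:
            on_progress(status)  # e.g., update a progress bar from status["result"]
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Task {task_id} still {status['status']} after {timeout}s")
        sleep(interval)
```

Injecting sleep keeps the function testable without real waiting; in a browser client the same loop would usually live in JavaScript with a timer instead.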

Handling Task Failures and Retries

Catch only the errors worth retrying, and decide what happens when retries run out. Once max_retries is exceeded, self.retry(exc=exc) re-raises exc itself rather than MaxRetriesExceededError, so check the retry count before retrying:

from celery.exceptions import SoftTimeLimitExceeded

@app.task(bind=True, max_retries=5)
def fetch_external_data(self, url: str):
    """
    Fetch data from external API with automatic retries.
    """
    import httpx

    try:
        response = httpx.get(url, timeout=30)
        response.raise_for_status()
        return response.json()

    except httpx.HTTPError as exc:
        if self.request.retries >= self.max_retries:
            # All retries exhausted; log and fail
            print(f"Task failed after {self.max_retries} retries: {exc}")
            raise
        # Retry after 2^retries seconds (exponential backoff)
        countdown = min(2 ** self.request.retries, 600)  # Cap at 10 minutes
        raise self.retry(exc=exc, countdown=countdown)

    except SoftTimeLimitExceeded:
        # Task exceeded soft time limit; clean up gracefully
        print("Task exceeded 25-minute soft limit; cleaning up...")
        raise

@app.task(bind=True)
def batch_process_users(self, tenant_id: int):
    """
    Process each of a tenant's users, reporting progress after every user.
    """
    from app.models import User
    from sqlalchemy.orm import Session
    from app.database import engine

    processed, failed = 0, 0
    with Session(engine) as session:
        users = session.query(User).filter(User.tenant_id == tenant_id).all()
        total = len(users)

        for i, user in enumerate(users):
            try:
                # Process user
                print(f"Processing user {user.id}")
                processed += 1
            except Exception as exc:
                # Continue with next user instead of failing entire batch
                print(f"Error processing user {user.id}: {exc}")
                failed += 1

            # Update task progress; pollers see state PROGRESS with this meta
            self.update_state(
                state="PROGRESS",
                meta={"current": i + 1, "total": total, "status": f"Processing user {user.id}"},
            )

    return {"processed": processed, "failed": failed, "total": total}
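Celery can also handle retries declaratively through task options instead of a try/except block. A sketch using the real task options autoretry_for, retry_backoff, retry_backoff_max, and retry_jitter; the task name fetch_external_data_declarative is hypothetical. Pick one style per task, since mixing autoretry_for with manual self.retry() calls makes the retry schedule hard to predict:

```python
# tasks.py (alternative): declarative retries with exponential backoff
import httpx

from celery_config import app


@app.task(
    autoretry_for=(httpx.HTTPError,),  # Retry only on network/HTTP errors
    retry_backoff=True,                # Delays grow 1s, 2s, 4s, 8s, ... (before jitter)
    retry_backoff_max=600,             # No single delay exceeds 10 minutes
    retry_jitter=True,                 # Use a random delay up to the computed value
    max_retries=5,
)
def fetch_external_data_declarative(url: str):
    response = httpx.get(url, timeout=30)
    response.raise_for_status()  # HTTPStatusError is a subclass of httpx.HTTPError
    return response.json()
```

With retry_jitter=True, each computed delay is treated as a maximum and the actual delay is a random value between zero and it, which keeps many failing tasks from retrying in lockstep.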

Scheduled Tasks (Periodic Jobs)

Run tasks on a schedule using Celery Beat:

# celery_config.py
from celery.schedules import crontab

app.conf.beat_schedule = {
    "check-subscription-renewals": {
        "task": "tasks.check_subscription_renewals",
        "schedule": crontab(hour=0, minute=0),  # Daily at midnight UTC
    },
    "send-weekly-digest": {
        "task": "tasks.send_weekly_digest",
        "schedule": crontab(day_of_week=1, hour=9, minute=0),  # Mondays at 9 AM UTC
    },
    "cleanup-old-logs": {
        "task": "tasks.cleanup_old_logs",  # Task not shown here; define it in tasks.py
        "schedule": crontab(minute=0, hour="*/6"),  # Every 6 hours, on the hour
    },
}

# tasks.py
@app.task
def check_subscription_renewals():
    """
    Check for subscriptions renewing today and log details.
    Runs daily at midnight UTC.
    """
    from datetime import datetime, timezone
    from sqlalchemy.orm import Session
    from app.models import Tenant
    from app.database import engine

    today = datetime.now(timezone.utc).date()  # UTC date, matching the Beat schedule

    with Session(engine) as session:
        # Assumes subscription_current_period_end is a Date column
        renewals = session.query(Tenant).filter(
            Tenant.subscription_current_period_end == today
        ).all()

        print(f"Found {len(renewals)} subscriptions renewing today")
        for tenant in renewals:
            print(f"Tenant {tenant.id} ({tenant.name}) renews today")

    return {"renewing_today": len(renewals)}

@app.task
def send_weekly_digest():
    """Send weekly activity digest to all active users. Runs Mondays at 9 AM UTC."""
    from sqlalchemy.orm import Session
    from app.models import User
    from app.database import engine

    with Session(engine) as session:
        users = session.query(User).filter(User.is_active.is_(True)).all()

        # Fan out one send_email task per user, so one failure doesn't block the rest
        for user in users:
            send_email.delay(
                to_email=user.email,
                subject="Your Weekly Activity Digest",
                body=f"Hello {user.first_name}, here's your activity this week...",
            )

    return {"digests_queued": len(users)}
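A common pitfall with crontab: every field you leave out defaults to "*", so crontab(hour="*/6") on its own matches every minute of hours 0, 6, 12, and 18. The simplified model below is our own sketch of cron field matching, not Celery's parser, and handles only "*", "*/n", and comma-separated numbers; it makes the difference visible:

```python
from typing import List


def expand_field(field: str, size: int) -> List[int]:
    """Expand a simplified cron field ('*', '*/6', '0', '0,30') over 0..size-1."""
    values = set()
    for part in field.split(","):
        if part == "*":
            values.update(range(size))
        elif part.startswith("*/"):
            values.update(range(0, size, int(part[2:])))
        else:
            values.add(int(part))
    return sorted(values)


def daily_fire_times(minute: str = "*", hour: str = "*") -> List[str]:
    """HH:MM times in one day matched by the minute and hour fields.

    Both default to '*', mirroring crontab()'s defaults.
    """
    return [f"{h:02d}:{m:02d}" for h in expand_field(hour, 24) for m in expand_field(minute, 60)]


print(len(daily_fire_times(hour="*/6")))         # 240 runs: every minute of hours 0, 6, 12, 18
print(daily_fire_times(minute="0", hour="*/6"))  # ['00:00', '06:00', '12:00', '18:00']
```

Pinning minute=0 alongside hour="*/6" gives the intended four runs per day.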

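Celery Beat only sends tasks to the queue at their scheduled times; a worker has to be running to execute them. Start a worker, then run the Beat scheduler in a separate process (run exactly one Beat instance, or every scheduled task is queued once per instance):

celery -A celery_config worker --loglevel=info
celery -A celery_config beat --loglevel=info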

Monitoring and Debugging

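Check worker health and what workers are doing (inspect only reports tasks a worker has already received, not tasks still waiting in Redis):

# List tasks currently executing
celery -A celery_config inspect active

# List tasks prefetched by workers but not yet started
celery -A celery_config inspect reserved

# View registered tasks
celery -A celery_config inspect registered

# Check worker stats
celery -A celery_config inspect stats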
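The same information is available from Python through app.control.inspect(), which suits a scheduled health check or alerting job. A minimal sketch with hypothetical helper names (summarize_active, check_workers); active() returns a dict keyed by worker name, or None if no worker replies within the timeout:

```python
from typing import Any, Dict, List, Optional

# Shape of an inspect().active() reply: {worker_name: [task_info_dict, ...]} or None
ActiveReply = Optional[Dict[str, List[Dict[str, Any]]]]


def summarize_active(reply: ActiveReply) -> Dict[str, int]:
    """Map each responding worker to the number of tasks it is currently executing."""
    if not reply:
        return {}
    return {worker: len(tasks) for worker, tasks in reply.items()}


def check_workers(celery_app: Any, timeout: float = 1.0) -> Dict[str, int]:
    """Ask all workers what they are running; raise if none respond (e.g., to trigger an alert)."""
    reply = celery_app.control.inspect(timeout=timeout).active()
    summary = summarize_active(reply)
    if not summary:
        raise RuntimeError("No Celery workers responded to inspect().active()")
    return summary
```

For example, calling check_workers(app) with the app from celery_config raises RuntimeError when every worker is down; an idle but healthy worker still appears with a count of 0.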

For monitoring in production, use Flower (web UI for Celery):

pip install flower
celery -A celery_config flower --port=5555

Then visit http://localhost:5555 to see task history, worker stats, and queues.

Key Takeaways

  • Queue long-running work (emails, reports, webhook processing) with Celery and return to the user immediately.
  • Configure retries with exponential backoff for transient failures (network issues, rate limits).
  • Use Celery Beat for periodic tasks (checking subscription renewals, sending digests).
  • Monitor task queues and worker health via Flower; set up alerts for failed tasks.
  • Keep task results in a result backend (Redis here) so users can poll job status.

Frequently Asked Questions

What if a worker crashes mid-task?

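By default, Celery acknowledges a task message just before executing it, so a task that is running when its worker crashes is lost rather than retried. Set task_acks_late=True so the message is acknowledged only after the task finishes; if the worker crashes before then, the broker delivers the task again to another worker (or to the same one after a restart). With Redis, redelivery happens only after the visibility timeout (one hour by default) expires. Because a redelivered task may run twice, make late-acknowledged tasks idempotent.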
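A sketch of crash-safety settings for the Redis setup in this guide. The setting names are real Celery options; the values are illustrative, and worker_prefetch_multiplier=1 replaces the earlier value of 4 so that one worker does not hold many unacknowledged long-running tasks. The Redis visibility timeout must exceed your longest task plus any countdown or eta delay, or Redis will redeliver tasks that are still running:

```python
# Added to celery_config.py, after `app` is created
app.conf.update(
    task_acks_late=True,              # Acknowledge only after the task returns or fails
    task_reject_on_worker_lost=True,  # Requeue if the pool process dies mid-task (beware crash loops)
    worker_prefetch_multiplier=1,     # Reserve one task per process so a crash strands fewer messages
    broker_transport_options={"visibility_timeout": 3600},  # Redis redelivers unacked tasks after 1 hour
)
```

The same late-acknowledgement behavior can be enabled for a single task with @app.task(acks_late=True).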

Can I run Celery without Redis?

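Yes, though Redis is a good default. Celery also supports RabbitMQ (the most feature-complete option) and Amazon SQS as brokers; Kafka and database-backed transports exist but are experimental. Redis is the simplest choice for small-to-medium SaaS, especially since it can also serve as the result backend.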

How do I test Celery tasks?

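In tests, enable eager execution so .delay() runs tasks in-process:

import pytest
from celery_config import app as celery_app

@pytest.fixture
def eager_celery():
    # Run tasks synchronously and re-raise task exceptions inside the test
    celery_app.conf.update(task_always_eager=True, task_eager_propagates=True)
    yield celery_app
    celery_app.conf.update(task_always_eager=False, task_eager_propagates=False)

This runs tasks synchronously, making tests faster and deterministic. Eager mode only emulates a worker (nothing goes through the broker), so it can hide problems such as arguments that fail JSON serialization; Celery's pytest plugin (celery.contrib.pytest) provides fixtures such as celery_worker for tests against a real embedded worker.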

Should I use apply_async() or delay()?

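Use delay(arg1, arg2) for simple cases; it is a shortcut for apply_async(args=(arg1, arg2)). Use apply_async(args=(arg1, arg2), queue='high_priority', countdown=60) when you need execution options such as a specific queue, a countdown, an eta, or an expiry. A task routed to a named queue runs only if some worker consumes that queue, for example celery -A celery_config worker -Q high_priority.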

How do I handle rate limiting (e.g., max 10 emails/second)?

Use task rate limiting:

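@app.task(bind=True, max_retries=3, rate_limit="10/s")  # At most 10 per second, per worker instance
def send_email(self, to_email: str, subject: str, body: str):
    ...

Each worker starts at most 10 send_email tasks per second; extra tasks wait on the worker until the limit allows them. The limit applies per worker instance, not globally: three workers with rate_limit="10/s" can together send up to 30 emails per second.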
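Because rate_limit applies per worker instance, a global budget has to be divided by the number of workers. A small sketch (per_worker_rate_limit is a hypothetical helper) that converts a global tasks-per-second budget into a per-worker rate_limit string; per-minute units keep the result a whole number, and rounding down keeps the combined rate within budget:

```python
def per_worker_rate_limit(global_per_second: float, workers: int) -> str:
    """Split a global tasks-per-second budget evenly across worker instances.

    Returns a Celery-style rate string in tasks per minute, rounded down so the
    combined rate of all workers never exceeds the global budget.
    """
    if workers < 1:
        raise ValueError("workers must be at least 1")
    per_minute = int(global_per_second * 60 // workers)
    if per_minute < 1:
        # A rate of 0 would disable limiting entirely, the opposite of what we want
        raise ValueError("budget too small to split per minute; use a coarser unit such as /h")
    return f"{per_minute}/m"


print(per_worker_rate_limit(10, 4))  # 150/m: 4 workers x 150/min = 600/min = 10/s
```

The split only holds while the number of workers stays fixed; if workers are added, the per-worker value must be recomputed.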

Further Reading