Skip to main content

Background Tasks FastAPI: Async Job Queue

Background tasks are work that happens outside the request/response cycle. When a user uploads a file, you validate it instantly but resize and compress it later. When they reset their password, you return success immediately but email them the reset link asynchronously. FastAPI provides two mechanisms: BackgroundTasks for simple, in-process jobs, and Celery for distributed, fault-tolerant queues. This guide covers both, showing when to use each and how to avoid common pitfalls like missing error handling or hanging tasks.

I've debugged production incidents caused by crashed background tasks left silently failing. This article teaches you patterns that keep background work reliable.

FastAPI BackgroundTasks: In-Process Jobs

For lightweight tasks that don't require persistence or retry logic, BackgroundTasks queues work to run after the response is sent:

from fastapi import FastAPI, BackgroundTasks
import smtplib
from email.mime.text import MIMEText

app = FastAPI()

def send_email_background(email: str, message: str):
"""Send email without blocking the HTTP response."""
try:
msg = MIMEText(message)
msg["Subject"] = "Password Reset"
msg["From"] = "[email protected]"
msg["To"] = email

with smtplib.SMTP("smtp.example.com", 587) as server:
server.starttls()
server.login("[email protected]", "password")
server.sendmail("[email protected]", [email], msg.as_string())

logger.info(f"Email sent to {email}")
except Exception as e:
logger.error(f"Failed to send email to {email}: {e}")

@app.post("/users/reset-password")
async def reset_password(email: str, background_tasks: BackgroundTasks):
"""Initiate password reset; send email in background."""
# Validate email and generate reset token (synchronously)
reset_token = generate_reset_token(email)
save_reset_token(email, reset_token)

# Queue email to send after response
background_tasks.add_task(
send_email_background,
email=email,
message=f"Click here to reset: https://example.com/reset/{reset_token}"
)

return {"status": "reset email sent"}

The background_tasks.add_task() call queues the function. FastAPI waits for the route to finish, sends the HTTP response, then runs send_email_background() in the background. The client gets a 200 response regardless of whether the email succeeds.

Async Background Tasks

For I/O-bound background work, use async functions:

import aiohttp
import asyncio

async def process_image_async(image_path: str, output_path: str):
"""Resize and compress image asynchronously."""
async with aiohttp.ClientSession() as session:
# Call external image processing service
async with session.post(
"https://imageapi.example.com/compress",
json={"input": image_path, "output": output_path}
) as resp:
result = await resp.json()
logger.info(f"Image processed: {result}")

@app.post("/upload")
async def upload_image(
file: UploadFile,
background_tasks: BackgroundTasks
):
"""Upload image; process in background."""
file_path = f"/tmp/{file.filename}"

# Save file (simple sync operation)
with open(file_path, "wb") as f:
f.write(await file.read())

# Queue async background task
output_path = f"/tmp/processed_{file.filename}"
background_tasks.add_task(
process_image_async,
image_path=file_path,
output_path=output_path
)

return {"filename": file.filename, "status": "processing"}

FastAPI's event loop schedules the async task. The background function runs concurrently with future requests.

Error Handling in Background Tasks

Always wrap background tasks in try/except and log failures:

from datetime import datetime
import logging

logger = logging.getLogger(__name__)

def sync_external_database(user_id: int, retry_count: int = 0):
"""Sync user data to external system; log failures."""
max_retries = 3

try:
external_api.sync_user(user_id)
logger.info(f"Synced user {user_id}")
except requests.Timeout:
if retry_count < max_retries:
# Retry after delay (in production, use a task queue)
import time
time.sleep(2 ** retry_count) # Exponential backoff
sync_external_database(user_id, retry_count + 1)
else:
logger.error(f"Failed to sync user {user_id} after {max_retries} retries")
# Send alert to ops
send_alert(f"User sync failed: {user_id}")
except Exception as e:
logger.error(f"Unexpected error syncing user {user_id}: {e}", exc_info=True)

@app.post("/users")
async def create_user(user: UserCreate, background_tasks: BackgroundTasks):
"""Create user and sync to external system."""
db_user = db.add_and_commit(user)

# Queue sync with error handling
background_tasks.add_task(sync_external_database, user_id=db_user.id)

return db_user

Without error handling, background tasks fail silently. Your users think everything succeeded, but the external sync never happened.

Celery for Distributed Task Queues

For mission-critical work or long-running tasks, use Celery. It's a distributed task queue that persists jobs to a message broker (Redis, RabbitMQ) and retries on failure:

from celery import Celery
from celery.exceptions import MaxRetriesExceededError
import smtplib

celery_app = Celery(
"tasks",
broker="redis://localhost:6379",
backend="redis://localhost:6379"
)

@celery_app.task(bind=True, max_retries=3)
def send_email_celery(self, email: str, subject: str, body: str):
"""Send email with automatic retries."""
try:
msg = MIMEText(body)
msg["Subject"] = subject
msg["From"] = "[email protected]"
msg["To"] = email

with smtplib.SMTP("smtp.example.com", 587) as server:
server.starttls()
server.login("[email protected]", "password")
server.sendmail("[email protected]", [email], msg.as_string())

logger.info(f"Email sent to {email}")
except smtplib.SMTPException as e:
# Retry with exponential backoff
logger.warning(f"Retrying email to {email}: {e}")
raise self.retry(exc=e, countdown=2 ** self.request.retries)
except MaxRetriesExceededError:
logger.error(f"Failed to send email to {email} after max retries")

# In your FastAPI route
from fastapi import FastAPI

app = FastAPI()

@app.post("/users/reset-password")
async def reset_password(email: str):
"""Queue email with Celery."""
reset_token = generate_reset_token(email)

# Queue Celery task (persisted to Redis)
send_email_celery.delay(
email=email,
subject="Password Reset",
body=f"Click to reset: https://example.com/reset/{reset_token}"
)

return {"status": "email queued"}

Celery tasks are persisted to Redis. If the worker crashes mid-task, Redis replays it when the worker restarts. The max_retries=3 ensures failed tasks are retried automatically.

Monitoring Background Tasks

Track background task status for visibility:

from enum import Enum
from sqlalchemy import Column, String, DateTime
from datetime import datetime

class TaskStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"

class BackgroundTaskRecord(Base):
__tablename__ = "background_tasks"

id = Column(Integer, primary_key=True)
task_name = Column(String)
status = Column(String, default=TaskStatus.PENDING)
result = Column(String, nullable=True)
error = Column(String, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
completed_at = Column(DateTime, nullable=True)

def wrap_background_task(task_name: str, task_func, *args, **kwargs):
"""Wrapper to record task status in database."""
task_record = db.query(BackgroundTaskRecord).filter_by(
task_name=task_name
).first()

if not task_record:
task_record = BackgroundTaskRecord(task_name=task_name)
db.add(task_record)
db.commit()

task_record.status = TaskStatus.RUNNING
db.commit()

try:
result = task_func(*args, **kwargs)
task_record.status = TaskStatus.COMPLETED
task_record.result = str(result)
db.commit()
return result
except Exception as e:
task_record.status = TaskStatus.FAILED
task_record.error = str(e)
db.commit()
raise

@app.post("/users")
async def create_user(user: UserCreate, background_tasks: BackgroundTasks):
db_user = db.add_and_commit(user)

background_tasks.add_task(
wrap_background_task,
task_name="send_welcome_email",
task_func=send_email,
email=user.email
)

return db_user

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: int):
task = db.query(BackgroundTaskRecord).get(task_id)
return task

You can now query task status and see failures without digging through logs.

Choosing Between BackgroundTasks and Celery

FeatureBackgroundTasksCelery
PersistenceNo (in-process)Yes (to broker)
RetriesManualAutomatic
ScalabilitySingle processDistributed workers
SetupZeroRequires Redis/RabbitMQ
Best forQuick tasks (e.g., email), low volumeCritical work, high volume, long-running

Use BackgroundTasks for simple, best-effort work. Use Celery for mission-critical, high-volume work where reliability matters.

Key Takeaways

  • Use BackgroundTasks for simple, in-process work that should happen after the response.
  • Always handle exceptions in background tasks and log failures.
  • Use async functions for I/O-bound background work.
  • Use Celery for distributed, fault-tolerant task queues with automatic retries.
  • Monitor background task status in a database for visibility.

Frequently Asked Questions

What happens if a background task crashes?

With BackgroundTasks, the task fails silently (unless you log it). With Celery, the task is retried automatically. This is a key advantage of Celery for critical work.

Can background tasks access the database?

Yes. Create a new database session inside the background task (don't reuse the request's session). Use a dependency-like pattern or a context manager.

How do I pass complex objects to background tasks?

Celery serializes arguments to JSON. Complex objects must be JSON-serializable or converted to IDs. For example, pass user_id instead of a User object, then fetch the user inside the background task.

Can I cancel a queued background task?

With BackgroundTasks, no—once added, it runs. With Celery, yes—call task.revoke() to cancel a pending task.

How do I set a timeout for background tasks?

Celery allows time_limit. For BackgroundTasks, implement a timeout inside the function using asyncio.wait_for().

Further Reading