Background Tasks FastAPI: Async Job Queue
Background tasks are work that happens outside the request/response cycle. When a user uploads a file, you validate it instantly but resize and compress it later. When they reset their password, you return success immediately but email them the reset link asynchronously. FastAPI provides two mechanisms: BackgroundTasks for simple, in-process jobs, and Celery for distributed, fault-tolerant queues. This guide covers both, showing when to use each and how to avoid common pitfalls like missing error handling or hanging tasks.
I've debugged production incidents caused by crashed background tasks left silently failing. This article teaches you patterns that keep background work reliable.
FastAPI BackgroundTasks: In-Process Jobs
For lightweight tasks that don't require persistence or retry logic, BackgroundTasks queues work to run after the response is sent:
from fastapi import FastAPI, BackgroundTasks
import smtplib
from email.mime.text import MIMEText
app = FastAPI()
def send_email_background(email: str, message: str):
"""Send email without blocking the HTTP response."""
try:
msg = MIMEText(message)
msg["Subject"] = "Password Reset"
msg["From"] = "[email protected]"
msg["To"] = email
with smtplib.SMTP("smtp.example.com", 587) as server:
server.starttls()
server.login("[email protected]", "password")
server.sendmail("[email protected]", [email], msg.as_string())
logger.info(f"Email sent to {email}")
except Exception as e:
logger.error(f"Failed to send email to {email}: {e}")
@app.post("/users/reset-password")
async def reset_password(email: str, background_tasks: BackgroundTasks):
"""Initiate password reset; send email in background."""
# Validate email and generate reset token (synchronously)
reset_token = generate_reset_token(email)
save_reset_token(email, reset_token)
# Queue email to send after response
background_tasks.add_task(
send_email_background,
email=email,
message=f"Click here to reset: https://example.com/reset/{reset_token}"
)
return {"status": "reset email sent"}
The background_tasks.add_task() call queues the function. FastAPI waits for the route to finish, sends the HTTP response, then runs send_email_background() in the background. The client gets a 200 response regardless of whether the email succeeds.
Async Background Tasks
For I/O-bound background work, use async functions:
import aiohttp
import asyncio
async def process_image_async(image_path: str, output_path: str):
"""Resize and compress image asynchronously."""
async with aiohttp.ClientSession() as session:
# Call external image processing service
async with session.post(
"https://imageapi.example.com/compress",
json={"input": image_path, "output": output_path}
) as resp:
result = await resp.json()
logger.info(f"Image processed: {result}")
@app.post("/upload")
async def upload_image(
file: UploadFile,
background_tasks: BackgroundTasks
):
"""Upload image; process in background."""
file_path = f"/tmp/{file.filename}"
# Save file (simple sync operation)
with open(file_path, "wb") as f:
f.write(await file.read())
# Queue async background task
output_path = f"/tmp/processed_{file.filename}"
background_tasks.add_task(
process_image_async,
image_path=file_path,
output_path=output_path
)
return {"filename": file.filename, "status": "processing"}
FastAPI's event loop schedules the async task. The background function runs concurrently with future requests.
Error Handling in Background Tasks
Always wrap background tasks in try/except and log failures:
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
def sync_external_database(user_id: int, retry_count: int = 0):
"""Sync user data to external system; log failures."""
max_retries = 3
try:
external_api.sync_user(user_id)
logger.info(f"Synced user {user_id}")
except requests.Timeout:
if retry_count < max_retries:
# Retry after delay (in production, use a task queue)
import time
time.sleep(2 ** retry_count) # Exponential backoff
sync_external_database(user_id, retry_count + 1)
else:
logger.error(f"Failed to sync user {user_id} after {max_retries} retries")
# Send alert to ops
send_alert(f"User sync failed: {user_id}")
except Exception as e:
logger.error(f"Unexpected error syncing user {user_id}: {e}", exc_info=True)
@app.post("/users")
async def create_user(user: UserCreate, background_tasks: BackgroundTasks):
"""Create user and sync to external system."""
db_user = db.add_and_commit(user)
# Queue sync with error handling
background_tasks.add_task(sync_external_database, user_id=db_user.id)
return db_user
Without error handling, background tasks fail silently. Your users think everything succeeded, but the external sync never happened.
Celery for Distributed Task Queues
For mission-critical work or long-running tasks, use Celery. It's a distributed task queue that persists jobs to a message broker (Redis, RabbitMQ) and retries on failure:
from celery import Celery
from celery.exceptions import MaxRetriesExceededError
import smtplib
celery_app = Celery(
"tasks",
broker="redis://localhost:6379",
backend="redis://localhost:6379"
)
@celery_app.task(bind=True, max_retries=3)
def send_email_celery(self, email: str, subject: str, body: str):
"""Send email with automatic retries."""
try:
msg = MIMEText(body)
msg["Subject"] = subject
msg["From"] = "[email protected]"
msg["To"] = email
with smtplib.SMTP("smtp.example.com", 587) as server:
server.starttls()
server.login("[email protected]", "password")
server.sendmail("[email protected]", [email], msg.as_string())
logger.info(f"Email sent to {email}")
except smtplib.SMTPException as e:
# Retry with exponential backoff
logger.warning(f"Retrying email to {email}: {e}")
raise self.retry(exc=e, countdown=2 ** self.request.retries)
except MaxRetriesExceededError:
logger.error(f"Failed to send email to {email} after max retries")
# In your FastAPI route
from fastapi import FastAPI
app = FastAPI()
@app.post("/users/reset-password")
async def reset_password(email: str):
"""Queue email with Celery."""
reset_token = generate_reset_token(email)
# Queue Celery task (persisted to Redis)
send_email_celery.delay(
email=email,
subject="Password Reset",
body=f"Click to reset: https://example.com/reset/{reset_token}"
)
return {"status": "email queued"}
Celery tasks are persisted to Redis. If the worker crashes mid-task, Redis replays it when the worker restarts. The max_retries=3 ensures failed tasks are retried automatically.
Monitoring Background Tasks
Track background task status for visibility:
from enum import Enum
from sqlalchemy import Column, String, DateTime
from datetime import datetime
class TaskStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
class BackgroundTaskRecord(Base):
__tablename__ = "background_tasks"
id = Column(Integer, primary_key=True)
task_name = Column(String)
status = Column(String, default=TaskStatus.PENDING)
result = Column(String, nullable=True)
error = Column(String, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
completed_at = Column(DateTime, nullable=True)
def wrap_background_task(task_name: str, task_func, *args, **kwargs):
"""Wrapper to record task status in database."""
task_record = db.query(BackgroundTaskRecord).filter_by(
task_name=task_name
).first()
if not task_record:
task_record = BackgroundTaskRecord(task_name=task_name)
db.add(task_record)
db.commit()
task_record.status = TaskStatus.RUNNING
db.commit()
try:
result = task_func(*args, **kwargs)
task_record.status = TaskStatus.COMPLETED
task_record.result = str(result)
db.commit()
return result
except Exception as e:
task_record.status = TaskStatus.FAILED
task_record.error = str(e)
db.commit()
raise
@app.post("/users")
async def create_user(user: UserCreate, background_tasks: BackgroundTasks):
db_user = db.add_and_commit(user)
background_tasks.add_task(
wrap_background_task,
task_name="send_welcome_email",
task_func=send_email,
email=user.email
)
return db_user
@app.get("/tasks/{task_id}")
async def get_task_status(task_id: int):
task = db.query(BackgroundTaskRecord).get(task_id)
return task
You can now query task status and see failures without digging through logs.
Choosing Between BackgroundTasks and Celery
| Feature | BackgroundTasks | Celery |
|---|---|---|
| Persistence | No (in-process) | Yes (to broker) |
| Retries | Manual | Automatic |
| Scalability | Single process | Distributed workers |
| Setup | Zero | Requires Redis/RabbitMQ |
| Best for | Quick tasks (e.g., email), low volume | Critical work, high volume, long-running |
Use BackgroundTasks for simple, best-effort work. Use Celery for mission-critical, high-volume work where reliability matters.
Key Takeaways
- Use
BackgroundTasksfor simple, in-process work that should happen after the response. - Always handle exceptions in background tasks and log failures.
- Use async functions for I/O-bound background work.
- Use Celery for distributed, fault-tolerant task queues with automatic retries.
- Monitor background task status in a database for visibility.
Frequently Asked Questions
What happens if a background task crashes?
With BackgroundTasks, the task fails silently (unless you log it). With Celery, the task is retried automatically. This is a key advantage of Celery for critical work.
Can background tasks access the database?
Yes. Create a new database session inside the background task (don't reuse the request's session). Use a dependency-like pattern or a context manager.
How do I pass complex objects to background tasks?
Celery serializes arguments to JSON. Complex objects must be JSON-serializable or converted to IDs. For example, pass user_id instead of a User object, then fetch the user inside the background task.
Can I cancel a queued background task?
With BackgroundTasks, no—once added, it runs. With Celery, yes—call task.revoke() to cancel a pending task.
How do I set a timeout for background tasks?
Celery allows time_limit. For BackgroundTasks, implement a timeout inside the function using asyncio.wait_for().