Discord Bot Scheduled Tasks in Python: Background Jobs
Scheduled tasks run code at specific times or at fixed intervals without user input. Examples include sending a daily reminder at 9 AM, resetting user scores every week at midnight, or cleaning up old database entries every hour. The APScheduler library schedules Python functions with cron-like syntax; discord.py's tasks extension (discord.ext.tasks) offers a lightweight alternative built on decorators. Both let a bot automate work around the clock, and both rely on patterns common in larger systems: event scheduling, background execution, and reliable job management.
Simple Background Tasks with discord.py tasks
The simplest approach uses discord.py's built-in task scheduler:
import datetime
import os

import discord
from discord.ext import commands, tasks
from dotenv import load_dotenv

load_dotenv()

CHANNEL_ID = 123456789012345678  # Placeholder: replace with your channel ID


class SchedulerBot(commands.Bot):
    async def close(self):
        """Gracefully stop tasks before the bot disconnects."""
        background_task.cancel()
        daily_reminder.cancel()
        await super().close()


bot = SchedulerBot(command_prefix='!', intents=discord.Intents.default())


# Define a task: runs every 60 seconds
@tasks.loop(seconds=60)
async def background_task():
    """Run every 60 seconds."""
    print('Background task running...')


# Task to run daily at 9 AM UTC.
# time= (discord.py 2.0+) pins the run to a clock time; hours=24 would only
# repeat 24 hours after whenever the task happened to start.
@tasks.loop(time=datetime.time(hour=9, tzinfo=datetime.timezone.utc))
async def daily_reminder():
    """Send a daily reminder message at 9 AM UTC."""
    # Get the target channel (None if the bot cannot see it)
    channel = bot.get_channel(CHANNEL_ID)
    if channel:
        embed = discord.Embed(
            title='Daily Reminder',
            description='Time to check in on your goals!',
            color=discord.Color.blue()
        )
        await channel.send(embed=embed)


# Start tasks when the bot is ready
@bot.event
async def on_ready():
    print(f'{bot.user} is online')
    # on_ready can fire again after a reconnect, and start() raises
    # RuntimeError if the task is already running.
    if not background_task.is_running():
        background_task.start()
    if not daily_reminder.is_running():
        daily_reminder.start()


bot.run(os.getenv('DISCORD_TOKEN'))
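@tasks.loop() repeats a coroutine at a fixed interval: seconds=60 for every minute, minutes=5 for every five minutes, hours=1 for hourly. An interval such as hours=24 counts from the moment the task starts, so it does not land on a particular clock time; for that, pass time=datetime.time(...) (discord.py 2.0 and later). Call .start() to begin the task and .cancel() to stop it. Tasks run as coroutines on the bot's event loop, so they do not block it as long as they await rather than make blocking calls.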
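To see why a 24-hour interval and a clock time differ, it can help to compute the wait by hand. The following is a stdlib-only sketch, not discord.py's implementation; `seconds_until` is a hypothetical helper name introduced here for illustration.

```python
import datetime


def seconds_until(target: datetime.time, now: datetime.datetime) -> float:
    """Return seconds from `now` until the next occurrence of `target` in UTC."""
    candidate = datetime.datetime.combine(
        now.date(), target, tzinfo=datetime.timezone.utc
    )
    if candidate <= now:
        # Today's slot has already passed, so roll over to tomorrow.
        candidate += datetime.timedelta(days=1)
    return (candidate - now).total_seconds()


nine_am = datetime.time(hour=9)
now = datetime.datetime(2024, 1, 1, 8, 30, tzinfo=datetime.timezone.utc)
print(seconds_until(nine_am, now))  # 1800.0
```

A bot started at 8:30 would need to wait 30 minutes for the first run, whereas an hours=24 loop started at the same moment would fire immediately and then at 8:30 every day.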
Precise Timing with APScheduler
For more control (e.g., "run at 9:30 AM every Monday"), use APScheduler:
pip install apscheduler
import os
from datetime import datetime, timedelta

import discord
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from discord.ext import commands
from dotenv import load_dotenv

load_dotenv()

CHANNEL_ID = 123456789012345678  # Placeholder: replace with your channel ID

bot = commands.Bot(command_prefix='!', intents=discord.Intents.default())

# Create a scheduler (APScheduler 3.x API). It defaults to the machine's
# local timezone, so set UTC explicitly to match the docstrings below.
scheduler = AsyncIOScheduler(timezone='UTC')


async def send_weekly_report():
    """Send a report every Monday at 8 AM UTC."""
    channel = bot.get_channel(CHANNEL_ID)
    if channel:
        embed = discord.Embed(
            title='Weekly Report',
            description="Here's this week's activity summary.",
            color=discord.Color.green()
        )
        await channel.send(embed=embed)


async def cleanup_old_data():
    """Delete expired entries from the database every day at 2 AM UTC."""
    # models is your own module that defines the SQLAlchemy session and tables
    from models import SessionLocal, UserProfile

    session = SessionLocal()
    try:
        # Delete users who haven't interacted in 90 days
        cutoff_date = datetime.utcnow() - timedelta(days=90)
        session.query(UserProfile).filter(
            UserProfile.last_active < cutoff_date
        ).delete()
        session.commit()
        print('Database cleanup completed.')
    finally:
        session.close()


@bot.event
async def on_ready():
    print(f'{bot.user} is online')
    # Schedule tasks. on_ready can fire again after a reconnect, so give each
    # job a fixed id and replace it rather than adding a duplicate.
    scheduler.add_job(
        send_weekly_report,
        'cron',
        day_of_week='mon',  # Monday
        hour=8,             # 8 AM
        minute=0,
        id='weekly_report',
        replace_existing=True
    )
    scheduler.add_job(
        cleanup_old_data,
        'cron',
        hour=2,             # 2 AM
        minute=0,
        day='*',            # Every day
        id='cleanup_old_data',
        replace_existing=True
    )
    # Start the scheduler if not already running
    if not scheduler.running:
        scheduler.start()


bot.run(os.getenv('DISCORD_TOKEN'))
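scheduler.add_job() registers a coroutine function together with a trigger. The 'cron' trigger takes cron-style fields as keyword arguments: hour=8, minute=0 means 8:00 AM; day_of_week='mon' means Monday; day='*' means every day of the month, which is already the default when only hour and minute are given. Times are interpreted in the scheduler's timezone, which is the machine's local timezone unless you pass timezone= to the scheduler or to the job. This is more flexible than @tasks.loop() for complex schedules.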
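What a weekly cron rule such as day_of_week='mon', hour=8, minute=0 resolves to can be worked out with the standard library alone. This is an illustrative sketch, not APScheduler's algorithm; `next_weekly_run` and `WEEKDAYS` are hypothetical names introduced here.

```python
from datetime import datetime, timedelta, timezone

WEEKDAYS = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']


def next_weekly_run(now: datetime, day_of_week: str, hour: int, minute: int) -> datetime:
    """Return the first datetime strictly after `now` that matches the weekly slot."""
    target_weekday = WEEKDAYS.index(day_of_week)  # Monday is 0, as in datetime.weekday()
    days_ahead = (target_weekday - now.weekday()) % 7
    candidate = (now + timedelta(days=days_ahead)).replace(
        hour=hour, minute=minute, second=0, microsecond=0
    )
    if candidate <= now:
        # The slot already passed this week, so use next week's.
        candidate += timedelta(days=7)
    return candidate


# Wednesday 2024-01-03 12:00 UTC: the next Monday 08:00 is 2024-01-08.
now = datetime(2024, 1, 3, 12, 0, tzinfo=timezone.utc)
print(next_weekly_run(now, 'mon', 8, 0))  # 2024-01-08 08:00:00+00:00
```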
User-Scheduled Tasks and Reminders
Let users schedule their own reminders:
import asyncio

import discord

# Note: prefix commands such as !remind only receive message text if the
# message_content intent is enabled (discord.py 2.0+).


@bot.command(name='remind')
async def remind(ctx, delay_minutes: int, *, message: str):
    """Schedule a personal reminder after N minutes."""
    if delay_minutes <= 0:
        await ctx.send('Please give a delay of at least 1 minute.')
        return
    await ctx.send(f'Reminder set for {delay_minutes} minutes from now.')
    # Convert to seconds
    delay_seconds = delay_minutes * 60
    # Wait and send reminder
    await asyncio.sleep(delay_seconds)
    embed = discord.Embed(
        title='Your Reminder',
        description=message,
        color=discord.Color.yellow()
    )
    try:
        await ctx.author.send(embed=embed)
    except discord.Forbidden:
        # The user does not accept DMs; fall back to the original channel
        await ctx.send(f'{ctx.author.mention}, your reminder:', embed=embed)


@bot.command(name='remindchannel')
async def remindchannel(ctx, delay_minutes: int, *, message: str):
    """Schedule a channel reminder after N minutes."""
    if delay_minutes <= 0:
        await ctx.send('Please give a delay of at least 1 minute.')
        return
    await ctx.send(f'Reminder set for {delay_minutes} minutes.')
    await asyncio.sleep(delay_minutes * 60)
    embed = discord.Embed(
        title='Channel Reminder',
        description=message,
        color=discord.Color.yellow()
    )
    await ctx.channel.send(embed=embed)
Use asyncio.sleep() to delay execution. The pending reminder exists only in memory, so it is lost if the bot restarts before the delay ends. This approach works for reminders up to a few hours; for longer periods or persistence across bot restarts, store reminders in a database and check them with a scheduled task.
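If sleep-based reminders need to be cancellable, one option is to keep a handle on each task. The sketch below uses only asyncio; `fire_after` and the `pending` dictionary are hypothetical names, and appending to a list stands in for sending a Discord message.

```python
import asyncio


async def fire_after(delay_seconds: float, message: str, delivered: list) -> None:
    """Wait, then 'deliver' the reminder by appending it to a list."""
    await asyncio.sleep(delay_seconds)
    delivered.append(message)


async def demo() -> list:
    delivered = []
    # Keep a handle on each task so it can be cancelled later.
    pending = {
        'tea': asyncio.create_task(fire_after(0.01, 'Tea is ready', delivered)),
        'call': asyncio.create_task(fire_after(0.05, 'Call Sam', delivered)),
    }
    pending['call'].cancel()  # The user changed their mind.
    # return_exceptions=True collects the cancellation instead of raising it.
    await asyncio.gather(*pending.values(), return_exceptions=True)
    return delivered


print(asyncio.run(demo()))  # ['Tea is ready']
```

The `pending` dictionary lives in process memory, which is exactly why this style of reminder does not survive a restart.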
Persistent Scheduled Jobs
For reminders that survive bot restarts, store them in the database:
# Add to models.py
from sqlalchemy import BigInteger, Boolean, Column, DateTime, Integer, String


class Reminder(Base):
    """Store user reminders."""
    __tablename__ = 'reminders'

    id = Column(Integer, primary_key=True)
    # Discord IDs are 64-bit snowflakes, which overflow a 32-bit Integer
    # column on databases such as PostgreSQL or MySQL.
    user_id = Column(BigInteger, nullable=False)
    guild_id = Column(BigInteger, nullable=True)
    channel_id = Column(BigInteger, nullable=True)
    message = Column(String, nullable=False)
    scheduled_time = Column(DateTime, nullable=False)  # Stored as naive UTC
    sent = Column(Boolean, default=False)
# In bot.py
from datetime import datetime

import discord
from discord.ext import tasks
from models import SessionLocal, Reminder


@bot.command(name='scheduledremind')
async def scheduledremind(ctx, day: int, hour: int, minute: int, *, message: str):
    """Schedule a reminder for a day of the month and a time (UTC)."""
    now = datetime.utcnow()
    try:
        # Create a reminder time in the current month
        reminder_time = now.replace(day=day, hour=hour, minute=minute, second=0, microsecond=0)
        # If that time has already passed this month, schedule for next month
        if reminder_time <= now:
            if reminder_time.month == 12:
                reminder_time = reminder_time.replace(year=reminder_time.year + 1, month=1)
            else:
                reminder_time = reminder_time.replace(month=reminder_time.month + 1)
    except ValueError:
        # replace() rejects out-of-range values, e.g. day 31 in a 30-day month
        await ctx.send('That day or time does not exist in the target month.')
        return

    session = SessionLocal()
    try:
        reminder = Reminder(
            user_id=ctx.author.id,
            guild_id=ctx.guild.id if ctx.guild else None,  # None when used in a DM
            channel_id=ctx.channel.id,
            message=message,
            scheduled_time=reminder_time
        )
        session.add(reminder)
        session.commit()
        await ctx.send(f'Reminder scheduled for {reminder_time.strftime("%Y-%m-%d %H:%M")} UTC')
    finally:
        session.close()


# Background task to check for reminders
@tasks.loop(minutes=1)
async def check_reminders():
    """Check for due reminders every minute."""
    session = SessionLocal()
    try:
        now = datetime.utcnow()
        # Find reminders that are due
        due_reminders = session.query(Reminder).filter(
            Reminder.scheduled_time <= now,
            Reminder.sent.is_(False)
        ).all()
        for reminder in due_reminders:
            try:
                # get_user() only reads the cache; fall back to an API call
                user = bot.get_user(reminder.user_id) or await bot.fetch_user(reminder.user_id)
                embed = discord.Embed(
                    title='Scheduled Reminder',
                    description=reminder.message,
                    color=discord.Color.orange()
                )
                await user.send(embed=embed)
                # Mark as sent
                reminder.sent = True
                session.commit()
            except Exception as e:
                # One failed reminder must not stop the rest of the batch
                print(f'Failed to send reminder {reminder.id}: {e}')
    finally:
        session.close()


@bot.event
async def on_ready():
    print(f'{bot.user} is online')
    # on_ready can fire more than once; only start the loop the first time
    if not check_reminders.is_running():
        check_reminders.start()
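This stores reminders in a database table. A background task checks every minute for due reminders and sends them. Because the reminders live in the database rather than in memory, they survive a restart: any that came due while the bot was offline are sent on the first check after it comes back online.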
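The query-then-mark pattern can be tried without a bot or an ORM. The sketch below uses the standard library's sqlite3 with an in-memory database; the simplified table layout and the helper name `fetch_and_mark_due` are assumptions made for illustration, not part of the models above.

```python
import sqlite3
from datetime import datetime, timezone


def fetch_and_mark_due(conn: sqlite3.Connection, now: datetime) -> list:
    """Return messages of unsent reminders due at `now`, marking them sent."""
    now_text = now.strftime('%Y-%m-%d %H:%M:%S')
    rows = conn.execute(
        'SELECT id, message FROM reminders '
        'WHERE scheduled_time <= ? AND sent = 0 ORDER BY scheduled_time',
        (now_text,),
    ).fetchall()
    conn.executemany('UPDATE reminders SET sent = 1 WHERE id = ?', [(r[0],) for r in rows])
    conn.commit()
    return [r[1] for r in rows]


conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE reminders ('
    'id INTEGER PRIMARY KEY, message TEXT NOT NULL, '
    'scheduled_time TEXT NOT NULL, sent INTEGER NOT NULL DEFAULT 0)'
)
conn.executemany(
    'INSERT INTO reminders (message, scheduled_time) VALUES (?, ?)',
    [('stand-up', '2024-01-01 09:00:00'), ('release', '2024-01-02 09:00:00')],
)

now = datetime(2024, 1, 1, 9, 5, tzinfo=timezone.utc)
print(fetch_and_mark_due(conn, now))  # ['stand-up']
print(fetch_and_mark_due(conn, now))  # []
```

The second call returns nothing because the first one flipped the sent flag, which is what keeps a reminder from being delivered twice.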
Controlling and Monitoring Tasks
Provide commands to manage running tasks:
from apscheduler.schedulers.base import STATE_PAUSED


@bot.command(name='taskstatus')
@commands.is_owner()
async def taskstatus(ctx):
    """Show status of background tasks (owner only)."""
    embed = discord.Embed(title='Task Status', color=discord.Color.purple())
    # Check discord.py tasks
    if background_task.is_running():
        embed.add_field(
            name='background_task',
            # current_loop is the number of the iteration in progress
            value=f'Running (iterations: {background_task.current_loop})',
            inline=False
        )
    else:
        embed.add_field(
            name='background_task',
            value='Not running',
            inline=False
        )
    # Check APScheduler jobs
    if scheduler.running:
        jobs = scheduler.get_jobs()
        embed.add_field(
            name='APScheduler',
            value=f'Running ({len(jobs)} jobs)',
            inline=False
        )
        for job in jobs:
            embed.add_field(
                name=job.name,
                value=f'Next run: {job.next_run_time}',
                inline=False
            )
    await ctx.send(embed=embed)


@bot.command(name='pause')
@commands.is_owner()
async def pause(ctx):
    """Pause all background tasks."""
    background_task.cancel()
    if scheduler.running:
        scheduler.pause()
    await ctx.send('Tasks paused.')


@bot.command(name='resume')
@commands.is_owner()
async def resume(ctx):
    """Resume all background tasks."""
    # start() raises RuntimeError if the task is already running
    if not background_task.is_running():
        background_task.start()
    # A paused scheduler still reports running=True, so check its state
    if scheduler.state == STATE_PAUSED:
        scheduler.resume()
    await ctx.send('Tasks resumed.')
Only the bot owner can control tasks (@commands.is_owner()). This prevents accidental disruption.
| Method | Interval Control | Timing Precision | Use Case |
|---|---|---|---|
| @tasks.loop() | Fixed (seconds/minutes/hours) | Low | Simple recurring tasks |
| APScheduler | Cron syntax | High (exact time) | Complex schedules, specific times |
| asyncio.sleep() | Manual | Low | Simple one-time delays |
| Database + loop | Flexible | High (database-driven) | User-defined, persistent reminders |
Key Takeaways
- Use @tasks.loop() for simple recurring tasks with fixed intervals.
- Use APScheduler for complex schedules (e.g., "every Monday at 8 AM").
- Call .start() on tasks in on_ready and .cancel() on shutdown.
- For reminders that survive bot restarts, store them in a database and check with a scheduled task.
- Wrap task logic in try-except to prevent one failed task from breaking others.
- Provide commands to monitor and control running tasks (for debugging and maintenance).
Frequently Asked Questions
What happens if a scheduled task takes longer than its interval?
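discord.py tasks never overlap with themselves: if a task takes 5 seconds and runs every 3 seconds, the next iteration waits until the current one finishes and then starts right away. APScheduler behaves differently: by default (max_instances=1) it skips a run that comes due while the previous one is still executing and logs a warning. The coalesce=True option covers a separate case, collapsing several missed runs into a single run.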
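The no-overlap behaviour of an interval loop can be imitated with plain asyncio. This is a rough sketch of the idea, not discord.py's actual loop; `run_loop` is a hypothetical name and the short durations only keep the demo quick.

```python
import asyncio
import time


async def run_loop(interval: float, work_time: float, iterations: int) -> list:
    """Run a job `iterations` times, one at a time, and record each start offset."""
    starts = []
    begin = time.monotonic()
    for _ in range(iterations):
        started = time.monotonic()
        starts.append(started - begin)
        await asyncio.sleep(work_time)  # Stand-in for the real job
        elapsed = time.monotonic() - started
        # Sleep only for what is left of the interval; never a negative time.
        await asyncio.sleep(max(0.0, interval - elapsed))
    return starts


# The job (0.1 s) is slower than the interval (0.05 s).
starts = asyncio.run(run_loop(interval=0.05, work_time=0.1, iterations=3))
gaps = [later - earlier for earlier, later in zip(starts, starts[1:])]
print([round(gap, 2) for gap in gaps])
```

Each gap comes out close to the job's duration rather than the interval, because a new iteration cannot begin until the previous one has returned.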
Can I schedule a task that runs only once?
Yes, use scheduler.add_job(func, 'date', run_date=datetime(...)) with APScheduler, or use asyncio.sleep() for simple one-time delays.
How do I handle a task that fails?
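Wrap the task body in try-except, and log errors to a file or send them to a logging channel. This matters for discord.py tasks in particular, because an unhandled exception stops the loop; a handler registered with the task's .error decorator can report the failure. APScheduler's misfire_grace_time is a separate setting: it controls how late a job may start and still run, which helps tolerate occasional delays.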
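One way to avoid repeating the same try-except in every job is a small decorator. The sketch below is stdlib-only; `log_exceptions` and `flaky_job` are hypothetical names, and the logger name is an arbitrary choice.

```python
import asyncio
import functools
import logging

logger = logging.getLogger('scheduled_tasks')


def log_exceptions(func):
    """Wrap an async job so an exception is logged instead of propagating."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception:
            # logger.exception records the traceback along with the message.
            logger.exception('Scheduled job %s failed', func.__name__)
            return None
    return wrapper


@log_exceptions
async def flaky_job(value: int) -> int:
    if value < 0:
        raise ValueError('negative input')
    return value * 2


print(asyncio.run(flaky_job(21)))  # 42
print(asyncio.run(flaky_job(-1)))  # None (the error is logged)
```

With discord.py, a wrapper like this would presumably sit directly beneath @tasks.loop(...), so that the loop decorator receives the wrapped coroutine function.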
Can tasks run in parallel?
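Yes, in the sense that all tasks run concurrently on the same event loop (cooperative multitasking): while one task awaits, the others get to run. A task that blocks without awaiting stalls every other task, so move blocking calls such as synchronous database queries to a worker thread with asyncio.to_thread(). CPU-intensive work in a thread still competes for Python's global interpreter lock and is often better sent to a process pool.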
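A minimal sketch of offloading with asyncio.to_thread() (Python 3.9+) follows. `blocking_report` and `heartbeat` are hypothetical stand-ins: the first for a blocking call, the second for the rest of the bot's work.

```python
import asyncio
import time


def blocking_report() -> str:
    """Stand-in for blocking work such as a synchronous database query."""
    time.sleep(0.1)
    return 'report ready'


async def heartbeat(ticks: list) -> None:
    """Record ticks to show that the event loop keeps running."""
    for i in range(3):
        ticks.append(i)
        await asyncio.sleep(0.02)


async def main():
    ticks = []
    # to_thread runs the blocking function in a worker thread, so the
    # heartbeat coroutine keeps ticking in the meantime.
    result, _ = await asyncio.gather(asyncio.to_thread(blocking_report), heartbeat(ticks))
    return result, ticks


print(asyncio.run(main()))  # ('report ready', [0, 1, 2])
```

Calling blocking_report() directly inside a coroutine would instead freeze the heartbeat for the full 0.1 seconds.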
How do I ensure tasks stop cleanly on bot shutdown?
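Call .cancel() and scheduler.shutdown() from a shutdown hook, for example by subclassing the bot and overriding close() so the cleanup runs before awaiting super().close(). An error handler such as on_error is not a shutdown hook: it fires whenever an event handler raises, so stopping tasks there would halt them after any unrelated error. A try-finally around bot.run() does run on exit, but by then the event loop has already closed, so it suits only synchronous cleanup such as closing files.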
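Whichever hook is used, the cleanup itself follows the same asyncio pattern: cancel the task, then wait for it to finish while the loop is still running. A stdlib-only sketch, with `ticker` as a hypothetical stand-in for a background job:

```python
import asyncio


async def ticker(log: list) -> None:
    """Tick until cancelled, then record that cleanup ran."""
    try:
        while True:
            log.append('tick')
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append('cleaned up')  # Release resources here
        raise  # Re-raise so the task is marked as cancelled


async def main() -> list:
    log = []
    task = asyncio.create_task(ticker(log))
    try:
        await asyncio.sleep(0.035)  # Stand-in for the bot's lifetime
    finally:
        task.cancel()
        # Wait for the task to finish handling the cancellation.
        await asyncio.gather(task, return_exceptions=True)
    return log


log = asyncio.run(main())
print(log[-1])  # cleaned up
```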