Running Concurrent Tasks: `asyncio.gather()`
We've seen how to create individual tasks with asyncio.create_task() and then await them. This is a great way to start multiple background jobs. However, it can be a bit verbose if you just want to run a list of coroutines concurrently and wait for all of them to finish.
For this common use case, asyncio provides a powerful, high-level function: asyncio.gather().
📚 Prerequisites
You should be comfortable with async/await and the concept of creating tasks to achieve concurrency.
🎯 Article Outline: What You'll Master
In this article, you will learn:
- ✅ What
asyncio.gather()Is: Understand its role as a tool to run multiple awaitable objects concurrently. - ✅ How to Use
gather(): See the simple syntax for running a group of coroutines. - ✅ Collecting Results: Learn how
gather()conveniently collects the return values of all the coroutines into a list. - ✅ Error Handling: How to use the
return_exceptionsparameter to handle cases where one of the concurrent tasks might fail.
🧠 Section 1: From Manual await to gather()
Let's imagine we want to fetch data from three different API endpoints. Using the create_task pattern, the code would look like this:
async def main_manual():
task1 = asyncio.create_task(fetch_from_api("endpoint1"))
task2 = asyncio.create_task(fetch_from_api("endpoint2"))
task3 = asyncio.create_task(fetch_from_api("endpoint3"))
# Now we have to wait for them all
await task1
await task2
await task3
This works, but it's repetitive. asyncio.gather() simplifies this significantly.
gather() takes one or more awaitable objects (like coroutines or tasks) as arguments, runs them all concurrently, and waits for them all to complete.
💻 Section 2: Using asyncio.gather()
Let's rewrite our API fetching example using gather().
import asyncio
import time
async def fetch_from_api(endpoint: str, delay: float):
"""Simulates fetching data from a slow API."""
print(f"Fetching from {endpoint}...")
await asyncio.sleep(delay)
result = {"endpoint": endpoint, "status": "success"}
print(f"Finished fetching from {endpoint}")
return result
async def main():
start_time = time.time()
# gather() takes the coroutines and runs them concurrently.
# It waits for all of them to finish.
all_results = await asyncio.gather(
fetch_from_api("users", 1),
fetch_from_api("products", 2),
fetch_from_api("orders", 1.5)
)
end_time = time.time()
print(f"\nAll tasks finished in {end_time - start_time:.2f} seconds.")
print("--- Results ---")
print(all_results)
asyncio.run(main())
Output:
Fetching from users...
Fetching from products...
Fetching from orders...
Finished fetching from users
Finished fetching from orders
Finished fetching from products
All tasks finished in 2.00 seconds.
--- Results ---
[{'endpoint': 'users', 'status': 'success'}, {'endpoint': 'products', 'status': 'success'}, {'endpoint': 'orders', 'status': 'success'}]
How it works:
gather()takes our three coroutine objects.- It automatically schedules them as tasks on the event loop.
- The total execution time is determined by the longest running task (2 seconds), not the sum of all tasks.
- Crucially,
gather()collects the return values from each coroutine into a list. The order of the results in the list matches the order in which the coroutines were passed togather().
🛠️ Section 3: Handling Exceptions with gather()
By default, if any of the tasks passed to gather() raises an exception, gather() will immediately propagate that exception to the caller, and the other tasks will continue to run in the background until they finish.
However, you often want to continue processing the successful results even if one task fails. You can do this by setting return_exceptions=True.
async def fetch_or_fail(endpoint: str, delay: float, should_fail: bool = False):
"""A coroutine that might fail."""
print(f"Fetching from {endpoint}...")
await asyncio.sleep(delay)
if should_fail:
raise ValueError(f"Failed to fetch from {endpoint}")
return {"endpoint": endpoint, "status": "success"}
async def main_with_errors():
print("Running tasks with potential errors...")
results = await asyncio.gather(
fetch_or_fail("users", 1),
fetch_or_fail("products", 2, should_fail=True),
fetch_or_fail("orders", 1.5),
return_exceptions=True # <-- The important part
)
print("\n--- All tasks complete ---")
for res in results:
if isinstance(res, Exception):
print(f"A task failed: {res}")
else:
print(f"A task succeeded: {res}")
asyncio.run(main_with_errors())
Output:
Running tasks with potential errors...
Fetching from users...
Fetching from products...
Fetching from orders...
--- All tasks complete ---
A task succeeded: {'endpoint': 'users', 'status': 'success'}
A task failed: Failed to fetch from products
A task succeeded: {'endpoint': 'orders', 'status': 'success'}
When return_exceptions is True, instead of raising the ValueError, gather() catches it and places the exception object itself into the results list. This allows your program to handle failures gracefully while still collecting the results from the successful tasks.
✨ Conclusion & Key Takeaways
asyncio.gather() is a high-level, convenient tool for a very common asynchronous pattern: running a group of tasks concurrently and waiting for all of them to complete.
Let's summarize the key takeaways:
asyncio.gather()runs multiple awaitable objects concurrently.- It simplifies concurrency: It's a more concise alternative to manually creating and awaiting a list of tasks.
- It collects results: The return value of
gather()is a list containing the results of all the completed coroutines, in the order they were passed in. - Use
return_exceptions=Trueto prevent one failed task from stopping your program and to handle errors gracefully.
➡️ Next Steps
You now know how to run many tasks at once. But how do you handle errors that might occur within those tasks? In the next article, we'll explore "Error Handling in Async Methods" to make our concurrent applications even more robust.
Happy gathering!