Asyncio Performance: Profiling and Optimization Tips

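Optimizing asyncio applications requires understanding where time is spent: in I/O waits, task switching overhead, or blocking code. This article covers profiling tools, identifying bottlenecks, and concrete optimization strategies. Removing a single blocking call or an undersized concurrency limit can cut latency substantially, in some cases by an order of magnitude, but the gain depends on the workload, so measure before and after each change.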

Understanding Asyncio Performance Metrics

Key metrics for async applications are event loop latency, task count, and I/O readiness. A healthy event loop iteration completes in microseconds; slowdowns indicate blocking code or OS contention.

import asyncio
import time

class AsyncioMetrics:
    """Simple metrics collector for event loop performance."""

    def __init__(self):
        self.callback_times = []

    async def measure_loop_latency(self, iterations=1000):
        """Measure event loop iteration time."""
        loop = asyncio.get_running_loop()

        times = []
        for _ in range(iterations):
            start = loop.time()
            await asyncio.sleep(0)  # Yield to event loop
            elapsed = (loop.time() - start) * 1000  # Convert to ms
            times.append(elapsed)

        avg = sum(times) / len(times)
        p99 = sorted(times)[int(len(times) * 0.99)]

        print("Event loop latency (ms):")
        print(f" Average: {avg:.3f}")
        print(f" P99: {p99:.3f}")
        print(f" Min: {min(times):.3f}")
        print(f" Max: {max(times):.3f}")

async def demo_metrics():
    metrics = AsyncioMetrics()
    await metrics.measure_loop_latency()

asyncio.run(demo_metrics())

Output (varies by system):

Event loop latency (ms):
Average: 0.021
P99: 0.045
Min: 0.015
Max: 0.089

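Target: event loop latency under 1ms for responsive systems. Sustained higher values usually mean that a callback or coroutine is holding the loop too long between awaits (blocking code); kernel scheduling delays and system load can contribute as well.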
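To find which callback is responsible for a slow iteration, asyncio's debug mode can help. The sketch below is illustrative rather than part of the example above: it assumes a deliberately blocking coroutine (blocking_step, a hypothetical name) and lowers loop.slow_callback_duration to 20 ms so that the loop logs a warning for any step that holds it longer than that. The helper names ListHandler and find_slow_callbacks are also made up for this sketch.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collects log messages in memory so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


async def blocking_step():
    # Hypothetical offender: a synchronous sleep that holds the loop for 50 ms.
    time.sleep(0.05)


async def main():
    loop = asyncio.get_running_loop()
    # Default threshold is 0.1 s; lower it so the 50 ms block is reported.
    loop.slow_callback_duration = 0.02
    await asyncio.create_task(blocking_step())


def find_slow_callbacks():
    """Run main() in debug mode and return the slow-callback warnings."""
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        asyncio.run(main(), debug=True)
    finally:
        logger.removeHandler(handler)
    # Debug mode logs "Executing <...> took N seconds" for slow steps.
    return [m for m in handler.messages if m.startswith("Executing")]


if __name__ == "__main__":
    for message in find_slow_callbacks():
        print(message)
```

Debug mode adds overhead of its own, so it is better suited to development and staging than to production measurements.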

Profiling with py-spy

py-spy is a sampling profiler that doesn't require code changes. It shows where CPU time is spent:

# Install py-spy
pip install py-spy

# Profile script.py for 10 seconds
py-spy record -d 10 -o profile.svg -- python script.py

# View the SVG in a browser

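The resulting flame graph shows which functions were on the stack when py-spy sampled the process: wide bars are where CPU time goes, whether in user code or in event loop machinery. By default py-spy skips idle threads, so time spent waiting on I/O is largely absent from the graph; pass --idle to include it.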

import asyncio
import time

async def io_bound_task(delay):
    """Simulates I/O: sleeping."""
    for _ in range(10):
        await asyncio.sleep(delay)
    return "io done"

async def cpu_bound_task():
    """Simulates CPU: sync computation."""
    total = 0
    for i in range(1_000_000):
        total += i
    return total

async def mixed_workload():
    """Mix I/O and CPU tasks."""
    async with asyncio.TaskGroup() as tg:  # TaskGroup requires Python 3.11+
        tg.create_task(io_bound_task(0.01))
        tg.create_task(io_bound_task(0.01))
        tg.create_task(cpu_bound_task())  # Blocks event loop!

# Profile this: py-spy record -o profile.svg -- python script.py
asyncio.run(mixed_workload())

Profiling this mixed workload shows cpu_bound_task consuming significant CPU time, blocking other tasks.
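If a computation like this has to stay on the event loop, one possible mitigation is to yield control periodically so other tasks get a turn. The following is a minimal sketch under that assumption, not a replacement for offloading the work: cooperative_sum, heartbeat, and the chunk size of 50,000 are hypothetical choices for illustration.

```python
import asyncio


async def cooperative_sum(n, chunk=50_000):
    """Same arithmetic as a plain loop, but yields to the loop every `chunk` steps."""
    total = 0
    for i in range(n):
        total += i
        if i % chunk == 0:
            await asyncio.sleep(0)  # Let other ready tasks run
    return total


async def heartbeat(counter, stop):
    """Stands in for other work; counts how often it gets scheduled."""
    while not stop.is_set():
        counter["beats"] += 1
        await asyncio.sleep(0)


async def run_demo():
    counter = {"beats": 0}
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(counter, stop))
    total = await cooperative_sum(1_000_000)
    stop.set()
    await hb
    return total, counter["beats"]


if __name__ == "__main__":
    total, beats = asyncio.run(run_demo())
    print(f"total={total}, heartbeat ran {beats} times during the computation")
```

Smaller chunks improve responsiveness at the cost of more scheduling overhead; the total CPU time does not shrink, it is only interleaved with other tasks.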

Using asyncio.current_task and Task Callbacks

Monitor active tasks and their execution time:

import asyncio

async def task_statistics():
    """Collect task statistics."""
    all_tasks = asyncio.all_tasks()
    print(f"Active tasks: {len(all_tasks)}")

    for task in all_tasks:
        print(f" - {task.get_name()}: done={task.done()}, cancelled={task.cancelled()}")

async def slow_task(name, delay):
    """A task with known delay."""
    for i in range(3):
        await asyncio.sleep(delay)

async def monitor_demo():
    """Run tasks and monitor them."""
    async with asyncio.TaskGroup() as tg:
        tg.create_task(slow_task("task1", 0.1), name="slow-1")
        tg.create_task(slow_task("task2", 0.2), name="slow-2")

        # Monitor concurrently
        for _ in range(10):
            await task_statistics()
            await asyncio.sleep(0.05)

asyncio.run(monitor_demo())

This pattern helps identify long-running tasks and task proliferation (creating too many tasks).
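To record how long each task takes, a done callback can be attached when the task is created. This is a sketch with hypothetical helper names (track, worker, timed_run); the measured time runs from the moment the task is registered to the moment it finishes, so it includes any time the task spent waiting to be scheduled.

```python
import asyncio
import time


def track(task, durations):
    """Attach a done callback that stores the task's lifetime in `durations`."""
    start = time.perf_counter()

    def _on_done(finished):
        durations[finished.get_name()] = time.perf_counter() - start

    task.add_done_callback(_on_done)
    return task


async def worker(delay):
    await asyncio.sleep(delay)


async def timed_run():
    durations = {}
    tasks = [
        track(asyncio.create_task(worker(0.05), name="fast"), durations),
        track(asyncio.create_task(worker(0.10), name="slow"), durations),
    ]
    await asyncio.gather(*tasks)
    return durations


if __name__ == "__main__":
    for name, seconds in asyncio.run(timed_run()).items():
        print(f"{name}: {seconds:.3f}s")
```

Inside a coroutine, asyncio.current_task() returns the task that is running it, which is useful for tagging log lines with task.get_name().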

Optimizing I/O Concurrency

Run I/O operations concurrently, but cap how many are in flight at once with a semaphore so that the target service and local resources are not overwhelmed:

import asyncio
import time

async def fetch_with_semaphore(semaphore, url, delay):
    """Fetch URL with rate limiting."""
    async with semaphore:
        start = time.time()
        await asyncio.sleep(delay)  # Simulate fetch
        elapsed = time.time() - start
        return f"{url}: {elapsed:.2f}s"

async def concurrent_fetches_tuned():
    """Optimize concurrency with semaphores."""
    urls = [f"url-{i}" for i in range(100)]

    # Tune concurrency: too low = slow, too high = resource exhaustion
    semaphore = asyncio.Semaphore(20)  # 20 concurrent requests

    start = time.time()
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(fetch_with_semaphore(semaphore, url, 0.1))
            for url in urls
        ]

    elapsed = time.time() - start
    # 100 requests * 0.1s / 20 concurrent = 0.5s optimal
    print(f"Concurrent fetches: {elapsed:.2f}s (optimal: ~0.5s)")

asyncio.run(concurrent_fetches_tuned())

Tuning the semaphore value is critical:

  • Too low: sequential bottleneck, slow throughput.
  • Too high: resource exhaustion (connections, file handles) and, past that point, lower throughput.
  • Sweet spot: typically 5–50 for HTTP, depends on target server and available resources.
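One way to look for the sweet spot is to time the same workload at several limits. The sketch below assumes a simulated fetch (fake_fetch, a hypothetical stand-in that only sleeps), so it shows the "too low" side of the tradeoff only; a real server that saturates would also show throughput falling off at high limits.

```python
import asyncio
import time


async def fake_fetch(semaphore, delay):
    """Hypothetical stand-in for a network request."""
    async with semaphore:
        await asyncio.sleep(delay)


async def time_with_limit(limit, n_requests=100, delay=0.01):
    """Return the wall-clock time to finish n_requests under a given limit."""
    semaphore = asyncio.Semaphore(limit)
    start = time.perf_counter()
    await asyncio.gather(*(fake_fetch(semaphore, delay) for _ in range(n_requests)))
    return time.perf_counter() - start


async def sweep(limits=(5, 20, 50)):
    """Measure each limit in turn and return {limit: seconds}."""
    results = {}
    for limit in limits:
        results[limit] = await time_with_limit(limit)
    return results


if __name__ == "__main__":
    for limit, seconds in asyncio.run(sweep()).items():
        print(f"limit={limit:>3}: {seconds:.3f}s")
```

Against a real service, repeat the sweep a few times and watch error rates and tail latency alongside total time.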

Avoiding Blocking Calls

Synchronous blocking calls freeze the entire event loop. Use async equivalents or run_in_executor():

import asyncio
import time

def blocking_io(delay):
    """Synchronous, blocking operation."""
    time.sleep(delay)  # Blocks the calling thread
    return f"done after {delay}s"

async def bad_pattern():
    """Don't do this: blocks event loop."""
    # This freezes all other tasks for 2 seconds
    result = blocking_io(2.0)
    print(f"Result: {result}")

async def good_pattern():
    """Use run_in_executor() for blocking code."""
    loop = asyncio.get_running_loop()
    # Run blocking code in thread pool
    result = await loop.run_in_executor(None, blocking_io, 2.0)
    print(f"Result: {result}")

async def demo():
    print("Bad (blocks event loop):")
    start = time.time()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(bad_pattern())
        tg.create_task(asyncio.sleep(1))  # Cannot start until the 2s block ends; finishes at ~3s
    print(f"Elapsed: {time.time() - start:.2f}s\n")

    print("Good (uses thread pool):")
    start = time.time()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(good_pattern())
        tg.create_task(asyncio.sleep(1))  # Completes after 1s, while the thread is still working
    print(f"Elapsed: {time.time() - start:.2f}s")

asyncio.run(demo())

Output (approximate):

Bad (blocks event loop):
Result: done after 2.0s
Elapsed: 3.00s

Good (uses thread pool):
Result: done after 2.0s
Elapsed: 2.00s

In the bad case the sleep(1) task cannot even start its timer until blocking_io() returns, so the two delays add up to about 3 seconds. In the good case the sleep runs while the worker thread is busy, and the total is the longer of the two delays, about 2 seconds.

run_in_executor() offloads blocking work to a thread pool, freeing the event loop to handle other tasks.
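On Python 3.9 and later, asyncio.to_thread() is a shorter way to do the same thing with the default thread pool. The sketch below reuses the blocking_io idea with shorter delays; the function name main and the 0.2 s delays are choices made for this illustration.

```python
import asyncio
import time


def blocking_io(delay):
    """Synchronous, blocking operation."""
    time.sleep(delay)
    return f"done after {delay}s"


async def main():
    start = time.perf_counter()
    # The blocking call runs in a worker thread while the sleep runs on the loop.
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.2),
        asyncio.sleep(0.2),
    )
    return result, time.perf_counter() - start


if __name__ == "__main__":
    result, elapsed = asyncio.run(main())
    print(f"{result}; total {elapsed:.2f}s")
```

Because both waits overlap, the total is close to the longer of the two rather than their sum. Use loop.run_in_executor() directly when you need a custom executor, such as a pool with a specific size.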

Batch Processing for Efficiency

Instead of creating one task per item, give each task a batch of items so that far fewer tasks are created and scheduled:

import asyncio

async def process_item(item):
    """Process one item."""
    await asyncio.sleep(0.01)
    return item * 2

async def unbatched_approach(items):
    """Create one task per item (10k tasks for 10k items)."""
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(process_item(i)) for i in items]

async def batched_approach(items, batch_size=100):
    """Create one task per batch; items within a batch run sequentially."""
    async def process_batch(batch):
        # Awaiting directly avoids creating a task per item
        return [await process_item(item) for item in batch]

    async with asyncio.TaskGroup() as tg:
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            tg.create_task(process_batch(batch))

async def demo_batching():
    items = list(range(10_000))
    loop = asyncio.get_running_loop()

    print("Unbatched (10k tasks):")
    start = loop.time()
    await unbatched_approach(items)
    elapsed = loop.time() - start
    print(f" Elapsed: {elapsed:.3f}s")

    print("\nBatched (100 tasks):")
    start = loop.time()
    await batched_approach(items, batch_size=100)
    elapsed = loop.time() - start
    print(f" Elapsed: {elapsed:.3f}s")

asyncio.run(demo_batching())

Batching reduces task creation and scheduling overhead, which matters most when per-item work is tiny or when a whole batch can be served by a single bulk I/O call. The tradeoff is coarser concurrency: items within a batch run one after another, so with the 10 ms simulated wait above each batch needs at least 1 second and the batched run takes longer in wall-clock time than the unbatched one. Measure both variants on your real workload before committing to a batch size.
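A related way to keep the task count fixed without serializing whole batches is a small pool of worker tasks that pull items from an asyncio.Queue. This is a sketch of that alternative, not the batching approach itself; the names worker and run_pool and the pool size of 50 are assumptions for illustration.

```python
import asyncio


async def process_item(item):
    """Process one item (simulated with a short sleep)."""
    await asyncio.sleep(0.001)
    return item * 2


async def worker(queue, results):
    """Pull items until cancelled; one long-lived task handles many items."""
    while True:
        item = await queue.get()
        try:
            results.append(await process_item(item))
        finally:
            queue.task_done()


async def run_pool(items, n_workers=50):
    queue = asyncio.Queue()
    for item in items:
        queue.put_nowait(item)

    results = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(n_workers)]

    await queue.join()  # Wait until every queued item has been processed
    for w in workers:
        w.cancel()  # Workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    out = asyncio.run(run_pool(list(range(1000))))
    print(f"processed {len(out)} items with 50 worker tasks")
```

Results arrive in completion order, not input order, so sort or key them if ordering matters.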

Key Takeaways

  • Profile event loop latency with loop.time() and asyncio.sleep(0) to baseline system performance; target under 1ms.
  • Use py-spy to identify CPU hotspots and blocking code without modifying code.
  • Monitor active task count with asyncio.all_tasks(); explosive growth signals a leak.
  • Tune semaphore concurrency based on target resource limits; start with 10-20 for HTTP and adjust by load testing.
  • Replace blocking calls with loop.run_in_executor() to offload to thread pools.
  • Batch items to reduce task creation overhead, trading some concurrency for throughput.

Frequently Asked Questions

How do I know if my async code is CPU-bound?

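Profile with py-spy. If the flame graph shows your code taking significant CPU time (not I/O waits), it's CPU-bound. For pure-Python computation, move it to a process pool (run_in_executor() with a ProcessPoolExecutor, or multiprocessing). On standard CPython builds a thread pool does not speed up such code because of the GIL, although it can stop the event loop from stalling outright.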

What's the maximum safe number of concurrent tasks?

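It depends on your workload and resources rather than on a fixed cap. Tasks themselves are lightweight, so many thousands can coexist; the practical limits are usually memory, open file descriptors (sockets), and what the remote service tolerates, and descriptor limits differ by OS and configuration. Test with your workload and monitor memory and open files. Start at 100-1000 and measure.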

Can I limit total memory usage of asyncio tasks?

No built-in mechanism. Use resource limits (resource.setrlimit() on Unix) or explicit memory tracking (task size estimation) and reject new tasks if over limit.

How do I profile memory usage of asyncio code?

Use tracemalloc:

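import asyncio
import tracemalloc

async def main():
    tracemalloc.start()
    await my_async_function()  # Placeholder for the coroutine you want to measure
    current, peak = tracemalloc.get_traced_memory()
    print(f"Peak memory: {peak / 1024 / 1024:.1f} MB")

asyncio.run(main())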

Is there a rule of thumb for semaphore concurrency?

For HTTP requests, start with connections = requests_per_second * average_latency. E.g., 100 req/s * 0.1s latency = 10 concurrent. Measure and adjust.
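This rule of thumb is Little's law (items in flight = arrival rate × time in system). A small helper makes the arithmetic explicit; the function name suggested_concurrency and the headroom parameter are hypothetical additions for this sketch, not part of any library.

```python
import math


def suggested_concurrency(requests_per_second, avg_latency_seconds, headroom=1.0):
    """Starting point for a semaphore limit: rate * latency, times optional headroom."""
    in_flight = requests_per_second * avg_latency_seconds * headroom
    # Round first so float noise such as 10.000000001 does not bump the result up.
    return max(1, math.ceil(round(in_flight, 9)))


if __name__ == "__main__":
    print(suggested_concurrency(100, 0.1))                 # 100 req/s at 0.1 s latency
    print(suggested_concurrency(250, 0.2, headroom=1.5))   # with 50% headroom
```

Treat the result as a first guess for the semaphore value; latency usually rises under load, so re-measure and adjust.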

Further Reading