Async Database Python: Getting Started with asyncpg
Asyncpg is a PostgreSQL driver for Python that runs queries without blocking the event loop, letting your application serve hundreds of concurrent database requests. Unlike traditional drivers like psycopg2 that freeze your entire program while waiting for the server, asyncpg suspends execution, yields control to other tasks, and resumes when data arrives—all on a single thread.
This foundational guide teaches you how to connect to PostgreSQL asynchronously, fetch rows, and understand why async I/O matters for database programming. By the end, you'll write your first non-blocking query and see why large-scale applications depend on this pattern.
What is Async Database Programming?
Async database programming decouples query execution from thread blocking. Traditional code opens a connection, sends a query, and waits; the thread sits idle, consuming memory. Async code sends the query, registers a callback, and lets the thread handle other tasks until the response arrives. asyncpg is an asyncio PostgreSQL client that implements the PostgreSQL wire protocol itself. It is not a wrapper around libpq, and much of it is written in Cython rather than pure Python, so it supports this pattern natively.
For example, a web server handling 5,000 concurrent users with synchronous queries needs roughly one thread per in-flight request, and thousands of threads can cost gigabytes of memory plus heavy context switching. The same server using asyncpg can run on a single event-loop thread with a modest connection pool, because each suspended query is a lightweight coroutine rather than a parked thread.
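The idea that one thread can hold many suspended queries can be checked without a database. The sketch below is a stand-in, not asyncpg code: `fake_query` is a hypothetical coroutine that uses `asyncio.sleep` to imitate a 50 ms network wait, and the `stats` dictionary is only there to count what happens.

```python
import asyncio
import threading


async def fake_query(stats: dict) -> None:
    """Hypothetical stand-in for a database call (not asyncpg)."""
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    stats["threads"].add(threading.get_ident())  # which thread ran this task
    await asyncio.sleep(0.05)                    # simulated network wait
    stats["in_flight"] -= 1


async def run_many(n: int) -> dict:
    """Start n simulated queries at once and wait for all of them."""
    stats = {"in_flight": 0, "peak": 0, "threads": set()}
    await asyncio.gather(*(fake_query(stats) for _ in range(n)))
    return stats


stats = asyncio.run(run_many(1000))
print(f"peak suspended queries: {stats['peak']}, threads used: {len(stats['threads'])}")
```

All 1,000 simulated queries are suspended at the same moment, yet only one thread identifier is ever recorded. A real workload would also be limited by how many database connections are available.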
Installation and Prerequisites
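Install asyncpg via pip. You'll also need a PostgreSQL server running locally or remotely. Recent asyncpg releases list PostgreSQL 9.5 and later as supported, so check the project README against your server version.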
pip install asyncpg
If you don't have PostgreSQL, Docker is quickest:
docker run --name postgres -e POSTGRES_PASSWORD=password -p 5432:5432 -d postgres:latest
Verify the connection works. Open psql or a Python shell and ensure you can reach the server:
psql -U postgres -h localhost -c "SELECT version();"
Your First Async Query
Here's a minimal asyncpg program. It connects, runs a query, and closes:
import asyncio
import asyncpg

async def fetch_version():
    """Connect to PostgreSQL and fetch the server version."""
    conn = await asyncpg.connect('postgresql://postgres:password@localhost/postgres')
    try:
        version = await conn.fetchval('SELECT version()')
        print(f"Server: {version}")
    finally:
        await conn.close()

# Run the async function
asyncio.run(fetch_version())
Key points:
- await asyncpg.connect(...) pauses execution until the socket connects. The thread is not blocked; asyncio schedules other tasks in the meantime.
- await conn.fetchval(...) fetches a single scalar value. Other methods: fetchrow() (one row as a record), fetch() (all rows as a list), execute() (run a statement and return its command status string, such as INSERT 0 1).
- conn.close() is async too: it flushes pending data and closes the socket cleanly.
Run it:
python script.py
Output: Server: PostgreSQL 15.2 on x86_64-pc-linux-gnu ...
Understanding the Asyncio Event Loop
When you call asyncio.run(fetch_version()), Python creates an event loop—a dispatcher that alternates between tasks. Here's what happens:
- The loop starts fetch_version().
- The function hits await asyncpg.connect(...). The loop suspends fetch_version() and registers the socket with the OS.
- While the OS waits for the network, the loop runs other tasks (or sleeps if none exist).
- The OS signals the socket is ready. The loop resumes fetch_version().
- The connection is returned. The function runs fetchval(), which again suspends.
- Once the query result arrives, fetch_version() resumes and prints the version.
- conn.close() suspends, then the function exits.
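The suspend-and-resume steps above can be traced with two tasks on one loop. This is a minimal sketch using only the standard library; `worker` is a hypothetical task, and `asyncio.sleep` stands in for the network wait that asyncpg would perform.

```python
import asyncio

events = []


async def worker(name: str, delay: float) -> None:
    """Hypothetical task: log, wait on simulated I/O, log again."""
    events.append(f"{name}: started")
    await asyncio.sleep(delay)   # suspension point: control returns to the loop
    events.append(f"{name}: resumed")


async def main() -> None:
    # Both tasks share one loop; "slow" starts first but resumes last
    await asyncio.gather(worker("slow", 0.1), worker("fast", 0.02))


asyncio.run(main())
for event in events:
    print(event)
# slow: started
# fast: started
# fast: resumed
# slow: resumed
```

The "fast" task starts while "slow" is still suspended and finishes before it, which is the interleaving the loop provides at every await.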
This interleaving—possible only because each I/O operation is async—lets thousands of queries run concurrently without threads. One thread, one event loop, many tasks.
Fetching Multiple Rows
Real queries return sets. Here's how to iterate:
import asyncio
import asyncpg

async def fetch_users():
    """Fetch multiple rows from a users table."""
    conn = await asyncpg.connect('postgresql://postgres:password@localhost/postgres')
    try:
        # Create a sample table
        await conn.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT
            )
        ''')
        # Insert sample data
        await conn.execute(
            "INSERT INTO users (name, email) VALUES ('Alice', '[email protected]')"
        )
        await conn.execute(
            "INSERT INTO users (name, email) VALUES ('Bob', '[email protected]')"
        )
        # Fetch all rows
        rows = await conn.fetch('SELECT id, name, email FROM users')
        for row in rows:
            print(f"ID: {row['id']}, Name: {row['name']}, Email: {row['email']}")
    finally:
        await conn.close()

asyncio.run(fetch_users())
Output:
ID: 1, Name: Alice, Email: [email protected]
ID: 2, Name: Bob, Email: [email protected]
The fetch() method returns a list of Record objects (dict-like). Access columns by name (row['name']) or index (row[1]).
Parameterized Queries and Safety
Never concatenate strings into SQL—it's vulnerable to injection. Use parameter placeholders:
import asyncio
import asyncpg

async def insert_user(name: str, email: str):
    """Insert a user safely using parameterized queries."""
    conn = await asyncpg.connect('postgresql://postgres:password@localhost/postgres')
    try:
        # Use $1, $2 for positional parameters
        result = await conn.execute(
            'INSERT INTO users (name, email) VALUES ($1, $2)',
            name, email
        )
        print(f"Inserted: {result}")
    finally:
        await conn.close()

asyncio.run(insert_user('Charlie', '[email protected]'))
Asyncpg uses $1, $2, ... placeholders (PostgreSQL native syntax, not %s). Parameters are passed as separate positional arguments. This prevents SQL injection and is faster than string formatting.
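To see why concatenation is dangerous, it helps to compare the SQL text each approach produces. The sketch below needs no database; the hostile `name` value is a made-up illustration, and the last comment shows how the safe form would be handed to asyncpg.

```python
# Hypothetical hostile input, for illustration only
name = "x'); DROP TABLE users; --"

# Unsafe: the input becomes part of the SQL text itself
unsafe_sql = f"INSERT INTO users (name) VALUES ('{name}')"

# Safe: the SQL text is a constant; the value travels separately as an argument
safe_sql = "INSERT INTO users (name) VALUES ($1)"
safe_args = (name,)

print(unsafe_sql)
print(safe_sql, safe_args)
# With asyncpg the safe form would be passed as:
#     await conn.execute(safe_sql, *safe_args)
```

In the unsafe string the attacker's text is indistinguishable from the statement. In the parameterized form the statement never changes, whatever the value contains.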
Running Multiple Queries Concurrently
Here's where async shines. Fetch data from multiple queries without waiting for each to finish:
import asyncio
import asyncpg

async def concurrent_queries():
    """Run two queries at the same time, each on its own pooled connection."""
    # A single asyncpg connection runs one query at a time, so use a small pool
    pool = await asyncpg.create_pool(
        'postgresql://postgres:password@localhost/postgres',
        min_size=2, max_size=2
    )
    try:
        # pool.fetchval() acquires a connection, runs the query, and releases it
        task1 = pool.fetchval('SELECT COUNT(*) FROM users')
        task2 = pool.fetchval('SELECT MAX(id) FROM users')
        # Run both concurrently using asyncio.gather
        count, max_id = await asyncio.gather(task1, task2)
        print(f"Total users: {count}, Max ID: {max_id}")
    finally:
        await pool.close()

asyncio.run(concurrent_queries())
asyncio.gather() accepts multiple coroutines, runs them concurrently, and returns their results in order. An asyncpg connection can execute only one query at a time (a second concurrent call on the same connection raises an InterfaceError), so the example draws a separate connection from a pool for each query. If each query takes 100 ms, running them sequentially takes 200 ms; running them concurrently takes ~100 ms. This pattern is essential for scaling.
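The 200 ms versus 100 ms arithmetic can be reproduced without a database. In this sketch `fake_query` is a hypothetical stand-in that sleeps for 100 ms instead of talking to PostgreSQL; the timings it prints will vary slightly from run to run.

```python
import asyncio
import time


async def fake_query(delay: float) -> float:
    """Hypothetical stand-in for a query with `delay` seconds of network wait."""
    await asyncio.sleep(delay)
    return delay


async def compare() -> tuple:
    # Sequential: the second wait starts only after the first one ends
    start = time.perf_counter()
    await fake_query(0.1)
    await fake_query(0.1)
    sequential = time.perf_counter() - start

    # Concurrent: both waits overlap
    start = time.perf_counter()
    await asyncio.gather(fake_query(0.1), fake_query(0.1))
    concurrent = time.perf_counter() - start
    return sequential, concurrent


sequential, concurrent = asyncio.run(compare())
print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```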
Key Takeaways
- Asyncpg is a pure-async PostgreSQL driver that doesn't block the event loop.
- await asyncpg.connect(), await conn.fetch(), and other operations are async: they suspend execution without blocking the thread.
- One event loop can handle thousands of concurrent queries because most time is spent waiting for the network, not computing.
- Use parameterized queries ($1, $2) to prevent SQL injection.
- asyncio.gather() runs multiple queries concurrently, reducing total wait time, provided each query has its own connection.
- Next step: learn connection pooling to reuse connections and eliminate the connect/close overhead.
Frequently Asked Questions
Is asyncpg faster than psycopg2?
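Often, yes. The asyncpg project's own published benchmarks report it several times faster than psycopg2, helped by its binary protocol implementation and by avoiding per-request thread overhead under high concurrency. For a single query in isolation the two are much closer. Results depend on the workload, so measure with your own queries before relying on a specific speedup.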
Can I use asyncpg in a synchronous function?
No. Asyncpg requires an event loop. If you need to call it from sync code, wrap it: asyncio.run(my_async_function()). In a web framework like FastAPI, the framework provides the event loop automatically.
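A synchronous wrapper along these lines might look as follows. This is a minimal sketch: `fetch_answer` is a hypothetical coroutine standing in for an asyncpg call, and the wrapper only works when no event loop is already running in the current thread.

```python
import asyncio


async def fetch_answer() -> int:
    """Hypothetical coroutine standing in for an asyncpg call."""
    await asyncio.sleep(0.01)   # simulated database round trip
    return 42


def fetch_answer_sync() -> int:
    """Synchronous entry point: start a loop, run the coroutine, return its result.

    asyncio.run() raises RuntimeError if an event loop is already running
    in this thread, so call this only from plain synchronous code.
    """
    return asyncio.run(fetch_answer())


value = fetch_answer_sync()
print(value)  # 42
```

Each call to asyncio.run() creates and tears down a fresh event loop, so this suits scripts and occasional calls better than hot paths.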
Does asyncpg support all PostgreSQL types?
Asyncpg supports all standard types: integers, strings, arrays, JSON, UUIDs, timestamps, ranges, and more. Custom types require custom codec setup. See the official docs for examples.
How do I close the connection properly?
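Wrap the connection in try/finally and call await conn.close() in the finally block, as the examples above do. asyncpg.connect() returns a plain connection rather than an async context manager, so async with asyncpg.connect(...) as conn: does not work. With a connection pool, async with pool.acquire() as conn: returns the connection to the pool automatically, even if an exception occurs, preventing resource leaks.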