Async Sessions in SQLAlchemy: Transactions and Updates
An async SQLAlchemy session is a context that coordinates multiple queries into atomic, consistent operations. Without sessions, each query is isolated—if the second query fails, the first's changes are stuck. Sessions let you group INSERT, UPDATE, and DELETE operations, commit them as a unit, and rollback everything if an error occurs. This is essential for financial transactions, inventory updates, and any operation that shouldn't partially succeed.
This guide teaches you how to create and use async sessions, manage transactions explicitly, handle errors safely, and avoid common pitfalls like dangling sessions or implicit commits.
Async Sessions vs. Raw Connections
A session is a higher-level abstraction over a connection. Here's the difference:
| Aspect | Raw Connection | Session |
|---|---|---|
| Scope | Single query | Multiple coordinated queries |
| Auto-commit | No (manual commit()) | Yes (auto-flushes on query) |
| Error handling | Manual rollback | Auto-rollback on exception |
| Lazy loading | N/A | ORM only (avoid in async) |
| Identity map | N/A | Caches objects per session |
For transactional safety, always use sessions.
Creating an AsyncSession
First, create an async engine and session factory:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
import asyncio
# Engine: manages the connection pool
engine = create_async_engine(
'postgresql+asyncpg://postgres:password@localhost/postgres',
echo=False, # Set to True to see generated SQL
)
# Session factory: creates new sessions
async_session = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False # Don't reload objects after commit
)
async def main():
# Create and use a session
async with async_session() as session:
# Queries go here
pass
# Session auto-closes here, returning the connection to the pool
await engine.dispose()
asyncio.run(main())
Key parameters:
expire_on_commit=False: By default, session reloads objects after commit. For async code, disable this to avoid extra queries.echo=True: Prints every SQL statement (helpful for debugging, disable in production).
A Single Transaction: Insert, Update, Commit
Here's a basic transaction—update inventory when an order is placed:
from sqlalchemy import update
async def place_order(product_id: int, quantity: int):
"""Decrement product stock and record the order."""
async with async_session() as session:
try:
# Step 1: Check current stock
product = await session.get(Product, product_id)
if product.stock < quantity:
raise ValueError(f"Insufficient stock. Available: {product.stock}")
# Step 2: Decrement stock
product.stock -= quantity
# Step 3: Create order record
order = Order(product_id=product_id, quantity=quantity, status='pending')
session.add(order)
# Step 4: Commit all changes atomically
await session.commit()
print(f"Order placed. New stock: {product.stock}")
except Exception as e:
# Rollback on any error
await session.rollback()
print(f"Order failed: {e}")
raise
asyncio.run(place_order(1, 5))
What happens:
- The session starts a transaction (implicit).
product.stock -= quantitymodifies the object in memory (not in the database yet).session.add(order)queues the insertion.await session.commit()sends all changes to the database in a single statement (or multiple statements, but atomically).- If any error occurs before commit,
await session.rollback()reverts all changes.
Explicit Transaction Blocks
For fine-grained control, use session.begin() to explicitly start a transaction:
async def transfer_funds(from_account_id: int, to_account_id: int, amount: int):
"""Transfer funds between accounts atomically."""
async with async_session() as session:
async with session.begin():
# Inside this block, all operations are part of a single transaction
# Fetch both accounts
from_account = await session.get(Account, from_account_id)
to_account = await session.get(Account, to_account_id)
# Validate
if from_account.balance < amount:
raise ValueError("Insufficient funds")
# Update both
from_account.balance -= amount
to_account.balance += amount
# Commit happens automatically when exiting the block
# (or rollback if an exception occurs)
asyncio.run(transfer_funds(1, 2, 100))
The async with session.begin() block automatically commits on success and rolls back on exception. This is cleaner than manual commit() and rollback() calls.
Savepoints for Nested Rollbacks
In complex workflows, you might want to rollback only part of a transaction. Savepoints let you revert to a midpoint:
async def complex_workflow():
"""Demonstrate savepoint usage."""
async with async_session() as session:
async with session.begin():
# Main transaction starts
user = User(name='Alice')
session.add(user)
await session.flush() # Flush to get the user ID without committing
try:
# Savepoint 1
savepoint = await session.begin_nested()
# Some operation that might fail
profile = UserProfile(user_id=user.id, bio='...')
session.add(profile)
if not validate_profile(profile):
await savepoint.rollback() # Revert just this part
print("Profile invalid, using default instead")
else:
await savepoint.commit()
except Exception as e:
# If anything fails, the entire transaction rolls back
print(f"Critical error: {e}")
raise
# Main transaction commits here
Savepoints are useful for optional operations within a larger transaction. If the optional part fails, the main transaction continues.
Refresh Objects After Commit
Sometimes you need the server-generated values (e.g., auto-increment ID) after insertion. Use session.refresh():
async def create_user_and_return_id(name: str):
"""Create a user and return the auto-generated ID."""
async with async_session() as session:
user = User(name=name)
session.add(user)
await session.flush() # Inserts and returns the ID
# At this point, user.id is set by the database
print(f"Created user with ID: {user.id}")
await session.commit()
return user.id
.flush() sends pending changes to the database without committing. Use it when you need generated values (IDs, defaults) mid-transaction.
Handling Connection Errors with Retry Logic
Database connections can fail. Wrap transactional code in a retry loop:
import asyncio
import asyncpg
async def transactional_with_retry(async_func, max_retries=3):
"""Retry a transactional function if it fails."""
for attempt in range(max_retries):
try:
return await async_func()
except asyncpg.TooManyConnectionsError as e:
if attempt == max_retries - 1:
raise
wait_time = 2 ** attempt # Exponential backoff
print(f"Connection failed, retrying in {wait_time}s...")
await asyncio.sleep(wait_time)
except Exception as e:
print(f"Non-retryable error: {e}")
raise
async def my_transaction():
"""Some transactional operation."""
async with async_session() as session:
async with session.begin():
user = User(name='Bob')
session.add(user)
# ... more operations ...
asyncio.run(transactional_with_retry(my_transaction))
Retryable errors: connection pool exhausted, temporary network issues, transaction conflicts (in PostgreSQL). Non-retryable: integrity constraint violations, invalid data.
Multiple Sessions for Concurrent Workflows
Sessions are thread-safe within async contexts. Run independent transactions concurrently:
async def process_orders(order_ids: list):
"""Process multiple orders concurrently."""
async def process_one(order_id):
async with async_session() as session:
async with session.begin():
order = await session.get(Order, order_id)
order.status = 'processing'
# ... fulfill order ...
# Run all orders concurrently
await asyncio.gather(*[process_one(oid) for oid in order_ids])
asyncio.run(process_orders([1, 2, 3, 4, 5]))
Each concurrent task gets its own session and connection from the pool. They don't interfere because they operate on independent rows.
Key Takeaways
- Sessions coordinate multiple queries into atomic transactions.
- Always use
async with async_session()to ensure cleanup. await session.commit()sends all pending changes to the database.await session.rollback()reverts changes if an error occurs (do this explicitly or useasync with session.begin()for auto-rollback).- Savepoints allow partial rollbacks within a transaction.
.flush()sends changes without committing, useful for retrieving generated values.- Wrap transactions in retry loops to handle transient database errors.
Frequently Asked Questions
What's the difference between .flush() and .commit()?
.flush() sends changes to the database but doesn't commit the transaction. The changes are sent in SQL but can still be rolled back. .commit() persists all changes and closes the transaction. Use .flush() when you need generated values (IDs) mid-transaction.
Can I run transactions in parallel?
Yes, each async with async_session() block creates a new session with its own connection. Multiple sessions can run concurrently without interfering (assuming they modify different rows). For overlapping row modifications, PostgreSQL's MVCC handles isolation.
Do I need to call .rollback() explicitly?
No. If an exception occurs and you exit the session or transaction block, SQLAlchemy auto-rolls back. Explicit rollback() is only needed if you catch an exception and want to continue in the same transaction.
What isolation level should I use?
PostgreSQL defaults to READ COMMITTED, which is fine for most applications. For stricter isolation (e.g., no dirty reads), use SERIALIZABLE. Set it in SQLAlchemy with isolation_level='SERIALIZABLE' on the engine.