Advanced Alembic Patterns: Custom Scripts and Constraints
Once you're comfortable with basic migrations, you'll encounter edge cases: adding check constraints, creating partial indexes for performance, defining custom SQL functions, or orchestrating multi-step transformations that span multiple migrations. Alembic's op object supports these advanced scenarios through raw SQL, conditional execution, and transaction control.
Check Constraints: Enforce Business Rules at the Database
Check constraints ensure data validity without application-level checks. For example, enforce that age must be positive:
# migrations/versions/011_add_age_check_constraint.py
from alembic import op


def upgrade():
    op.create_check_constraint(
        'check_age_positive',
        'users',
        'age > 0'
    )


def downgrade():
    op.drop_constraint('check_age_positive', 'users', type_='check')
Another example: ensure status is one of valid values:
def upgrade():
    op.create_check_constraint(
        'check_status_valid',
        'orders',
        "status IN ('pending', 'processing', 'shipped', 'delivered', 'cancelled')"
    )


def downgrade():
    op.drop_constraint('check_status_valid', 'orders', type_='check')
However, adding a check constraint fails if the table already contains rows that violate it. Clean the data first, then add the constraint:
def upgrade():
    # Step 1: Clean data (set invalid statuses to a default)
    op.execute("""
        UPDATE orders
        SET status = 'pending'
        WHERE status NOT IN ('pending', 'processing', 'shipped', 'delivered', 'cancelled')
    """)
    # Step 2: Add the constraint
    op.create_check_constraint(
        'check_status_valid',
        'orders',
        "status IN ('pending', 'processing', 'shipped', 'delivered', 'cancelled')"
    )


def downgrade():
    op.drop_constraint('check_status_valid', 'orders', type_='check')
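Before a cleanup like this overwrites anything, it can be worth counting how many rows would change. The following is a minimal standalone sketch that uses Python's built-in sqlite3 module in place of a real migration connection; the helper name count_invalid_statuses and the sample rows are assumptions made for illustration, not part of Alembic.

```python
import sqlite3

VALID_STATUSES = ('pending', 'processing', 'shipped', 'delivered', 'cancelled')


def count_invalid_statuses(conn):
    """Return how many orders rows would violate the status constraint."""
    placeholders = ', '.join('?' for _ in VALID_STATUSES)
    query = f"SELECT COUNT(*) FROM orders WHERE status NOT IN ({placeholders})"
    return conn.execute(query, VALID_STATUSES).fetchone()[0]


# Hypothetical sample data: two valid rows and one invalid row
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany(
    "INSERT INTO orders (status) VALUES (?)",
    [('pending',), ('shipped',), ('refunded',)],
)
invalid_count = count_invalid_statuses(conn)
print(f"{invalid_count} row(s) would be reset to 'pending'")
```

Inside a migration, the same query could be run through op.get_bind() before the UPDATE, and the count logged or used to abort if it is unexpectedly large.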
Partial Indexes: Query Optimization Without Modifying Schema
A partial index indexes only rows matching a condition, saving disk space:
# migrations/versions/012_add_partial_index_active_users.py
from alembic import op


def upgrade():
    # Index only active users (faster for filtering active users)
    op.create_index(
        'idx_users_email_active',
        'users',
        ['email'],
        postgresql_where='active = true'  # PostgreSQL syntax for partial index
    )


def downgrade():
    op.drop_index('idx_users_email_active', table_name='users')
This is also useful for skipping soft-deleted records, for example by indexing only rows where is_deleted = false:
def upgrade():
    op.create_index(
        'idx_posts_created_at_active',
        'posts',
        ['created_at'],
        postgresql_where='is_deleted = false'
    )
Note: partial indexes are not portable. SQLite supports them (through the sqlite_where option rather than postgresql_where), while MySQL does not support partial indexes at all. If a migration must run on several databases, branch on the dialect:
from alembic import op


def upgrade():
    dialect = op.get_bind().dialect.name
    if dialect == 'postgresql':
        op.execute(
            "CREATE INDEX idx_users_active ON users(id) WHERE active = true"
        )
    elif dialect == 'mysql':
        # MySQL has no partial indexes; fall back to a full index
        op.create_index('idx_users_active', 'users', ['id'])
Creating Custom SQL Functions
Store complex logic in the database as a function:
# migrations/versions/013_add_calculate_age_function.py
from alembic import op


def upgrade():
    op.execute("""
        CREATE FUNCTION calculate_age(birth_date DATE)
        RETURNS INTEGER AS $$
        BEGIN
            RETURN EXTRACT(YEAR FROM AGE(birth_date));
        END;
        $$ LANGUAGE plpgsql;
    """)


def downgrade():
    op.execute("DROP FUNCTION IF EXISTS calculate_age(DATE)")
Then use the function in your app or in other migrations:
# Another migration that uses the function
def upgrade():
    op.execute("""
        UPDATE users
        SET age = calculate_age(birth_date)
        WHERE birth_date IS NOT NULL
    """)
Composite Indexes for Multi-Column Queries
Index multiple columns together for queries that filter on both:
# migrations/versions/014_add_composite_index.py
from alembic import op


def upgrade():
    # For queries like: SELECT * FROM posts WHERE user_id = ? AND created_at > ?
    op.create_index(
        'idx_posts_user_created',
        'posts',
        ['user_id', 'created_at']
    )


def downgrade():
    op.drop_index('idx_posts_user_created', table_name='posts')
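The column order matters: an index on (user_id, created_at) can serve queries that filter on user_id alone or on both columns, but it is of little help to queries that filter only on created_at. As a rule of thumb, put columns compared with equality before columns compared with a range, and among equality columns put the most selective one first.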
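One way to check how a composite index is actually used is to ask the database for its query plan. The sketch below uses SQLite through Python's built-in sqlite3 module purely as a convenient stand-in (PostgreSQL's EXPLAIN output looks different); the helper query_plan and the extra body column are assumptions made for illustration.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute(
    "CREATE TABLE posts (id INTEGER PRIMARY KEY, user_id INTEGER, "
    "created_at TEXT, body TEXT)"
)
conn.execute("CREATE INDEX idx_posts_user_created ON posts (user_id, created_at)")


def query_plan(sql, params):
    """Return the human-readable detail column of EXPLAIN QUERY PLAN."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql, params).fetchall()
    return ' | '.join(row[3] for row in rows)


# Equality on the leading column plus a range on the second column
both_columns_plan = query_plan(
    "SELECT * FROM posts WHERE user_id = ? AND created_at > ?",
    (1, '2024-01-01'),
)
# A filter on the second column only, for comparison
second_column_only_plan = query_plan(
    "SELECT * FROM posts WHERE created_at > ?",
    ('2024-01-01',),
)
print(both_columns_plan)
print(second_column_only_plan)
```

The first plan should name idx_posts_user_created; comparing it with the second plan shows whether the planner can still use the index when the leading column is missing from the filter.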
Handling Concurrent Migrations
On large tables, a plain CREATE INDEX blocks writes to the table until the build finishes. PostgreSQL can build the index concurrently instead, but CREATE INDEX CONCURRENTLY cannot run inside a transaction block, so the migration has to step outside Alembic's transaction with autocommit_block():
# migrations/versions/015_add_index_concurrently.py
from alembic import op


def upgrade():
    if op.get_bind().dialect.name == 'postgresql':
        # CONCURRENTLY avoids blocking writes but must run outside a transaction
        with op.get_context().autocommit_block():
            op.execute(
                "CREATE INDEX CONCURRENTLY idx_users_email ON users(email)"
            )
    else:
        # Fallback for other databases (may lock the table during the build)
        op.create_index('idx_users_email', 'users', ['email'])


def downgrade():
    op.drop_index('idx_users_email', table_name='users')
Batch Operations: Insert/Update Large Volumes
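For bulk data changes, process rows in batches so that no single statement has to touch the whole table. By default Alembic runs the whole migration in one transaction, so batching bounds the size of each statement, but row locks are still held until the migration commits:
# migrations/versions/016_backfill_computed_field.py
import sqlalchemy as sa
from alembic import op
from sqlalchemy import text


def upgrade():
    connection = op.get_bind()
    # Add the column
    op.add_column('users', sa.Column('username_lower', sa.String(50)))
    # Process in batches of 1000, paging by primary key. Paging with OFFSET
    # over "WHERE username_lower IS NULL" would skip rows, because each
    # update shrinks the set that the offset is counted against.
    batch_size = 1000
    last_id = 0  # assumes positive integer ids
    processed = 0
    while True:
        # Fetch the next batch of user IDs after the last one processed
        result = connection.execute(text("""
            SELECT id FROM users
            WHERE id > :last_id
            ORDER BY id
            LIMIT :limit
        """), {'last_id': last_id, 'limit': batch_size})
        ids = [row[0] for row in result.fetchall()]
        if not ids:
            break
        # Update this batch using bound parameters
        connection.execute(text("""
            UPDATE users
            SET username_lower = LOWER(username)
            WHERE id > :last_id AND id <= :max_id
        """), {'last_id': last_id, 'max_id': ids[-1]})
        last_id = ids[-1]
        processed += len(ids)
        print(f"Processed {processed} rows")


def downgrade():
    op.drop_column('users', 'username_lower')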
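A caveat with OFFSET-based paging: when each batch removes rows from the filtered set (here, rows where username_lower IS NULL), advancing the offset skips rows that have not been processed yet. The standalone sketch below compares offset paging with paging by primary key, using Python's built-in sqlite3 module as a stand-in for the migration connection; all function names and the sample data are assumptions made for illustration.

```python
import sqlite3


def make_conn(row_count):
    """Create an in-memory users table with row_count unprocessed rows."""
    conn = sqlite3.connect(':memory:')
    conn.execute(
        "CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, "
        "username_lower TEXT)"
    )
    conn.executemany(
        "INSERT INTO users (username) VALUES (?)",
        [(f"User{i}",) for i in range(row_count)],
    )
    return conn


def backfill_with_offset(conn, batch_size):
    """Page with OFFSET over the shrinking set of NULL rows."""
    offset = 0
    while True:
        ids = [r[0] for r in conn.execute(
            "SELECT id FROM users WHERE username_lower IS NULL "
            "ORDER BY id LIMIT ? OFFSET ?", (batch_size, offset))]
        if not ids:
            break
        conn.executemany(
            "UPDATE users SET username_lower = LOWER(username) WHERE id = ?",
            [(i,) for i in ids])
        offset += batch_size


def backfill_with_keyset(conn, batch_size):
    """Page by primary key, remembering the last id processed."""
    last_id = 0  # assumes positive integer ids
    while True:
        ids = [r[0] for r in conn.execute(
            "SELECT id FROM users WHERE id > ? ORDER BY id LIMIT ?",
            (last_id, batch_size))]
        if not ids:
            break
        conn.execute(
            "UPDATE users SET username_lower = LOWER(username) "
            "WHERE id > ? AND id <= ?", (last_id, ids[-1]))
        last_id = ids[-1]


def count_missing(conn):
    return conn.execute(
        "SELECT COUNT(*) FROM users WHERE username_lower IS NULL"
    ).fetchone()[0]


offset_conn = make_conn(10)
backfill_with_offset(offset_conn, 3)
missing_after_offset = count_missing(offset_conn)

keyset_conn = make_conn(10)
backfill_with_keyset(keyset_conn, 3)
missing_after_keyset = count_missing(keyset_conn)
print(missing_after_offset, missing_after_keyset)
```

With 10 rows and a batch size of 3, the offset version leaves 4 rows unprocessed, while the keyset version leaves none.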
Foreign Keys and Cascade Behavior
Define cascade rules for foreign keys:
# migrations/versions/017_add_user_posts_fk_cascade.py
from alembic import op


def upgrade():
    op.create_foreign_key(
        'fk_posts_user_cascade',
        'posts',
        'users',
        ['user_id'],
        ['id'],
        ondelete='CASCADE',  # Delete posts when user is deleted
        onupdate='CASCADE'   # Update posts if user ID changes
    )


def downgrade():
    # type_ is required on MySQL and makes the intent explicit elsewhere
    op.drop_constraint('fk_posts_user_cascade', 'posts', type_='foreignkey')
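To see what ON DELETE CASCADE does to child rows, here is a small standalone sketch using Python's built-in sqlite3 module instead of a migration. The table layout mirrors the users/posts example, and the sample rows are assumptions made for illustration.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
# SQLite enforces foreign keys only when this pragma is enabled
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY)")
conn.execute(
    "CREATE TABLE posts ("
    "id INTEGER PRIMARY KEY, "
    "user_id INTEGER REFERENCES users(id) ON DELETE CASCADE ON UPDATE CASCADE)"
)
conn.execute("INSERT INTO users (id) VALUES (1), (2)")
conn.execute("INSERT INTO posts (user_id) VALUES (1), (1), (2)")

# Deleting user 1 also deletes both of that user's posts
conn.execute("DELETE FROM users WHERE id = 1")
remaining_posts = conn.execute("SELECT user_id FROM posts").fetchall()
print(remaining_posts)
```

Because the cascade happens silently inside the database, it is worth confirming that deleting child rows is really the intended behavior before adding it to a production table.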
Conditional Migrations Based on Environment
Different environments may need different schemas:
# migrations/versions/018_add_audit_logging.py
import os

import sqlalchemy as sa
from alembic import op


def upgrade():
    # Only add audit logging in production
    if os.getenv('ENVIRONMENT') == 'production':
        op.create_table(
            'audit_logs',
            sa.Column('id', sa.Integer(), primary_key=True),
            sa.Column('table_name', sa.String(100)),
            sa.Column('operation', sa.String(10)),  # INSERT, UPDATE, DELETE
            sa.Column('changed_values', sa.JSON()),
            # server_default emits DEFAULT now() in the DDL; a plain
            # default= would not create a database-level default
            sa.Column('created_at', sa.DateTime(), server_default=sa.func.now())
        )


def downgrade():
    if os.getenv('ENVIRONMENT') == 'production':
        op.drop_table('audit_logs')
Handling Migration-Safe Column Renames
Renaming a column in a live system is risky (breaks application code referencing the old name). A safer approach:
- Add the new column
- Update code to write to both columns
- Backfill old column values to new column
- Update code to read from new column
- Drop the old column
# Step 1: Add new column (migration)
# migrations/versions/019_add_user_full_name.py
def upgrade():
    op.add_column('users', sa.Column('full_name', sa.String(100)))
    # Don't populate yet; let the app do it gradually


def downgrade():
    op.drop_column('users', 'full_name')

# Step 2: Update code
# app/models.py
from sqlalchemy import Column, Integer, String


class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))       # Old
    full_name = Column(String(100))  # New

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Write to both columns for now
        if self.full_name and not self.name:
            self.name = self.full_name
        if self.name and not self.full_name:
            self.full_name = self.name

# Step 3: Backfill old data (migration)
# migrations/versions/020_backfill_full_name.py
def upgrade():
    op.execute("UPDATE users SET full_name = name WHERE full_name IS NULL")


def downgrade():
    pass  # Don't delete data; just stop writing to full_name in code

# Step 4: Update code to read from new column only
# Later, after the app is confident:
# Step 5: Drop old column (migration)
# migrations/versions/021_drop_user_name.py
def upgrade():
    op.drop_column('users', 'name')


def downgrade():
    # Restores the column only; the dropped values must be copied back separately
    op.add_column('users', sa.Column('name', sa.String(100)))
This approach lets you rename columns safely without breaking deployments.
Performance Monitoring During Migrations
Large migrations can take time. Monitor progress:
# migrations/versions/022_slow_backfill.py
import time

import sqlalchemy as sa
from alembic import op
from sqlalchemy import text


def upgrade():
    connection = op.get_bind()
    op.add_column('users', sa.Column('status', sa.String(20)))
    batch_size = 5000
    processed = 0
    start_time = time.time()
    # Count once up front; recounting on every pass would rescan the table
    remaining = connection.execute(text(
        "SELECT COUNT(*) FROM users WHERE status IS NULL"
    )).scalar()
    while True:
        # Update a batch (PostgreSQL syntax; MySQL rejects LIMIT in an IN subquery)
        result = connection.execute(text("""
            UPDATE users
            SET status = 'active'
            WHERE id IN (SELECT id FROM users WHERE status IS NULL LIMIT :limit)
        """), {'limit': batch_size})
        if result.rowcount == 0:
            break
        processed += result.rowcount
        remaining = max(remaining - result.rowcount, 0)
        elapsed = time.time() - start_time
        rate = processed / elapsed if elapsed > 0 else 0
        # Guard against a zero rate so the ETA never divides by zero
        eta = f"{remaining / rate:.0f}s" if rate > 0 else "unknown"
        print(f"Progress: {processed} rows ({rate:.0f} rows/sec), "
              f"{remaining} remaining, ETA: {eta}")


def downgrade():
    op.drop_column('users', 'status')
Key Takeaways
- Use op.create_check_constraint() to enforce data validity at the database level
- Create partial indexes with postgresql_where to optimize queries without modifying schema
- Store complex logic in custom SQL functions with op.execute()
- Use composite indexes (multiple columns) for multi-column queries
- For large tables, process migrations in batches to avoid locking
- Use PostgreSQL's concurrent index creation for minimal downtime
- Safely rename columns with a multi-step process: add new, backfill, switch code, drop old
- Handle environment-specific migrations with conditional logic in upgrade/downgrade
Frequently Asked Questions
Can I add a check constraint to a column with invalid data?
No. The migration fails if existing data violates the constraint. First, clean the data (update invalid rows to valid values), then add the constraint. Alternatively, on PostgreSQL, add the constraint with NOT VALID so that it is enforced only for new and updated rows, fix the existing rows, and then run ALTER TABLE ... VALIDATE CONSTRAINT. (CHECK constraints in PostgreSQL cannot be declared DEFERRABLE.)
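For PostgreSQL, adding a check constraint as NOT VALID and validating it in a later step can be sketched as two SQL statements. The helper below only builds the strings; the function name two_phase_check_constraint_sql is an assumption made for illustration and is not an Alembic API.

```python
def two_phase_check_constraint_sql(table, name, condition):
    """Build the two PostgreSQL statements for a NOT VALID check constraint.

    Identifiers are interpolated directly, so pass trusted values only.
    """
    add_sql = (
        f"ALTER TABLE {table} "
        f"ADD CONSTRAINT {name} CHECK ({condition}) NOT VALID"
    )
    validate_sql = f"ALTER TABLE {table} VALIDATE CONSTRAINT {name}"
    return add_sql, validate_sql


add_sql, validate_sql = two_phase_check_constraint_sql(
    'orders',
    'check_status_valid',
    "status IN ('pending', 'processing', 'shipped', 'delivered', 'cancelled')",
)
print(add_sql)
print(validate_sql)
```

In a migration, each string would be passed to op.execute(), with the data cleanup running between the two statements (or in a separate, later migration).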
How do I safely rename a table?
Use op.rename_table('old_name', 'new_name'). However, this breaks application code that still references the old table name. A gentler path: rename the table, create a view with the old name that selects from the new table, then migrate code gradually. Drop the view once all code is updated. Whether writes through such a view work depends on the database, so verify that before relying on it.
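The compatibility-view idea can be tried out in isolation. The sketch below uses Python's built-in sqlite3 module rather than Alembic; the helper rename_with_compat_view and the customers/clients table names are assumptions made for illustration.

```python
import sqlite3


def rename_with_compat_view(conn, old_name, new_name):
    """Rename a table and leave a view behind under the old name.

    Names are interpolated directly, so pass trusted values only.
    """
    conn.execute(f'ALTER TABLE "{old_name}" RENAME TO "{new_name}"')
    conn.execute(f'CREATE VIEW "{old_name}" AS SELECT * FROM "{new_name}"')


conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, email TEXT)")
conn.execute("INSERT INTO customers (email) VALUES ('a@example.com')")

rename_with_compat_view(conn, 'customers', 'clients')

# Old code that still reads from "customers" keeps working through the view
rows_via_old_name = conn.execute("SELECT email FROM customers").fetchall()
print(rows_via_old_name)
```

In an Alembic migration the same two steps would be op.rename_table() followed by op.execute() with the CREATE VIEW statement. Note that SQLite views are read-only, so this sketch only demonstrates the read path.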
What if a migration takes hours to run?
For very large tables, consider: (1) processing in smaller batches over time, (2) using background workers to spread the load, (3) running the migration during maintenance windows, (4) using materialized views instead of computed columns. Always test on production-sized data first to estimate duration.
Can I run migrations in parallel?
Not against a single database. Alembic applies revisions one at a time, in dependency order, even when the revision history contains branches. If you need parallel workloads, orchestrate them outside Alembic using your application code or job queues. Migrations themselves must run serially.
How do I test a migration that modifies millions of rows?
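Create a test database with a representative subset of production data (e.g., 100K rows). Run the migration on the test database and measure how long it takes. Then extrapolate: if 100K rows take 5 seconds, 10 million rows will take roughly 500 seconds (about 8 minutes). Scaling is rarely perfectly linear, because index size, cache behavior, and lock contention all change with table size, so treat the figure as a first estimate and plan maintenance windows with some margin.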
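The extrapolation above is simple proportional scaling, which can be written down as a small helper. The function name estimate_duration_seconds is an assumption made for illustration, and the result is only as good as the linear-scaling assumption behind it.

```python
def estimate_duration_seconds(sample_rows, sample_seconds, target_rows):
    """Linearly scale a measured duration to a larger row count."""
    if sample_rows <= 0:
        raise ValueError("sample_rows must be positive")
    return sample_seconds * target_rows / sample_rows


# 100K rows took 5 seconds; estimate the time for 10 million rows
estimate = estimate_duration_seconds(100_000, 5, 10_000_000)
print(f"{estimate:.0f} seconds (~{estimate / 60:.1f} minutes)")
```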