Data Migrations: Transform Data During Schema Changes

Data migrations are operations that transform your data as you change your schema. Unlike schema migrations (which add/drop columns), data migrations compute new values, split columns, denormalize data, or consolidate records. A common example: when you add a status field to a users table, you need to populate it for existing rows. Alembic handles this with op.execute() and Python logic in migration files.

The Challenge: Schema and Data Together

When you split a full_name column into first_name and last_name, you face a sequencing problem:

  1. Add first_name and last_name columns (empty)
  2. Compute values from full_name for each row
  3. Drop the old full_name column

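If you skip step 2, existing rows lose their data. On databases with transactional DDL, such as PostgreSQL, Alembic runs the schema changes and the data transformation inside one transaction, so either every step applies or none does. MySQL commits implicitly around each DDL statement, so a failure partway through can leave the migration partially applied.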

Data Migration Pattern: Add, Populate, Constrain

This three-step pattern handles most data migrations safely:

# migrations/versions/005_split_full_name.py
# Note: SUBSTRING_INDEX and CONCAT are MySQL syntax.
from alembic import op
import sqlalchemy as sa

def upgrade():
    # Step 1: Add new columns (nullable initially)
    op.add_column('users', sa.Column('first_name', sa.String(50)))
    op.add_column('users', sa.Column('last_name', sa.String(50)))

    # Step 2: Populate from existing data
    op.execute("""
        UPDATE users
        SET first_name = SUBSTRING_INDEX(full_name, ' ', 1),
            last_name = SUBSTRING_INDEX(full_name, ' ', -1)
        WHERE full_name IS NOT NULL
    """)

    # Step 3: Add constraints now that data is populated.
    # MySQL needs existing_type to alter a column. Rows whose full_name
    # is NULL stay NULL and must be handled before this step.
    op.alter_column('users', 'first_name', existing_type=sa.String(50), nullable=False)
    op.alter_column('users', 'last_name', existing_type=sa.String(50), nullable=False)
    op.drop_column('users', 'full_name')

def downgrade():
    # Reverse: add full_name, merge first/last, drop new columns
    op.add_column('users', sa.Column('full_name', sa.String(100)))
    op.execute("""
        UPDATE users
        SET full_name = CONCAT(first_name, ' ', last_name)
    """)
    op.alter_column('users', 'full_name', existing_type=sa.String(100), nullable=False)
    op.drop_column('users', 'first_name')
    op.drop_column('users', 'last_name')

This ensures that:

  • Existing rows are transformed correctly
  • You don't have a state where data is lost
  • The migration is reversible (exactly so only for two-word names, because the split keeps just the first and last word)
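
One caveat on reversibility is worth checking before you run the split on real data. The sketch below mirrors the two SUBSTRING_INDEX calls and the CONCAT call in plain Python, so you can see which sample names survive an upgrade followed by a downgrade. The helper names (split_full_name, merge_name, round_trips) are illustrative and not part of Alembic.

```python
def split_full_name(full_name):
    """Mirror SUBSTRING_INDEX(full_name, ' ', 1) and SUBSTRING_INDEX(full_name, ' ', -1)."""
    parts = full_name.split(' ')
    return parts[0], parts[-1]


def merge_name(first_name, last_name):
    """Mirror CONCAT(first_name, ' ', last_name) from downgrade()."""
    return f"{first_name} {last_name}"


def round_trips(full_name):
    """Return True if upgrade followed by downgrade restores the original value."""
    return merge_name(*split_full_name(full_name)) == full_name


# Two-word names survive; middle names are dropped; single names are doubled.
for name in ["Ada Lovelace", "Mary Ann Smith", "Cher"]:
    print(name, "->", merge_name(*split_full_name(name)), round_trips(name))
```

If any of your real names fail this check, consider keeping full_name in place until you are sure a downgrade will never be needed.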

Example: Add a Computed Field

You want to add a created_year field derived from created_at:

# migrations/versions/006_add_created_year.py
from alembic import op
import sqlalchemy as sa

def upgrade():
    op.add_column('users', sa.Column('created_year', sa.Integer))

    # Populate with the year extracted from created_at
    # (EXTRACT works on PostgreSQL and MySQL)
    op.execute("""
        UPDATE users
        SET created_year = EXTRACT(YEAR FROM created_at)
    """)

    # Fails if any row has a NULL created_at
    op.alter_column('users', 'created_year', existing_type=sa.Integer(), nullable=False)

def downgrade():
    op.drop_column('users', 'created_year')
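
To rehearse this backfill without a database server, you can replay the add-and-populate steps against an in-memory SQLite database using Python's standard sqlite3 module. This is a minimal sketch with made-up sample rows; it uses SQLite's strftime in place of EXTRACT, and it shows that a row with a NULL created_at is left NULL, which is exactly the row that would break the NOT NULL step.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, created_at TEXT)")
conn.executemany(
    "INSERT INTO users (created_at) VALUES (?)",
    [("2021-03-14 09:26:53",), ("2023-11-02 18:00:00",), (None,)],  # sample data
)

# Add the column, then populate it (SQLite equivalent of EXTRACT(YEAR FROM ...))
conn.execute("ALTER TABLE users ADD COLUMN created_year INTEGER")
conn.execute(
    "UPDATE users SET created_year = CAST(strftime('%Y', created_at) AS INTEGER)"
)

# Rows still NULL here would violate a NOT NULL constraint added afterward
missing = conn.execute(
    "SELECT COUNT(*) FROM users WHERE created_year IS NULL"
).fetchone()[0]
years = [row[0] for row in conn.execute("SELECT created_year FROM users ORDER BY id")]
print(years, "rows still NULL:", missing)
```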

Example: Denormalize Data

You have orders and customers tables and want to cache the customer_name in orders for faster queries:

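# migrations/versions/007_denormalize_customer_name.py
from alembic import op
import sqlalchemy as sa

def upgrade():
    op.add_column('orders', sa.Column('customer_name', sa.String(100)))

    # Copy each customer's name onto their orders via a correlated subquery
    op.execute("""
        UPDATE orders
        SET customer_name = (
            SELECT customers.name
            FROM customers
            WHERE customers.id = orders.customer_id
        )
    """)

    # Fails if any order has no matching customer (its customer_name stays NULL)
    op.alter_column('orders', 'customer_name', existing_type=sa.String(100), nullable=False)

def downgrade():
    op.drop_column('orders', 'customer_name')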
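
The NOT NULL step only succeeds if every order points at an existing customer. A quick pre-check is to count orders with no match before running the migration. The sketch below runs that check on an in-memory SQLite database with invented sample rows; the LEFT JOIN query itself is standard SQL and should carry over to other databases.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT NOT NULL);
    CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER);
    INSERT INTO customers (id, name) VALUES (1, 'Acme'), (2, 'Globex');
    -- Order 12 points at a customer that does not exist
    INSERT INTO orders (id, customer_id) VALUES (10, 1), (11, 2), (12, 99);
""")

# Orders whose customer_id has no matching customers row
orphans = conn.execute("""
    SELECT COUNT(*)
    FROM orders
    LEFT JOIN customers ON customers.id = orders.customer_id
    WHERE customers.id IS NULL
""").fetchone()[0]
print(f"Orders without a matching customer: {orphans}")
```

A nonzero count means you need a fallback value or a cleanup step before making customer_name non-nullable.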

Example: Conditional Updates

Use CASE statements to set a field based on conditions:

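# migrations/versions/008_categorize_users_by_age.py
from alembic import op
import sqlalchemy as sa

def upgrade():
    op.add_column('users', sa.Column('age_group', sa.String(20)))

    # Branches are evaluated top to bottom; the first match wins.
    # Note: rows with a NULL age match neither WHEN and fall through to ELSE.
    op.execute("""
        UPDATE users
        SET age_group = CASE
            WHEN age < 18 THEN 'minor'
            WHEN age < 65 THEN 'adult'
            ELSE 'senior'
        END
    """)

    op.alter_column('users', 'age_group', existing_type=sa.String(20), nullable=False)

def downgrade():
    op.drop_column('users', 'age_group')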
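
Boundary values and NULLs are where CASE expressions tend to surprise. As a rough check, the sketch below runs the same CASE against an in-memory SQLite table holding the edge ages 17, 18, 64, 65 and a NULL. The table contents are sample data chosen for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, age INTEGER, age_group TEXT)")
conn.executemany(
    "INSERT INTO users (age) VALUES (?)", [(17,), (18,), (64,), (65,), (None,)]
)

conn.execute("""
    UPDATE users
    SET age_group = CASE
        WHEN age < 18 THEN 'minor'
        WHEN age < 65 THEN 'adult'
        ELSE 'senior'
    END
""")

# A NULL age compares as unknown, so it lands in the ELSE branch
groups = conn.execute("SELECT age, age_group FROM users ORDER BY id").fetchall()
for age, group in groups:
    print(age, group)
```

If labeling users of unknown age as 'senior' is not what you want, add an explicit WHEN age IS NULL branch ahead of the others.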

Example: Data Migration with Python Logic

For complex logic that SQL can't easily express, fetch rows in Python, compute values, and write back:

# migrations/versions/009_compute_user_score.py
from alembic import op
import sqlalchemy as sa
from sqlalchemy import text

def upgrade():
    op.add_column('users', sa.Column('score', sa.Float))

    # Get the connection the migration is running on
    connection = op.get_bind()

    # Fetch all user rows (this loads the whole table into memory)
    rows = connection.execute(text("""
        SELECT id, posts_count, comments_count, followers_count FROM users
    """)).fetchall()

    # Compute score for each user and update.
    # Assumes the three count columns are never NULL.
    update_stmt = text("UPDATE users SET score = :score WHERE id = :id")
    for user_id, posts, comments, followers in rows:
        score = (posts * 10) + (comments * 5) + (followers * 20)
        connection.execute(update_stmt, {"score": score, "id": user_id})

    op.alter_column('users', 'score', existing_type=sa.Float(), nullable=False)

def downgrade():
    op.drop_column('users', 'score')
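
One advantage of computing values in Python is that the logic can live in a plain function and be tested without a database. The sketch below pulls the scoring formula into a hypothetical compute_score helper and, as an assumption beyond the original formula, treats NULL counts (None in Python) as zero so the loop does not raise a TypeError.

```python
def compute_score(posts, comments, followers):
    """Weighted activity score; None (SQL NULL) is treated as 0 by assumption."""
    posts, comments, followers = (value or 0 for value in (posts, comments, followers))
    return float(posts * 10 + comments * 5 + followers * 20)


print(compute_score(3, 4, 2))
print(compute_score(None, 2, None))
```

Inside upgrade(), the loop body would then call compute_score(posts, comments, followers) instead of inlining the arithmetic.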

Example: Batch Processing Large Tables

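For large tables, updating every row in one statement can hold locks for too long, so process the rows in chunks. Keep in mind that row locks are released only when the transaction commits: batching shortens lock time only if each batch is committed separately rather than inside one migration-wide transaction.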

# migrations/versions/010_backfill_status_batched.py
from alembic import op
import sqlalchemy as sa
from sqlalchemy import text

def upgrade():
    op.add_column('orders', sa.Column('status', sa.String(20)))

    connection = op.get_bind()

    select_batch = text("""
        SELECT id FROM orders
        WHERE status IS NULL
        LIMIT :limit
    """)
    # An expanding bind parameter renders the IN list safely,
    # instead of formatting ids into the SQL string
    update_batch = text("""
        UPDATE orders
        SET status = 'processed'
        WHERE id IN :ids
    """).bindparams(sa.bindparam("ids", expanding=True))

    # Process 1000 rows at a time
    batch_size = 1000
    while True:
        result = connection.execute(select_batch, {"limit": batch_size})
        ids = [row[0] for row in result.fetchall()]
        if not ids:
            break

        # Update this batch
        connection.execute(update_batch, {"ids": ids})

def downgrade():
    op.drop_column('orders', 'status')
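
To see the batching loop in isolation, here is a minimal sketch using Python's standard sqlite3 module rather than Alembic. It commits after every batch, which is what actually bounds how long locks are held. The backfill_status helper and the 2,500 sample rows are illustrative assumptions; the LIMIT inside the IN subquery works on SQLite but is not accepted by every database (MySQL rejects it), which is why the migration above selects the ids first.

```python
import sqlite3


def backfill_status(conn, batch_size=1000):
    """Fill NULL statuses in chunks, committing after each chunk.

    Returns the number of batches that updated at least one row.
    """
    batches = 0
    while True:
        cursor = conn.execute(
            """
            UPDATE orders SET status = 'processed'
            WHERE id IN (SELECT id FROM orders WHERE status IS NULL LIMIT ?)
            """,
            (batch_size,),
        )
        conn.commit()  # end the transaction so this batch's locks are released
        if cursor.rowcount == 0:
            return batches
        batches += 1


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany("INSERT INTO orders (status) VALUES (?)", [(None,)] * 2500)
conn.commit()

batches = backfill_status(conn, batch_size=1000)
remaining = conn.execute(
    "SELECT COUNT(*) FROM orders WHERE status IS NULL"
).fetchone()[0]
print(f"{batches} batches, {remaining} rows remaining")
```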

Database-Specific SQL Syntax

Different databases have different functions for string/date operations:

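| Operation             | PostgreSQL              | MySQL                        | SQLite                              |
|-----------------------|-------------------------|------------------------------|-------------------------------------|
| Extract year          | EXTRACT(YEAR FROM date) | YEAR(date)                   | strftime('%Y', date)                |
| String split          | split_part(str, ' ', 1) | SUBSTRING_INDEX(str, ' ', 1) | substr(str, 1, instr(str, ' ') - 1) |
| Concatenate           | str1 || str2            | CONCAT(str1, str2)           | str1 || str2                        |
| Current date and time | NOW()                   | NOW()                        | datetime('now')                     |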

Use context.get_context().dialect.name (or op.get_bind().dialect.name inside a migration script) to branch your SQL by database; see the previous article for examples.
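
One way to keep dialect branching tidy is to map dialect names to SQL fragments in a small lookup, then build the statement from it. The sketch below is a plain-Python illustration: YEAR_EXPRESSIONS, year_expression and backfill_sql are hypothetical names, and the keys are the SQLAlchemy dialect names "postgresql", "mysql" and "sqlite". The SQLite entry adds a CAST because strftime returns text.

```python
YEAR_EXPRESSIONS = {
    "postgresql": "EXTRACT(YEAR FROM {column})",
    "mysql": "YEAR({column})",
    "sqlite": "CAST(strftime('%Y', {column}) AS INTEGER)",
}


def year_expression(dialect_name, column):
    """Return the SQL fragment that extracts the year for the given dialect."""
    try:
        template = YEAR_EXPRESSIONS[dialect_name]
    except KeyError:
        raise NotImplementedError(
            f"No year expression defined for dialect {dialect_name!r}"
        )
    return template.format(column=column)


def backfill_sql(dialect_name):
    """Build the created_year backfill statement for one dialect."""
    return f"UPDATE users SET created_year = {year_expression(dialect_name, 'created_at')}"


for name in ("postgresql", "mysql", "sqlite"):
    print(backfill_sql(name))
```

In a migration you would pass the detected dialect name to backfill_sql and hand the result to op.execute(). Failing loudly on an unknown dialect is a deliberate choice here: it is safer than silently running SQL written for a different database.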

Performance Considerations

When backfilling large tables:

  1. Consider dropping nonessential indexes on the columns being rewritten before a bulk update and recreating them afterward, if your database and workload allow it.
  2. Process in batches to avoid locking the entire table.
  3. Test on production-sized data. What works for 1,000 rows may time out for 100 million.
  4. Run during low-traffic windows. Schedule data migrations for off-peak hours.
  5. Monitor locks. Some migrations may temporarily block writes.

Safety Checks

Verify your logic inside the migration itself by counting the affected rows before and after the update and spot-checking a few results:

from alembic import op
from sqlalchemy import text

def upgrade():
    connection = op.get_bind()

    # Check: count rows before
    before = connection.execute(text("SELECT COUNT(*) FROM users WHERE status IS NULL")).scalar()
    print(f"Rows to update: {before}")

    # Do the update
    op.execute("""
        UPDATE users SET status = 'active' WHERE status IS NULL
    """)

    # Check: count rows after
    after = connection.execute(text("SELECT COUNT(*) FROM users WHERE status IS NULL")).scalar()
    print(f"Rows remaining null: {after}")

    # Verify: spot-check some rows
    result = connection.execute(text("SELECT id, status FROM users LIMIT 5"))
    for row in result:
        print(f"User {row[0]}: status={row[1]}")
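
Printed counts are easy to miss in deployment logs. A stricter variant raises an exception when the numbers do not add up, which stops the migration at that point. The sketch below shows the idea with Python's standard sqlite3 module; count_nulls and backfill_and_verify are hypothetical helpers, and the same comparison could be made with op.get_bind() inside upgrade().

```python
import sqlite3


def count_nulls(conn, table, column):
    """Count NULLs in a column. Identifiers cannot be bound as parameters,
    so table and column must come from trusted code, never from user input."""
    query = f"SELECT COUNT(*) FROM {table} WHERE {column} IS NULL"
    return conn.execute(query).fetchone()[0]


def backfill_and_verify(conn):
    """Backfill users.status and raise if the result is not what we expected."""
    before = count_nulls(conn, "users", "status")
    cursor = conn.execute("UPDATE users SET status = 'active' WHERE status IS NULL")
    after = count_nulls(conn, "users", "status")
    if cursor.rowcount != before or after != 0:
        raise RuntimeError(
            f"Backfill mismatch: expected {before} updates, "
            f"got {cursor.rowcount}, {after} rows still NULL"
        )
    return before


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany("INSERT INTO users (status) VALUES (?)", [(None,), ("banned",), (None,)])

updated = backfill_and_verify(conn)
statuses = [row[0] for row in conn.execute("SELECT status FROM users ORDER BY id")]
print(updated, statuses)
```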

Key Takeaways

  • Data migrations transform existing data during schema changes using op.execute() with SQL or Python
  • Follow the three-step pattern: add columns, populate data, add constraints
  • For complex logic, use op.get_bind() to fetch rows, compute values in Python, and update the database
  • Use database-specific SQL functions; check dialect name if supporting multiple databases
  • For large tables, process in batches to avoid locking; test on production-sized data
  • Always write reversible downgrade() functions that restore the original state

Frequently Asked Questions

What if a data migration fails halfway through?

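On databases with transactional DDL, such as PostgreSQL, Alembic runs the migration inside a transaction. If any statement (schema or data) fails, the whole migration is rolled back and your database returns to its previous state. MySQL commits implicitly around DDL statements, so a failed migration there can be left half applied and may need manual cleanup. Either way, investigate, fix the migration, and try again.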
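
The all-or-nothing behavior can be observed on a small scale with SQLite, which also supports rolling back DDL. The sketch below uses Python's standard sqlite3 module with explicit BEGIN, COMMIT and ROLLBACK rather than Alembic; run_migration and its fail flag are hypothetical, there only to simulate an error after the data step.

```python
import sqlite3


def run_migration(conn, fail=False):
    """Apply a schema change and a data change in one explicit transaction."""
    conn.execute("BEGIN")
    try:
        conn.execute("ALTER TABLE users ADD COLUMN score REAL")
        conn.execute("UPDATE users SET status = 'active' WHERE status IS NULL")
        if fail:
            raise RuntimeError("simulated failure after the data step")
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")  # undoes both the ALTER TABLE and the UPDATE
        raise


# isolation_level=None puts sqlite3 in autocommit mode so we control transactions
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, status TEXT)")
conn.execute("INSERT INTO users (status) VALUES (NULL), (NULL)")

try:
    run_migration(conn, fail=True)
except RuntimeError:
    pass

columns = [row[1] for row in conn.execute("PRAGMA table_info(users)")]
statuses = [row[0] for row in conn.execute("SELECT status FROM users")]
print(columns, statuses)
```

After the simulated failure, the score column is gone again and the status values are still NULL, so the migration can simply be rerun once fixed.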

Can I roll back a data migration that changed values?

The downgrade() function should restore the original data. However, if downgrade() can't perfectly reconstruct the original values (e.g., you computed a derived field and threw away information), the downgrade may result in dummy values or nulls. Always test downgrades with real data before production.

How do I migrate data between two tables?

Use a JOIN query in op.execute() to read from one table and insert/update another. For renormalization (moving data out of a denormalized column back into a separate table), create the new table in the upgrade(), populate it, and drop the denormalized column.
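
As a rough illustration of the renormalization case, the sketch below moves a denormalized customer_name column on orders into a new customers table, using an in-memory SQLite database and invented sample rows. It uses INSERT ... SELECT DISTINCT plus a correlated subquery instead of an explicit JOIN, and it stops before dropping the old column, since that step is database-specific.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_name TEXT NOT NULL);
    INSERT INTO orders (customer_name) VALUES ('Acme'), ('Globex'), ('Acme');

    -- New table that will own the customer names
    CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE);
    ALTER TABLE orders ADD COLUMN customer_id INTEGER REFERENCES customers(id);

    -- Populate: one customers row per distinct name, then link each order to it
    INSERT INTO customers (name)
        SELECT DISTINCT customer_name FROM orders ORDER BY customer_name;
    UPDATE orders
    SET customer_id = (
        SELECT id FROM customers WHERE customers.name = orders.customer_name
    );
""")

customer_count = conn.execute("SELECT COUNT(*) FROM customers").fetchone()[0]
unlinked = conn.execute(
    "SELECT COUNT(*) FROM orders WHERE customer_id IS NULL"
).fetchone()[0]
print(f"{customer_count} customers created, {unlinked} orders unlinked")
```

Only once the unlinked count is zero is it safe to drop the denormalized column.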

Is it safe to add computed columns that derive from other fields?

Yes, as long as the computation is deterministic and doesn't depend on external state. If the computation is expensive or changes frequently, consider making it a view instead of a materialized column. Otherwise, data migrations handle this well.

Further Reading