
Automated Retraining: Triggers and Scheduling

Automated retraining removes the manual steps of monitoring a model, deciding it is time to retrain, and running the training pipeline. Instead, the system detects when retraining is needed (drift detected, new data available, or a set amount of time elapsed) and starts the pipeline on its own. In this article, you will learn to set up trigger-based and scheduled retraining with Python and Apache Airflow.

Why Automated Retraining Matters

Without automation, retraining is reactive: a model decays in production, a human notices, and someone manually runs training code. This can take days or weeks. With automation, retraining is proactive: as soon as drift is detected or a threshold is crossed, a new model is trained, validated, and, if it passes validation, deployed automatically.

This requires three components: a trigger (when to retrain), a training pipeline, and validation before deployment.

Trigger 1: Performance-Based Retraining

Retrain when accuracy drops below a threshold. You need ground truth labels to compute accuracy (article 6 covers this).

import sqlite3
import subprocess
import sys

import pandas as pd
from sklearn.metrics import accuracy_score

def should_retrain_performance() -> bool:
    """Check if accuracy over the last 7 days dropped below threshold."""
    conn = sqlite3.connect("monitoring.db")

    # SQLite's datetime('now') is UTC text ('YYYY-MM-DD HH:MM:SS');
    # timestamps must be stored the same way for this comparison to work
    query = """
        SELECT prediction, actual
        FROM predictions
        WHERE actual IS NOT NULL
          AND timestamp > datetime('now', '-7 days')
    """
    df = pd.read_sql_query(query, conn)
    conn.close()

    if len(df) < 100:  # Need enough data to judge accuracy
        return False

    # prediction holds a probability; 0.5 turns it into a class label
    y_pred = (df["prediction"] > 0.5).astype(int)
    y_true = df["actual"]
    accuracy = accuracy_score(y_true, y_pred)

    print(f"Current accuracy: {accuracy:.3f}")

    # Threshold: retrain if accuracy drops below 85%
    return accuracy < 0.85

def trigger_retraining():
    """Trigger retraining pipeline."""
    if should_retrain_performance():
        print("Accuracy below threshold. Triggering retraining...")
        # sys.executable runs train.py with the same interpreter as this script
        subprocess.run([sys.executable, "train.py"], check=True)
    else:
        print("Accuracy acceptable. No retraining needed.")

# Run periodically (e.g., daily cron job)
if __name__ == "__main__":
    trigger_retraining()
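
The len(df) < 100 guard stops the check from firing on a handful of labels, but accuracy measured on 100 predictions is still noisy. Treat each prediction as an independent right-or-wrong trial (an assumption; correlated traffic makes the estimate noisier). Under that assumption, the standard error of an accuracy p measured on n samples is sqrt(p(1 - p) / n), and a 95% margin m needs roughly 1.96^2 * p(1 - p) / m^2 samples. Here is a small sketch; the helper names are illustrative:

```python
import math

def accuracy_std_error(p: float, n: int) -> float:
    """Standard error of an accuracy estimate p from n independent samples."""
    return math.sqrt(p * (1 - p) / n)

def min_samples_for_margin(p: float, margin: float, z: float = 1.96) -> int:
    """Smallest n whose ~95% (z=1.96) confidence half-width is at most `margin`."""
    return math.ceil(z ** 2 * p * (1 - p) / margin ** 2)

# Around the 85% threshold, using the 100-sample floor from the trigger above
se = accuracy_std_error(0.85, 100)
print(f"Std. error at n=100: {se:.3f}, 95% margin: +/-{1.96 * se:.3f}")
# → Std. error at n=100: 0.036, 95% margin: +/-0.070

# Labels needed to pin accuracy down to +/-2 points
print(min_samples_for_margin(0.85, 0.02))  # → 1225
```

At n = 100 the 95% margin around 85% is about ±7 points. A model whose true accuracy sits a few points above the threshold will therefore sometimes read below it purely by chance. You can reduce these false triggers by raising the minimum sample size or by requiring several consecutive low readings before retraining.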

Trigger 2: Data Drift-Based Retraining

Retrain when data drift is detected (article 6 covers drift detection).

import subprocess
import sys

import pandas as pd
from scipy.stats import ks_2samp

def should_retrain_drift() -> bool:
    """Check if data drift is detected in any numeric feature."""
    train_data = pd.read_csv("training_baseline.csv")
    prod_data = pd.read_csv("production_recent.csv")

    drift_threshold = 0.05  # p-value cutoff for the two-sample KS test

    # ks_2samp compares numeric distributions, so skip non-numeric columns
    for col in train_data.select_dtypes(include="number").columns:
        if col in prod_data.columns:
            statistic, p_value = ks_2samp(
                train_data[col].dropna(), prod_data[col].dropna()
            )
            if p_value < drift_threshold:
                print(f"Drift detected in {col} (KS={statistic:.3f}, p={p_value:.4f})")
                return True

    return False

def trigger_retraining_on_drift():
    """Trigger retraining if drift detected."""
    if should_retrain_drift():
        print("Data drift detected. Triggering retraining...")
        subprocess.run([sys.executable, "train.py"], check=True)
    else:
        print("No drift. No retraining needed.")

# Run daily
if __name__ == "__main__":
    trigger_retraining_on_drift()
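
Testing every column at p < 0.05 raises the chance of a false drift alarm as the number of features grows. If the columns were independent and nothing had drifted, the probability that at least one test fires would be 1 - 0.95^k for k columns. A common remedy is the Bonferroni correction, which compares each p-value against alpha / k. It is shown here as a swapped-in technique, not part of the trigger above. The function names are hypothetical, and the p-values would come from ks_2samp:

```python
def family_false_alarm_rate(alpha: float, k: int) -> float:
    """P(at least one false alarm) across k independent tests with no real drift."""
    return 1 - (1 - alpha) ** k

def drifted_columns(p_values: dict[str, float], alpha: float = 0.05) -> list[str]:
    """Columns whose p-value passes the Bonferroni-corrected cutoff alpha / k."""
    if not p_values:
        return []
    cutoff = alpha / len(p_values)
    return [col for col, p in p_values.items() if p < cutoff]

print(f"{family_false_alarm_rate(0.05, 20):.2f}")  # 20 features, uncorrected → 0.64

# Hypothetical KS p-values for three features; cutoff = 0.05 / 3
p_values = {"age": 0.001, "income": 0.03, "tenure": 0.4}
print(drifted_columns(p_values))  # → ['age']
```

Bonferroni is conservative, so with many correlated features it can miss real drift. On large samples, the KS test also returns very small p-values for shifts too small to matter. Requiring a minimum KS statistic in addition to the p-value cutoff can help.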

Trigger 3: Time-Based Retraining

Retrain periodically (e.g., weekly) regardless of drift or performance, to incorporate new data.

import json
import os
import subprocess
import sys
from datetime import datetime, timedelta

METADATA_FILE = "retraining_metadata.json"
RETRAIN_INTERVAL = timedelta(days=7)

def should_retrain_time() -> bool:
    """Check if it is time to retrain (e.g., every 7 days)."""
    # No metadata yet means the model has never been retrained: retrain now
    if not os.path.exists(METADATA_FILE):
        return True

    # Load last retraining time
    with open(METADATA_FILE, "r") as f:
        metadata = json.load(f)
    last_retrain = datetime.fromisoformat(metadata["last_retrain_time"])

    return datetime.now() - last_retrain >= RETRAIN_INTERVAL

def trigger_retraining_on_schedule():
    """Trigger retraining on schedule."""
    if should_retrain_time():
        print("Scheduled retraining time reached. Retraining...")
        subprocess.run([sys.executable, "train.py"], check=True)

        # Record retraining time (only reached if train.py succeeded)
        with open(METADATA_FILE, "w") as f:
            json.dump({"last_retrain_time": datetime.now().isoformat()}, f)
    else:
        print("Not yet time to retrain.")

# Run daily; the function checks if the interval has passed
if __name__ == "__main__":
    trigger_retraining_on_schedule()
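
All three triggers launch train.py directly. If one run is still going when the next check fires (a slow training job, or a cron entry overlapping a manual run), two trainings can race to write the same model and metadata files. A simple guard is an exclusive lock file created with os.O_EXCL. It is sketched here as an addition, not part of the scripts above, and retrain_lock and RetrainAlreadyRunning are hypothetical names:

```python
import os
from contextlib import contextmanager

class RetrainAlreadyRunning(RuntimeError):
    """Raised when another process already holds the retraining lock."""

@contextmanager
def retrain_lock(path: str = "retrain.lock"):
    """Hold an exclusive lock file for the duration of one retraining run."""
    try:
        # O_CREAT | O_EXCL makes creation fail if the file already exists,
        # so at most one process can hold the lock at a time
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        raise RetrainAlreadyRunning(f"{path} exists; another retrain may be running") from None
    try:
        with os.fdopen(fd, "w") as f:
            f.write(str(os.getpid()))  # record the owner to help debug stale locks
        yield
    finally:
        os.remove(path)  # release the lock even if retraining raised
```

Wrap the trigger call in the lock, for example with retrain_lock(): trigger_retraining_on_schedule(). If the process is killed hard (for example with SIGKILL), the finally block never runs and you must remove the stale lock by hand. The PID stored in the file helps confirm that the owner is gone.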

A Complete Retraining Pipeline with Validation

Build a pipeline that trains a model, validates it, and promotes it in the MLflow Model Registry only if validation passes:

import mlflow
import mlflow.sklearn
import pandas as pd
from mlflow.tracking import MlflowClient
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split

def retrain_and_validate() -> bool:
    """Retrain model, validate, and promote to Production if it passes."""

    # Load fresh data
    df = pd.read_csv("training_data_latest.csv")
    X = df.drop("target", axis=1)
    y = df["target"]

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # Train inside a context manager so the run is closed even if training fails
    mlflow.set_experiment("Automated Retraining")
    with mlflow.start_run(run_name="auto_retrain") as run:
        model = RandomForestClassifier(n_estimators=100, random_state=42)
        model.fit(X_train, y_train)

        y_pred = model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        precision = precision_score(y_test, y_pred, average="weighted")
        recall = recall_score(y_test, y_pred, average="weighted")

        mlflow.log_metric("accuracy", accuracy)
        mlflow.log_metric("precision", precision)
        mlflow.log_metric("recall", recall)

        mlflow.sklearn.log_model(model, "model")
        run_id = run.info.run_id

    # Validate
    validation_passed = accuracy >= 0.85 and precision >= 0.83

    if not validation_passed:
        print(f"Validation failed. Accuracy: {accuracy:.3f}, Precision: {precision:.3f}")
        return False

    print(f"Validation passed. Accuracy: {accuracy:.3f}")

    # Register the run's model; the registry assigns the next version number
    client = MlflowClient()
    model_name = "production_model"
    version_info = mlflow.register_model(f"runs:/{run_id}/model", model_name)

    # Promote the new version. archive_existing_versions=True moves any version
    # currently in Production to Archived in the same call.
    # Note: MLflow 2.9+ deprecates stages in favor of model version aliases
    # (client.set_registered_model_alias); stages still work but may warn.
    client.transition_model_version_stage(
        name=model_name,
        version=version_info.version,
        stage="Production",
        archive_existing_versions=True,
    )

    print(f"Model version {version_info.version} promoted to Production")
    return True

# Trigger this when retraining is needed
if __name__ == "__main__":
    retrain_and_validate()
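
The validation step above checks only fixed thresholds, so a retrained model scoring 0.86 can replace a production model that scored 0.91. A common extra gate compares the candidate with the current production model on the same held-out data and promotes it only if it is at least as good. It is sketched here as a suggestion, not part of the pipeline above. The function name and the min_gain parameter are hypothetical:

```python
from typing import Optional

def should_promote(
    candidate: dict[str, float],
    production: Optional[dict[str, float]],
    min_accuracy: float = 0.85,
    min_precision: float = 0.83,
    min_gain: float = 0.0,
) -> bool:
    """Absolute thresholds first, then a champion/challenger check on accuracy."""
    # Same absolute bar as in retrain_and_validate()
    if candidate["accuracy"] < min_accuracy or candidate["precision"] < min_precision:
        return False
    # No production model yet: passing the absolute bar is enough
    if production is None:
        return True
    # Candidate must match or beat production accuracy by at least min_gain
    return candidate["accuracy"] >= production["accuracy"] + min_gain

print(should_promote({"accuracy": 0.86, "precision": 0.85},
                     {"accuracy": 0.91, "precision": 0.90}))  # → False
```

Both dictionaries must come from the same held-out data for the comparison to mean anything. In retrain_and_validate(), that would mean loading the current Production model and scoring it on X_test before deciding whether to register the new one.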

Scheduling with Apache Airflow

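Apache Airflow is a workflow orchestration tool well suited to scheduling, retrying, and monitoring multi-step ML pipelines. Install it (the Airflow documentation recommends pinning a version and installing with its published constraints file to avoid dependency conflicts):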

pip install apache-airflow

Define a DAG (directed acyclic graph) for retraining:

# dags/ml_retraining_dag.py
import pickle
import shutil
from datetime import datetime, timedelta

import pandas as pd
from airflow import DAG
from airflow.exceptions import AirflowFailException
# Airflow 2.x import path; in Airflow 3 these operators are provided by the
# standard provider (airflow.providers.standard.operators.python)
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from scipy.stats import ks_2samp
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

default_args = {
    "owner": "ml_team",
    "start_date": datetime(2026, 6, 1),
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "ml_retraining_pipeline",
    default_args=default_args,
    description="Automated ML retraining pipeline",
    schedule="@daily",  # Daily at midnight (UTC by default); replaces schedule_interval
    catchup=False,
)

def check_drift() -> bool:
    """Return True if any numeric feature drifted; False skips downstream tasks."""
    train_data = pd.read_csv("training_baseline.csv")
    prod_data = pd.read_csv("production_recent.csv")

    for col in train_data.select_dtypes(include="number").columns:
        if col not in prod_data.columns:
            continue
        _, p_value = ks_2samp(train_data[col].dropna(), prod_data[col].dropna())
        if p_value < 0.05:
            return True

    return False

def train_model():
    """Train a new model and save it only if it clears the accuracy bar."""
    df = pd.read_csv("training_data_latest.csv")
    X = df.drop("target", axis=1)
    y = df["target"]

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    accuracy = accuracy_score(y_test, model.predict(X_test))

    if accuracy < 0.85:
        # AirflowFailException fails the task without retrying:
        # rerunning the same training would not fix a weak model
        raise AirflowFailException(f"Model accuracy {accuracy:.3f} below threshold")

    # /tmp works only if every task runs on the same machine;
    # with distributed workers, write to shared storage instead
    with open("/tmp/new_model.pkl", "wb") as f:
        pickle.dump(model, f)

    return accuracy

def deploy_model():
    """Deploy the trained model."""
    print("Deploying model to production...")
    # Copy model to serving location
    shutil.copy("/tmp/new_model.pkl", "/models/production/model.pkl")
    print("Deployment complete")

# Define tasks. ShortCircuitOperator skips all downstream tasks
# when its callable returns a falsy value (no drift).
check_drift_task = ShortCircuitOperator(
    task_id="check_drift",
    python_callable=check_drift,
    dag=dag,
)

train_task = PythonOperator(
    task_id="train_model",
    python_callable=train_model,
    dag=dag,
)

deploy_task = PythonOperator(
    task_id="deploy_model",
    python_callable=deploy_model,
    dag=dag,
)

# Define dependencies
check_drift_task >> train_task >> deploy_task

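Save the file in your Airflow DAGs folder ($AIRFLOW_HOME/dags by default, where AIRFLOW_HOME defaults to ~/airflow). For local testing, start the scheduler, the web UI, and the other Airflow components together:

airflow standalone

Open the Airflow UI at http://localhost:8080 to view the DAG. In a production deployment these components run as separate services, for example airflow scheduler plus airflow webserver (Airflow 2) or airflow api-server (Airflow 3). New DAGs start paused. Once you unpause this one, Airflow runs it daily, retries failed tasks as configured in default_args, and keeps logs for every task run.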
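
deploy_model() copies the pickle directly onto the file the server reads, so a server that reloads during the copy can see a half-written model. A safer pattern is to copy to a temporary name in the destination directory, then rename it into place with os.replace, which is atomic on POSIX systems. This sketch assumes the serving process reopens model.pkl on each reload, and the function name is illustrative:

```python
import os
import shutil

def deploy_atomically(src: str, dest: str) -> None:
    """Copy src to a temp file beside dest, then atomically swap it into place."""
    # Temp file in dest's directory keeps the final rename on one filesystem
    tmp_dest = dest + ".tmp"
    shutil.copy2(src, tmp_dest)
    # os.replace is atomic on POSIX: readers see the old file or the new one, never a mix
    os.replace(tmp_dest, dest)
```

In deploy_model(), the shutil.copy call would become deploy_atomically("/tmp/new_model.pkl", "/models/production/model.pkl"). This protects only readers that open the file after the swap. A server that already holds the model in memory keeps using it until it reloads.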

Monitoring Retraining Runs

Track retraining events in a log:

import json
from datetime import datetime, timezone
from typing import Optional

def log_retraining(trigger: str, success: bool, accuracy: Optional[float] = None,
                   error: Optional[str] = None):
    """Append one retraining event as a JSON line."""
    event = {
        "timestamp": datetime.now(timezone.utc).isoformat(),  # UTC for comparable logs
        "trigger": trigger,  # "performance", "drift", "schedule"
        "success": success,
        "accuracy": accuracy,
        "error": error,
    }

    with open("retraining_log.jsonl", "a") as f:
        f.write(json.dumps(event) + "\n")

# Use in your retraining function
try:
    success = retrain_and_validate()
    # retrain_and_validate() returns only pass/fail; if you also return the
    # measured accuracy, pass it here with accuracy=...
    log_retraining(trigger="drift", success=success)
except Exception as e:
    log_retraining(trigger="drift", success=False, error=str(e))
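
Because each event is one JSON object per line, the log is easy to summarize, for example to see how often each trigger fires and how often the resulting retrain succeeds. Here is a minimal standard-library sketch; summarize_retraining_log is a hypothetical helper:

```python
import json
from collections import Counter

def summarize_retraining_log(path: str = "retraining_log.jsonl") -> dict[str, dict[str, int]]:
    """Count runs and successes per trigger type in a JSON Lines retraining log."""
    runs: Counter = Counter()
    successes: Counter = Counter()
    with open(path) as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines
            event = json.loads(line)
            runs[event["trigger"]] += 1
            if event["success"]:
                successes[event["trigger"]] += 1
    return {t: {"runs": runs[t], "successes": successes[t]} for t in runs}
```

A persistently low success rate for one trigger can mean its threshold fires on noise, or that the data it retrains on needs attention.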

Key Takeaways

  • Retraining triggers include performance drop, data drift, and elapsed time.
  • A complete retraining pipeline trains, validates, and deploys only if validation passes.
  • Apache Airflow orchestrates complex pipelines with scheduling, retries, and monitoring.
  • Log all retraining events for auditing and analysis.
  • Automation can shorten the time from drift detection to deployment from days or weeks to hours.

Frequently Asked Questions

What if my retraining fails? Will the old model keep serving?

Yes. If validation fails or the pipeline crashes, the old production model stays deployed. This is by design: only promote if validation passes. Log failures and alert the team to investigate.

How do I handle the delay between prediction and ground truth?

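Some problems have natural delays: credit default labels, for example, can arrive months after the prediction. Use partial performance metrics: if you have labels for 20% of recent predictions, compute accuracy on those and alert if it drops below the threshold on this subset. Keep in mind that the labels that arrive first may not be representative of all predictions, so treat this as an early warning rather than a final measurement.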
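
With delayed labels, the 7-day window in should_retrain_performance() mostly scores whichever predictions happened to get labels early. One option is to score a window that ends lag_days in the past, after most labels should have arrived, and to report label coverage next to accuracy. This sketch assumes timestamps in the predictions table are stored in SQLite's UTC 'YYYY-MM-DD HH:MM:SS' format, and the function name and parameters are illustrative:

```python
import sqlite3

def matured_accuracy(conn: sqlite3.Connection, lag_days: int, window_days: int):
    """Accuracy and label coverage for predictions made between
    (lag_days + window_days) and lag_days ago."""
    query = """
        SELECT
            COUNT(*)      AS total,
            COUNT(actual) AS labeled,  -- COUNT(col) skips NULLs
            AVG(CASE WHEN actual IS NULL THEN NULL
                     WHEN (prediction > 0.5) = actual THEN 1.0
                     ELSE 0.0 END) AS accuracy  -- AVG ignores NULL rows
        FROM predictions
        WHERE timestamp >  datetime('now', ?)
          AND timestamp <= datetime('now', ?)
    """
    start = f"-{lag_days + window_days} days"
    end = f"-{lag_days} days"
    total, labeled, accuracy = conn.execute(query, (start, end)).fetchone()
    coverage = labeled / total if total else 0.0
    return accuracy, coverage  # accuracy is None if no row in the window is labeled
```

For example, matured_accuracy(conn, lag_days=90, window_days=30) scores predictions made between 120 and 90 days ago. Choose lag_days based on how long your labels usually take to arrive, and treat the accuracy as provisional when coverage is low.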

Can I retrain on multiple machines in parallel?

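Yes. Airflow can distribute tasks across multiple workers using executors such as the CeleryExecutor or KubernetesExecutor. You can also trigger independent retraining pipelines for different models simultaneously. Give each model its own directory for data, artifacts, and outputs so that runs do not overwrite each other. Also avoid worker-local paths such as /tmp for files that one task writes and another reads.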

Should I retrain all models or just the ones showing drift?

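Start with models showing drift. As you scale, retraining all models on a schedule (e.g., weekly) is simpler to operate and often safer, even if some are not drifting. For many models the compute cost of an extra retrain is small compared with the cost of serving a stale one, and the validation gate keeps an unnecessary retrain from replacing a good model.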

How do I roll back if a new model is buggy?

In the model registry, transition the buggy version to Archived and the previous version back to Production. This takes seconds. If your serving layer loads the model by stage URI (for example, models:/production_model/Production), it picks up the restored version the next time it reloads the model, such as on restart or redeploy.

Further Reading