
Production MLOps: End-to-End Case Study & Best Practices

In this final article, you will see MLOps in action: a real-world case study of a complete ML system handling experiment tracking, model registry, data versioning, production deployment, drift monitoring, and automated retraining. This ties together all previous articles and shows how the pieces fit.

Case Study: Customer Churn Prediction System

Imagine you work for a telecom company. Customer churn (cancellation) is costly: acquiring a new customer is often cited as 5-25 times more expensive than retaining an existing one. You build an ML model to predict which customers are likely to churn in the next month, so the retention team can reach out before they leave.

Phase 1: Experiment and Development

Goal: Build and validate a churn prediction model.

Data scientist Alice trains models locally with MLflow (article 2):

# alice_train.py
import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score
from sklearn.preprocessing import StandardScaler

# Load data
df = pd.read_csv("data/churn_data_jan2026.csv")
X = df.drop("churn", axis=1)
y = df["churn"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Bundle scaling and the classifier in one pipeline so the fitted scaler
# is logged with the model and applied again at serving time
from sklearn.pipeline import make_pipeline

# Experiment tracking
mlflow.set_experiment("Customer Churn Prediction - Jan 2026")
mlflow.start_run(run_name="rf_v1_baseline")

mlflow.log_param("model", "RandomForest")
mlflow.log_param("n_estimators", 100)
mlflow.log_param("max_depth", 10)
mlflow.log_param("train_size", len(X_train))

# Train (the scaler is fitted on the training split only)
model = make_pipeline(
    StandardScaler(),
    RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42),
)
model.fit(X_train, y_train)

# Evaluate
y_pred = model.predict(X_test)
y_pred_proba = model.predict_proba(X_test)[:, 1]

accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
auc = roc_auc_score(y_test, y_pred_proba)

mlflow.log_metric("accuracy", accuracy)
mlflow.log_metric("precision", precision)
mlflow.log_metric("recall", recall)
mlflow.log_metric("auc", auc)

# Log model
mlflow.sklearn.log_model(model, "model")

# Log data version
import hashlib
data_hash = hashlib.md5(pd.util.hash_pandas_object(df, index=True).values.tobytes()).hexdigest()
mlflow.log_param("data_hash", data_hash)

mlflow.end_run()

print(f"Model trained. AUC: {auc:.3f}, Precision: {precision:.3f}, Recall: {recall:.3f}")
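
The DataFrame hash above depends on how pandas parsed the file. A complementary fingerprint, sketched here under the assumption that the raw CSV on disk is the unit being versioned, hashes the file bytes directly with SHA-256. The helper name `file_sha256` is illustrative and not part of MLflow.

```python
import hashlib
from pathlib import Path


def file_sha256(path, chunk_size=1 << 20):
    """Return the SHA-256 hex digest of a file, read in chunks to bound memory use."""
    digest = hashlib.sha256()
    with Path(path).open("rb") as f:
        # iter() with a sentinel keeps reading until read() returns b""
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

The result could be logged next to the existing parameter, for example with `mlflow.log_param("data_sha256", file_sha256("data/churn_data_jan2026.csv"))`.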

Alice runs this multiple times, tweaking hyperparameters. She compares runs in the MLflow UI and selects the best.

Phase 2: Data Versioning and Reproducibility

Goal: Ensure the training pipeline is reproducible and data is versioned.

Create a Git repo with training code, requirements, and DVC pipeline:

# dvc.yaml
stages:
  prepare:
    cmd: python scripts/prepare_data.py
    deps:
      - data/raw_churn.csv
      - scripts/prepare_data.py
    outs:
      - data/prepared.csv

  train:
    cmd: python scripts/train.py
    deps:
      - data/prepared.csv
      - scripts/train.py
    params:
      - train.n_estimators
      - train.max_depth
    outs:
      - models/model.pkl
    metrics:
      - metrics.json:
          cache: false
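
The `train` stage declares `metrics.json` as a metrics file, so `scripts/train.py` has to write it. Here is a minimal sketch of that step, assuming the metrics are already computed as plain numbers. The helper name `write_metrics` and the rounding are illustrative choices, not something DVC requires.

```python
import json
from pathlib import Path


def write_metrics(metrics, path="metrics.json"):
    """Write a flat dict of metric names and values as JSON for DVC to track."""
    # Cast to float so NumPy scalars serialize cleanly; round to keep diffs readable
    payload = {name: round(float(value), 4) for name, value in metrics.items()}
    Path(path).write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n")
    return payload
```

After `dvc repro`, `dvc metrics show` prints these values and `dvc metrics diff` compares them across commits.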

Commit to Git:

dvc add data/raw_churn.csv
# dvc add writes the .dvc pointer file and a .gitignore next to the data file
git add data/raw_churn.csv.dvc data/.gitignore dvc.yaml scripts/
git commit -m "Initial churn prediction pipeline"
git tag -a data_v1_jan2026 -m "January 2026 churn data"

Phase 3: Model Registry and Staging

Goal: Register the best model and stage it for testing.

Alice registers the best run:

import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Get best run
experiment = client.get_experiment_by_name("Customer Churn Prediction - Jan 2026")
runs = client.search_runs(experiment_ids=[experiment.experiment_id])
best_run = max(runs, key=lambda r: r.data.metrics.get("auc", 0))

# Register
model_name = "churn_predictor"
model_uri = f"runs:/{best_run.info.run_id}/model"
version_info = mlflow.register_model(model_uri, model_name)

# Transition to Staging
# (MLflow 2.9+ deprecates stages in favor of aliases; the stage-based
# call is kept here to match the workflow described in this article)
client.transition_model_version_stage(
    name=model_name,
    version=version_info.version,
    stage="Staging"
)

print(f"Registered {model_name} version {version_info.version} in Staging")

Phase 4: Staging and Validation

Goal: Validate the model before production.

The ML team loads the staging model and validates on holdout data:

import mlflow.sklearn
import pandas as pd
from sklearn.metrics import roc_auc_score

# Load the staging model with the sklearn flavor to get predict_proba;
# the generic pyfunc predict() returns class labels, not probabilities
staging_model = mlflow.sklearn.load_model("models:/churn_predictor/Staging")

# Validate on holdout test set
test_data = pd.read_csv("data/holdout_test_feb2026.csv")
X_test = test_data.drop("churn", axis=1)
y_test = test_data["churn"]

churn_proba = staging_model.predict_proba(X_test)[:, 1]
auc = roc_auc_score(y_test, churn_proba)

print(f"Staging model AUC on holdout: {auc:.3f}")

if auc >= 0.85:  # Threshold
    print("Validation passed. Ready for production.")
else:
    print("Validation failed. Model needs improvement.")
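
The check above uses a fixed threshold only. A slightly stricter gate, sketched here as an assumption rather than as part of the original pipeline, also compares the candidate with the model currently in production. The function name `should_promote` and the `min_gain` parameter are hypothetical; the 0.85 default is the threshold used above.

```python
def should_promote(candidate_auc, production_auc=None, min_auc=0.85, min_gain=0.0):
    """Decide whether a staged model may replace the production model.

    The candidate must clear the absolute threshold and, if a production
    model exists, beat its AUC by at least min_gain.
    """
    if candidate_auc < min_auc:
        return False
    if production_auc is None:
        return True  # nothing deployed yet, so the absolute threshold is enough
    return candidate_auc - production_auc >= min_gain
```

Both AUC values should be computed on the same holdout set, otherwise the comparison says little.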

Phase 5: Production Deployment

Goal: Deploy the validated model to serving infrastructure.

Transition the model to Production:

client.transition_model_version_stage(
    name="churn_predictor",
    version=1,
    stage="Production",
    archive_existing_versions=True,  # keep a single version in Production
)

# Serving infrastructure loads the production model
# (e.g., Flask app, SageMaker endpoint, or REST API)

A simple Flask API serves predictions:

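# serving_api.py
import sqlite3
from datetime import datetime, timezone

import mlflow.sklearn
import pandas as pd
from flask import Flask, request, jsonify

app = Flask(__name__)

# Load production model on startup. The sklearn flavor is used because the
# generic pyfunc predict() returns class labels, not probabilities.
model = mlflow.sklearn.load_model("models:/churn_predictor/Production")

def log_prediction(record, db_path="predictions.db"):
    """Append one prediction to SQLite (illustrative helper; the schema is an assumption)."""
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS predictions "
        "(timestamp TEXT, customer_id TEXT, prediction REAL, actual INTEGER)"
    )
    conn.execute(
        "INSERT INTO predictions (timestamp, customer_id, prediction) VALUES (?, ?, ?)",
        (record["timestamp"], record["customer_id"], record["prediction"]),
    )
    conn.commit()
    conn.close()

@app.route("/predict", methods=["POST"])
def predict():
    data = request.json
    # customer_id is assumed to be an identifier, not a model feature
    features = {k: v for k, v in data.items() if k != "customer_id"}
    df = pd.DataFrame([features])

    # Prediction: probability of the positive (churn) class
    churn_probability = float(model.predict_proba(df)[0, 1])

    # Log prediction for monitoring (UTC, in the format SQLite's datetime() uses)
    log_prediction({
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
        "customer_id": data.get("customer_id"),
        "prediction": churn_probability,
    })

    return jsonify({"churn_probability": churn_probability})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5001)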

Deploy to a server (Docker, Kubernetes, AWS, GCP, etc.).
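
A serving endpoint usually benefits from rejecting malformed requests before they reach the model. The sketch below is one way to do that, assuming the model expects a fixed set of numeric features. The function name `validate_payload` is hypothetical, and the field names in the usage note (`tenure`, `monthly_charges`) are taken from the findings later in this article, not from a confirmed schema.

```python
def validate_payload(data, required_fields):
    """Return a list of problems with a request payload; an empty list means valid."""
    if not isinstance(data, dict):
        return ["payload must be a JSON object"]
    problems = []
    for field in required_fields:
        if field not in data:
            problems.append(f"missing field: {field}")
        # bool is a subclass of int in Python, so exclude it explicitly
        elif isinstance(data[field], bool) or not isinstance(data[field], (int, float)):
            problems.append(f"field {field} must be numeric")
    return problems
```

Inside `predict()`, a call such as `validate_payload(data, ["tenure", "monthly_charges"])` could run first, with a non-empty result returned as `jsonify({"errors": problems}), 400`.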

Phase 6: Monitoring and Drift Detection

Goal: Monitor model performance and detect drift.

A scheduled job (cron, Airflow) checks for drift and performance decay:

# monitoring_job.py
from scipy.stats import ks_2samp
import pandas as pd
from sklearn.metrics import roc_auc_score
import sqlite3

def monitor():
    """Monitor churn predictor. Returns True if retraining should be triggered."""

    # 1. Check data drift (KS test applies to numeric columns only)
    train_data = pd.read_csv("data/prepared_jan2026.csv")
    recent_data = pd.read_csv("data/recent_customer_features_mar2026.csv")

    drift_detected = False
    for col in train_data.select_dtypes("number").columns:
        if col in recent_data.columns:
            _, p_value = ks_2samp(train_data[col].dropna(), recent_data[col].dropna())
            # Note: testing many columns at 0.05 will produce some false alarms
            if p_value < 0.05:
                print(f"Drift in {col}")
                drift_detected = True

    # 2. Check performance (with ground truth)
    conn = sqlite3.connect("predictions.db")
    query = """
        SELECT prediction, actual FROM predictions
        WHERE actual IS NOT NULL
        AND timestamp > datetime('now', '-7 days')
    """
    df = pd.read_sql_query(query, conn)
    conn.close()

    # AUC needs enough labeled rows and both classes present
    if len(df) > 100 and df["actual"].nunique() == 2:
        y_true = df["actual"]
        y_pred = df["prediction"]
        auc = roc_auc_score(y_true, y_pred)

        print(f"Current AUC: {auc:.3f}")

        if auc < 0.80:  # Threshold
            print("Performance degraded. Consider retraining.")
            return True

    return drift_detected

if monitor():
    print("Trigger retraining.")
Phase 7: Automated Retraining

Goal: Automatically retrain when drift or performance decay is detected.

An Airflow DAG orchestrates the retraining:

# dags/churn_retraining_dag.py
# Runs daily as written; gate the train task on the monitoring result
# to retrain only when drift or performance decay is detected.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import subprocess

default_args = {"owner": "ml_team", "retries": 2}

dag = DAG(
    "churn_retraining",
    default_args=default_args,
    schedule_interval="@daily",  # Airflow 2.4+ prefers the name `schedule`
    start_date=datetime(2026, 6, 1),
    catchup=False,  # do not backfill runs for past dates
)

def trigger_training():
    """Run training pipeline."""
    result = subprocess.run(
        ["dvc", "repro"],
        cwd="/home/ml/churn_predictor",
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        raise RuntimeError(f"Training failed: {result.stderr}")
    print("Training complete")

def validate():
    """Validate new model (placeholder: the real check must raise on failure)."""
    import mlflow
    from sklearn.metrics import roc_auc_score

    # Load new model and validate
    # If validation passes, transition to Production

    print("Validation passed")

def notify():
    """Notify team."""
    print("New model deployed. Notifying slack...")

train_task = PythonOperator(task_id="train", python_callable=trigger_training, dag=dag)
validate_task = PythonOperator(task_id="validate", python_callable=validate, dag=dag)
notify_task = PythonOperator(task_id="notify", python_callable=notify, dag=dag)

train_task >> validate_task >> notify_task
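
The goal stated for this phase is to retrain on drift or performance decay, so the pipeline needs an explicit trigger decision. A minimal sketch of that decision follows, assuming the monitoring job supplies a drift flag and the latest AUC. The function name `should_retrain` and the 30-day maximum age are assumptions; the 0.80 default is the threshold from the monitoring job.

```python
from datetime import date


def should_retrain(drift_detected, current_auc, last_trained, today,
                   min_auc=0.80, max_age_days=30):
    """Return (decision, reason) for whether to start the retraining pipeline."""
    # current_auc may be None when too few labeled predictions are available
    if current_auc is not None and current_auc < min_auc:
        return True, "performance below threshold"
    if drift_detected:
        return True, "data drift detected"
    if (today - last_trained).days >= max_age_days:
        return True, "model older than max age"
    return False, "no trigger"
```

In Airflow, a function like this could back a `ShortCircuitOperator` placed before the `train` task, so that downstream tasks are skipped when no trigger fires.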

Phase 8: Operational Excellence

Goal: Document, audit, and continuously improve.

Logging and Auditing:

Every model deployed logs metadata:

# Log deployment event
import json
from datetime import datetime

deployment_log = {
    "timestamp": datetime.now().isoformat(),
    "model_name": "churn_predictor",
    "model_version": 2,
    "deployed_by": "[email protected]",
    "auc": 0.87,
    "data_version": "data_v2_mar2026",
    "git_commit": "a1b2c3d4e5f6",
}

# Append one JSON object per line (JSON Lines)
with open("deployments.jsonl", "a") as f:
    f.write(json.dumps(deployment_log) + "\n")

Rollback Plan:

If the new model is buggy:

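client.transition_model_version_stage(
    name="churn_predictor",
    version=1,  # Revert to previous version
    stage="Production",
    archive_existing_versions=True,  # otherwise the buggy version stays in Production
)
# The serving API loads the model at startup, so restart it to pick up the rollback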
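
Hard-coding `version=1` only works for the first rollback. Since every deployment is appended to `deployments.jsonl`, the previous version can be looked up from that log instead. The sketch below assumes each record carries the `timestamp`, `model_name`, and `model_version` fields shown earlier, and that all timestamps share one ISO 8601 format. The function name `rollback_target` is hypothetical.

```python
import json


def rollback_target(log_path="deployments.jsonl", model_name="churn_predictor"):
    """Return the version deployed before the current one, or None if there is none."""
    versions = []
    with open(log_path) as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines in the log
            record = json.loads(line)
            if record.get("model_name") == model_name:
                versions.append((record["timestamp"], record["model_version"]))
    versions.sort()  # same-format ISO 8601 timestamps sort chronologically as strings
    if len(versions) < 2:
        return None
    current = versions[-1][1]
    # Walk back to the most recent deployment of a different version
    for _, version in reversed(versions[:-1]):
        if version != current:
            return version
    return None
```

The returned value could then be passed as `version` to the rollback call above.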

Continuous Improvement:

Track model performance over time and document lessons:

# analysis.py
import pandas as pd

# Load deployment history
deployments = pd.read_json("deployments.jsonl", lines=True)

# Analyze
deployments_sorted = deployments.sort_values("timestamp", ascending=False)
print(f"Latest model (v{deployments_sorted.iloc[0]['model_version']}): AUC {deployments_sorted.iloc[0]['auc']}")
print(f"Previous model (v{deployments_sorted.iloc[1]['model_version']}): AUC {deployments_sorted.iloc[1]['auc']}")

# Write findings
with open("findings.md", "w") as f:
    f.write("# Churn Prediction Model Performance\n\n")
    f.write("- Latest AUC: 0.87 (up from 0.84)\n")
    f.write("- Data drift detected in tenure and monthly_charges\n")
    f.write("- Recommendation: retrain more frequently (weekly vs. monthly)\n")

MLOps Maturity Checklist

Use this checklist to assess your production MLOps:

Level 1 (Manual):

  • Models trained locally in notebooks
  • No experiment tracking
  • Models shared as pickle files over email
  • No monitoring or retraining

Level 2 (Tracked):

  • Experiment tracking with MLflow
  • Data versioning with DVC
  • Model registry with versioning
  • Retraining still triggered manually

Level 3 (Automated):

  • Automated training pipelines (Airflow, SageMaker)
  • Drift monitoring and alerting
  • Automated retraining on drift/schedule
  • CI/CD for models (tests before deployment)

Level 4 (Optimized):

  • Multi-model orchestration
  • Advanced monitoring (fairness, bias, explainability)
  • Automated A/B testing and canary deployments
  • Fully self-healing pipelines

Most organizations aim for Level 3; this case study achieves it.

Key Takeaways

  • A production ML system integrates experiment tracking, data versioning, model registry, deployment, monitoring, and retraining.
  • Each component serves a purpose: tracking accelerates research, versioning ensures reproducibility, registries enable governance, monitoring detects issues, retraining fixes them.
  • Automation can cut the time from detecting a problem to shipping a fix from days to hours.
  • Documentation and auditing are critical: every deployment must be logged, every decision must be traceable.
  • Expect to iterate: start at Level 1, mature to Level 2, then Level 3 as your team grows.

Frequently Asked Questions

How long does it take to go from Level 1 to Level 3?

For a small team (2-3 data scientists), 2-3 months if focused. Most teams do it over 6-12 months as a side project. The hardest part is cultural: getting everyone to commit to reproducibility and governance.

What is the cost difference between Level 1 and Level 3?

Level 1: just your laptop. Level 3: an MLflow server ($100-500/month), cloud compute for training ($500-2000/month), and storage ($100-500/month), for a total of roughly $700-3000/month for a small team. ROI comes from avoided mistakes and faster time-to-market.

Can I skip steps (e.g., use a model registry without monitoring)?

Technically yes, but not recommended. Each component solves a real problem. Skip monitoring, and you serve bad models. Skip a model registry, and you cannot roll back. Start with tracking + registry, add monitoring, then add retraining.

What if my team is using a commercial platform (SageMaker, Vertex AI)?

The principles are the same; the tools change. Commercial platforms handle much of the infrastructure, so you focus on model development. Still use the MLOps mental model: track experiments, version data, register models, monitor, and retrain.

How do I convince my organization to invest in MLOps?

Show ROI: estimate the cost of a bad model staying in production (missed revenue, regulatory risk) and compare to the cost of MLOps infrastructure. Most organizations break even in months.
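
The break-even argument can be made concrete with a small calculation. The sketch below assumes a one-time setup cost, a fixed monthly infrastructure cost, and an estimated monthly loss avoided by catching bad models earlier. The function name `breakeven_months` and all inputs in the usage note are hypothetical and should be replaced with your own estimates.

```python
import math


def breakeven_months(setup_cost, monthly_infra_cost, monthly_loss_avoided):
    """Months until cumulative avoided losses cover setup plus running costs."""
    net_monthly = monthly_loss_avoided - monthly_infra_cost
    if net_monthly <= 0:
        return None  # the investment never pays back under these inputs
    return math.ceil(setup_cost / net_monthly)
```

For example, with an assumed $20,000 setup cost, $3,000/month of infrastructure, and $8,000/month of avoided losses, `breakeven_months(20000, 3000, 8000)` returns 4.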

Further Reading