Reproducible ML Pipelines in Python: Step-by-Step

Reproducibility is a cornerstone of scientific machine learning and professional ML practice. A reproducible pipeline means that running your code again—on the same data, with the same versions of libraries—produces the exact same model. Without reproducibility, you cannot debug, you cannot collaborate with confidence, and you cannot audit your models for compliance. This article teaches you how to build pipelines that are reproducible from data loading to model deployment.

Why Reproducibility Matters

Imagine you train a model that gets 94% accuracy. A week later, a colleague reruns your code and gets 91% accuracy. What happened? Maybe you used a different random seed, updated a library, changed the data, or introduced a subtle bug. Without reproducibility, debugging is a nightmare. In regulated industries (finance, healthcare), auditors ask "Can you retrain this exact model today and show it produces the same results?" If you cannot, the model fails compliance.

Reproducibility requires pinning four things: random seeds, dependency versions, data, and code.

Pinning Random Seeds

Random seeds are your first line of defense. Many ML libraries use randomness: neural network initialization, train-test splits, hyperparameter sampling, dropout, and cross-validation fold shuffling. Set the seed globally before any randomness occurs.

import os
import random
import numpy as np
import torch
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Set seeds globally
def set_random_seed(seed: int) -> None:
    """Set all random seeds for reproducibility."""
    # Setting PYTHONHASHSEED here only affects subprocesses started later.
    # To fix hash randomization for this interpreter, export the variable
    # before launching Python.
    os.environ["PYTHONHASHSEED"] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    # Disable cuDNN randomness (trades performance for reproducibility)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

SEED = 42
set_random_seed(SEED)

# Now all randomness is seeded
X = np.random.randn(100, 5)
y = np.random.randint(0, 2, 100)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=SEED
)

model = RandomForestClassifier(n_estimators=100, random_state=SEED)
model.fit(X_train, y_train)
accuracy = accuracy_score(y_test, model.predict(X_test))
print(f"Accuracy: {accuracy:.3f}")

Every time you run this, you get the same train-test split and the same model. For PyTorch or TensorFlow models, seed setting is even more critical because neural network initialization is random.

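Note: Setting torch.backends.cudnn.deterministic = True makes cuDNN choose deterministic algorithms, and torch.backends.cudnn.benchmark = False stops it from auto-tuning kernels on each run. Both settings can slow training. If bit-exact reruns are not required in production, you can switch back to the faster settings once you have confirmed that your results are reproducible.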
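Global seeding works, but any library call that draws from the global generator shifts the numbers every later call sees. A complementary pattern is to give each random step its own generator object. The sketch below uses only the standard library; the helper name shuffled_indices is hypothetical and not part of the pipeline above.

```python
import random

def shuffled_indices(n: int, seed: int) -> list[int]:
    """Return a reproducible permutation of range(n)."""
    # A local generator is independent of the global random state,
    # so unrelated calls to random.random() cannot change its output.
    rng = random.Random(seed)
    indices = list(range(n))
    rng.shuffle(indices)
    return indices

first = shuffled_indices(10, seed=42)
random.random()  # an unrelated draw from the global generator
second = shuffled_indices(10, seed=42)
print(first == second)  # True
```

NumPy offers the same idea through np.random.default_rng(seed), which returns a generator you can pass explicitly to the functions that need randomness.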

Pinning Dependency Versions

Python libraries change constantly. A newer version of scikit-learn might use a different default algorithm, change random behavior, or introduce bugs. Pin your dependencies in a requirements.txt or pyproject.toml file.

requirements.txt approach:

scikit-learn==1.5.0
pandas==2.2.0
numpy==1.26.0
torch==2.4.0
mlflow==2.12.0

Install pinned versions:

pip install -r requirements.txt

Better: use pip freeze to capture the exact versions you are using right now:

pip freeze > requirements.txt

This creates a complete snapshot of all installed packages and their versions.

Best practice: use environment management (conda or uv):

# environment.yml (conda format)
name: ml-pipeline
channels:
  - defaults
  - conda-forge
dependencies:
  - python=3.11
  - scikit-learn=1.5.0
  - pandas=2.2.0
  - numpy=1.26.0
  - pytorch::pytorch=2.4.0
  - pytorch::pytorch-cuda=12.1
  - pip
  - pip:
      - mlflow==2.12.0

Create and activate:

conda env create -f environment.yml
conda activate ml-pipeline

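Conda also pins the Python interpreter and non-Python binary dependencies (such as CUDA libraries), which pip alone does not manage. That makes environments easier to recreate across machines and operating systems.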
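Pinned files describe the environment you intended; it can also help to record the environment you actually ran in. The following is a minimal sketch using the standard library's importlib.metadata. The function name snapshot_versions and the "not installed" placeholder are assumptions made for illustration.

```python
import json
import sys
from importlib.metadata import PackageNotFoundError, version

def snapshot_versions(packages):
    """Map each package name to its installed version, or "not installed"."""
    found = {"python": sys.version.split()[0]}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = "not installed"
    return found

snapshot = snapshot_versions(["scikit-learn", "pandas", "numpy", "torch", "mlflow"])
print(json.dumps(snapshot, indent=2))
```

Saving this dictionary next to each trained model gives you something to compare against requirements.txt when two runs disagree.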

Versioning Data

You train a model on data from January 2026. Two months later, someone reruns the training script on current data (March 2026) and gets a different model. Which one is deployed? Data versioning solves this: every dataset gets a version tag.

The simplest approach: hash the data.

import hashlib
import json
from pathlib import Path

import pandas as pd

def hash_dataframe(df: pd.DataFrame) -> str:
    """Compute SHA256 hash of a DataFrame's values and index."""
    return hashlib.sha256(
        pd.util.hash_pandas_object(df, index=True).values.tobytes()
    ).hexdigest()

def save_data_with_hash(df: pd.DataFrame, filepath: str) -> str:
    """Save DataFrame and log its hash."""
    # Create the target directory if it does not exist yet
    Path(filepath).parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(filepath, index=False)
    data_hash = hash_dataframe(df)

    # Save hash metadata
    metadata = {
        "filepath": filepath,
        "hash": data_hash,
        "rows": len(df),
        "columns": list(df.columns),
        "timestamp": pd.Timestamp.now().isoformat(),
    }

    metadata_path = filepath.replace(".csv", "_metadata.json")
    with open(metadata_path, "w") as f:
        json.dump(metadata, f, indent=2)

    print(f"Data saved: {filepath}")
    print(f"Hash: {data_hash}")
    return data_hash

# Usage
df = pd.read_csv("raw_data.csv")
data_hash = save_data_with_hash(df, "data/training_data.csv")

# In your training script, log the hash to MLflow
import mlflow
mlflow.log_param("data_hash", data_hash)

Now, every model is tagged with the exact data version it was trained on. When you need to retrain, you can either use the same data hash or explicitly use a new one, but it's always tracked.

Structuring Your Pipeline as a Reproducible Script

Move away from notebooks and build a modular training script:

# train.py
import argparse
import random

import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score

def set_random_seed(seed: int) -> None:
    """Seed the Python and NumPy generators (add the torch calls if you use PyTorch)."""
    random.seed(seed)
    np.random.seed(seed)

def load_data(data_path: str) -> pd.DataFrame:
    """Load and validate data."""
    df = pd.read_csv(data_path)
    # Raise explicitly: assert statements are skipped under python -O
    if df.isnull().any().any():
        raise ValueError("Data contains null values")
    return df

def preprocess(df: pd.DataFrame, random_state: int) -> tuple:
    """Split and scale data."""
    X = df.drop("target", axis=1)
    y = df["target"]

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=random_state
    )

    # Fit the scaler on the training split only to avoid leakage
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    return X_train, X_test, y_train, y_test, scaler

def train_and_log(
    X_train, X_test, y_train, y_test,
    n_estimators: int,
    max_depth: int,
    run_name: str,
    seed: int,
):
    """Train model and log to MLflow."""
    # The context manager ends the run even if training raises
    with mlflow.start_run(run_name=run_name):
        mlflow.log_param("n_estimators", n_estimators)
        mlflow.log_param("max_depth", max_depth)
        mlflow.log_param("seed", seed)

        model = RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            random_state=seed,  # use the CLI seed, not a hard-coded value
        )
        model.fit(X_train, y_train)

        y_pred = model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        precision = precision_score(y_test, y_pred, average="weighted")
        recall = recall_score(y_test, y_pred, average="weighted")

        mlflow.log_metric("accuracy", accuracy)
        mlflow.log_metric("precision", precision)
        mlflow.log_metric("recall", recall)

        mlflow.sklearn.log_model(model, "model")

    return model, accuracy

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, default="data.csv")
    parser.add_argument("--n_estimators", type=int, default=100)
    parser.add_argument("--max_depth", type=int, default=10)
    parser.add_argument("--run_name", type=str, default="default_run")
    parser.add_argument("--seed", type=int, default=42)

    args = parser.parse_args()

    # Set seed before any randomness
    set_random_seed(args.seed)

    # Load and preprocess
    df = load_data(args.data)
    X_train, X_test, y_train, y_test, scaler = preprocess(df, args.seed)

    # Train and log
    model, accuracy = train_and_log(
        X_train, X_test, y_train, y_test,
        args.n_estimators,
        args.max_depth,
        args.run_name,
        args.seed,
    )

    print(f"Training complete. Accuracy: {accuracy:.3f}")

if __name__ == "__main__":
    main()

Run it:

python train.py --data data.csv --n_estimators 100 --max_depth 10 --run_name "exp_v1"

This script is reproducible and parameterized. Anyone can run it with the same arguments and get the same results (assuming the same seed, versions, and data).
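Rerunning with "the same arguments" is easier when those arguments are written down at training time. As a rough sketch, a script could build a small manifest from the parsed arguments and save it next to the model. The function name build_manifest and the manifest layout are assumptions for illustration, not part of train.py above.

```python
import argparse
import json
import platform
import sys

def build_manifest(args: argparse.Namespace) -> dict:
    """Collect the arguments and interpreter details for one training run."""
    return {
        "arguments": vars(args),
        "python": sys.version.split()[0],
        "platform": platform.platform(),
    }

# Hypothetical arguments mirroring the train.py flags
example_args = argparse.Namespace(
    data="data.csv", n_estimators=100, max_depth=10, run_name="exp_v1", seed=42
)
manifest = build_manifest(example_args)
print(json.dumps(manifest, indent=2, sort_keys=True))
```

In a real run you would pass the result of parser.parse_args() and write the JSON to a file or log it as an MLflow artifact.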

Version Control for Code

Use Git to version your code. Every commit is a snapshot. When you train a model, log the Git commit hash to MLflow:

import subprocess
import mlflow

def log_git_commit():
    """Log the current Git commit hash."""
    commit = subprocess.check_output(
        ["git", "rev-parse", "HEAD"]
    ).decode("utf-8").strip()
    mlflow.log_param("git_commit", commit)
    return commit

# In your training function
mlflow.start_run()
log_git_commit()
# ... training code ...
mlflow.end_run()

Now every model is linked to the exact code version that trained it. If a model fails in production, you can check out that commit and debug.
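A commit hash only identifies the code if the working tree had no uncommitted changes when training ran. One way to guard against that, sketched here under the assumption that the script runs inside a Git repository, is to record whether `git status --porcelain` printed anything. The helper names parse_dirty and git_state are hypothetical.

```python
import subprocess

def parse_dirty(porcelain_output: str) -> bool:
    """Return True if `git status --porcelain` reported any changes."""
    # The porcelain format prints one line per changed or untracked file,
    # so empty output means the working tree matches the commit.
    return bool(porcelain_output.strip())

def git_state(repo_dir: str = ".") -> dict:
    """Return the current commit hash and whether the working tree is dirty."""
    commit = subprocess.check_output(
        ["git", "rev-parse", "HEAD"], cwd=repo_dir, text=True
    ).strip()
    status = subprocess.check_output(
        ["git", "status", "--porcelain"], cwd=repo_dir, text=True
    )
    return {"commit": commit, "dirty": parse_dirty(status)}

print(parse_dirty(""))               # False
print(parse_dirty(" M train.py\n"))  # True
```

You could log the dirty flag with mlflow.log_param next to git_commit, or refuse to train when it is True.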

Key Takeaways

  • Reproducibility requires pinning four elements: random seeds, dependency versions, data, and code.
  • Set random.seed(), np.random.seed(), and torch.manual_seed() globally before any randomness.
  • Use requirements.txt, environment.yml, or uv to pin dependency versions.
  • Hash your data and log the hash to MLflow so you always know which data trained which model.
  • Structure your pipeline as a modular script (not a notebook) with clear functions for loading, preprocessing, and training.
  • Use Git for code versioning and log the commit hash with each model.

Frequently Asked Questions

Does pinning dependency versions lock me into old, insecure libraries?

No. You should still update dependencies regularly (monthly or quarterly) and test your pipeline with new versions. Once you confirm it works, update your pinned versions. The point is to know which versions you are using, not to avoid updates forever.

Can I make a reproducible pipeline with PyTorch on GPU?

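Yes, with caveats. GPU kernels can be non-deterministic because parallel floating-point reductions may run in a different order on each run. Set torch.backends.cudnn.deterministic = True and torch.backends.cudnn.benchmark = False as shown above, and consider torch.use_deterministic_algorithms(True), which raises an error when an operation has no deterministic implementation. These settings make reruns repeatable on the same hardware and software stack, usually at some performance cost. Results can still differ across GPU models, driver versions, or PyTorch releases.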

What if I cannot hash large datasets?

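For very large datasets, hash the file instead of the in-memory DataFrame: hashlib.sha256(open("file.csv", "rb").read()).hexdigest(). This one-liner reads the whole file into memory, so for files larger than RAM feed the hash in fixed-size chunks. Hashing only the first N rows is cheaper but misses changes further down the file, so treat it as a quick sanity check rather than a version identifier. The key is that the hash is deterministic and changes whenever the dataset does.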
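A chunked file hash might look like the sketch below. The function name hash_file and the 1 MiB default chunk size are illustrative choices; the demonstration writes a small temporary file and checks that streaming gives the same digest as hashing the bytes in one call.

```python
import hashlib
import os
import tempfile

def hash_file(path: str, chunk_size: int = 1024 * 1024) -> str:
    """Compute the SHA256 of a file without loading it all into memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # iter() with a sentinel keeps reading until f.read returns b""
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

# Demonstration on a small temporary file
payload = b"id,target\n1,0\n2,1\n" * 1000
with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as tmp:
    tmp.write(payload)
    tmp_path = tmp.name

streamed = hash_file(tmp_path, chunk_size=4096)
one_shot = hashlib.sha256(payload).hexdigest()
os.remove(tmp_path)
print(streamed == one_shot)  # True
```

Note that a file-level hash changes with any byte-level difference, including row order or line endings, even when the parsed data is identical.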

How do I handle randomness in data preprocessing (e.g., random feature selection)?

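Set the seed before preprocessing, and pass random_state=seed to any randomized step. For example, if you select features with scikit-learn's SelectFromModel wrapped around a RandomForestClassifier, pass random_state=seed to the forest.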

Is a Git commit sufficient, or should I tag releases?

For production models, tag releases explicitly: git tag v1.0.0-model after committing training code. This makes it clear which code version trained the deployed model. You can log the tag name to MLflow (for example with mlflow.set_tag) alongside the commit hash.

Further Reading