ML Training Pipeline

PR Ready

Data prep → train → evaluate → A/B test → deploy to serving endpoint.

6 nodes · 6 edges — PR ready
Tags: argo, mlops, training, kubernetes, ml-pipeline
Visual
Prepare Training Data (type: data)

Pull from feature store, split train/val/test, and write to GCS.

sequential → Train Model
Train Model (type: cli)

Fine-tune transformer model on GPU nodes.

sequential → Evaluate Model
Evaluate Model (type: cicd)

Compare accuracy, latency, and fairness metrics against baseline.

conditional → A/B Test on Shadow Traffic
conditional → Rollback
A/B Test on Shadow Traffic (type: infra)

Route 10% of shadow traffic to the candidate model for 24 hours.

conditional → Deploy to Serving
conditional → Rollback
Deploy to Serving (type: infra)

Promote candidate to production serving endpoint.

Rollback (type: infra)

Revert to previous model version if A/B test regresses.

ex-argo-ml-training.osop.yaml
# Argo Workflows ML Training Pipeline — OSOP Portable Workflow
#
# End-to-end ML pipeline: prepare training data, train a model on GPU,
# evaluate against baseline metrics, run an A/B test on shadow traffic,
# and deploy to a serving endpoint if the new model wins.
#
# Run with Argo or validate: osop validate argo-ml-training.osop.yaml

---
# Workflow identity and catalog metadata.
# NOTE: an explicit document-start marker (---) is added per yamllint's
# `document-start` rule; comments above it remain valid YAML.
osop_version: "1.0"
id: "argo-ml-training"
name: "ML Training Pipeline"
description: "Data prep → train → evaluate → A/B test → deploy to serving endpoint."
version: "1.0.0"
# Short, unambiguous plain scalars — safe to leave unquoted in flow style.
tags: [argo, mlops, training, kubernetes, ml-pipeline]

nodes:
  # Stage 1 — data preparation: read features, split, persist to GCS.
  - id: "prepare_data"
    type: "data"
    name: "Prepare Training Data"
    description: "Pull from feature store, split train/val/test, and write to GCS."
    config:
      source: "feature-store://user-embeddings/v3"
      splits:
        train: 0.8
        val: 0.1
        test: 0.1

  # Stage 2 — model training as a CLI script on GPU-backed nodes.
  - id: "train_model"
    type: "cli"
    subtype: "script"
    name: "Train Model"
    description: "Fine-tune transformer model on GPU nodes."
    config:
      command: "python train.py --config config/prod.yaml"
      resources:
        gpu: 4
        memory: "64Gi"

  # Stage 3 — evaluation: candidate metrics vs. current production baseline.
  - id: "evaluate"
    type: "cicd"
    subtype: "test"
    name: "Evaluate Model"
    description: "Compare accuracy, latency, and fairness metrics against baseline."
    config:
      metrics:
        - accuracy
        - f1
        - p99_latency
        - demographic_parity
      baseline: "models/production/latest"

  # Stage 4 — shadow A/B test of the candidate model.
  - id: "ab_test"
    type: "infra"
    name: "A/B Test on Shadow Traffic"
    description: "Route 10% of shadow traffic to the candidate model for 24 hours."
    config:
      traffic_split: 0.1
      duration_hours: 24

  # Stage 5 — promotion to the production serving endpoint.
  - id: "deploy_serving"
    type: "infra"
    name: "Deploy to Serving"
    description: "Promote candidate to production serving endpoint."
    config:
      endpoint: "models/recommendation/v3"
      canary_percent: 5

  # Failure path — restore the previously serving model (no config needed).
  - id: "rollback"
    type: "infra"
    name: "Rollback"
    description: "Revert to previous model version if A/B test regresses."

edges:
  # Linear backbone: data prep feeds training, training feeds evaluation.
  - from: "prepare_data"
    to: "train_model"
    mode: "sequential"
  - from: "train_model"
    to: "evaluate"
    mode: "sequential"
  # Gate: proceed to the shadow A/B test only when the candidate beats baseline.
  - from: "evaluate"
    to: "ab_test"
    mode: "conditional"
    when: "metrics.accuracy > baseline.accuracy"
    label: "Beats baseline"
  # A/B outcome branch: winner is promoted to serving…
  - from: "ab_test"
    to: "deploy_serving"
    mode: "conditional"
    when: "ab_result == 'winner'"
    label: "A/B test passed"
  # …loser triggers a rollback to the previous model version.
  - from: "ab_test"
    to: "rollback"
    mode: "conditional"
    when: "ab_result == 'loser'"
    label: "A/B test failed"
  # Early abort: evaluation at or below baseline skips A/B and rolls back.
  # Together with the "Beats baseline" edge this covers all evaluate outcomes.
  - from: "evaluate"
    to: "rollback"
    mode: "conditional"
    when: "metrics.accuracy <= baseline.accuracy"
    label: "Below baseline — abort"