ML Model Training Pipeline

AI/ML
7 nodes · 8 edges · tags: ai, ml
ex-model-training.osop.yaml
# ML Model Training Pipeline
# Data preparation, distributed training, evaluation, model registry, and deployment
osop_version: "2.0"
id: model-training
name: ML Model Training Pipeline

nodes:
  - id: prepare_data
    type: data
    purpose: Fetch raw data, split into train/val/test sets, and apply feature engineering
    runtime:
      engine: spark
      config:
        app_name: data-prep
        master: "k8s://https://kubernetes.default.svc:443"  # k8s:// takes the API server URL; 7077 is the standalone master port
        executor_memory: "8g"
    outputs: [train_set, val_set, test_set, feature_stats]
    timeout_sec: 1800
    explain: |
      Applies stratified splitting (80/10/10), handles class imbalance via SMOTE,
      normalizes features, and saves dataset versioning metadata to DVC.
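
    # Illustrative sketch (an assumption, not part of this spec): the split and
    # resampling described above map to scikit-learn / imbalanced-learn calls such as
    #   from sklearn.model_selection import train_test_split
    #   from imblearn.over_sampling import SMOTE
    #   X_train, X_hold, y_train, y_hold = train_test_split(X, y, test_size=0.2, stratify=y)
    #   X_val, X_test, y_val, y_test = train_test_split(X_hold, y_hold, test_size=0.5, stratify=y_hold)
    #   X_train, y_train = SMOTE(random_state=42).fit_resample(X_train, y_train)  # train split only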

  - id: train_model
    type: cli
    purpose: Launch distributed training job on GPU cluster with experiment tracking
    runtime:
      command: |
        python train.py \
          --data-path ${train_set} \
          --val-path ${val_set} \
          --config configs/model_v3.yaml \
          --experiment-name ${EXPERIMENT_NAME} \
          --epochs 50 \
          --early-stopping-patience 5 \
          --distributed
    inputs: [train_set, val_set]
    outputs: [model_checkpoint, training_metrics, mlflow_run_id]
    timeout_sec: 14400
    retry_policy:
      max_retries: 2
      backoff_sec: 60
    explain: |
      Uses PyTorch DDP across 4 GPUs. Logs metrics to MLflow.
      Early stopping prevents overfitting. Checkpoints saved every epoch.
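
    # Sketch (assumption): on a single 4-GPU host, a DDP job like this is normally
    # launched through torchrun rather than bare python, e.g.
    #   torchrun --standalone --nproc_per_node=4 train.py \
    #     --data-path ${train_set} --val-path ${val_set} \
    #     --config configs/model_v3.yaml --distributed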

  - id: evaluate_model
    type: cli
    purpose: Evaluate trained model on held-out test set and compute production metrics
    runtime:
      command: |
        python evaluate.py \
          --checkpoint ${model_checkpoint} \
          --test-path ${test_set} \
          --metrics accuracy,f1,auc_roc,calibration \
          --output evaluation_report.json
    inputs: [model_checkpoint, test_set]
    outputs: [accuracy, f1_score, auc_roc, calibration_error, evaluation_report]
    timeout_sec: 600
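
    # Assumed shape of evaluation_report.json (field names mirror the outputs above;
    # the values are placeholders, not real results):
    #   {"accuracy": 0.93, "f1_score": 0.88, "auc_roc": 0.95, "calibration_error": 0.04}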

  - id: bias_audit
    type: cli
    purpose: Run fairness analysis across protected attributes using Fairlearn
    runtime:
      command: |
        python bias_audit.py \
          --checkpoint ${model_checkpoint} \
          --test-path ${test_set} \
          --protected-attributes age,gender,ethnicity \
          --metrics demographic_parity,equalized_odds
    inputs: [model_checkpoint, test_set]
    outputs: [bias_report, fairness_passed]
    timeout_sec: 300
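
    # Sketch (assumption): the requested metrics correspond to Fairlearn calls such as
    #   from fairlearn.metrics import demographic_parity_difference, equalized_odds_difference
    #   dpd = demographic_parity_difference(y_true, y_pred, sensitive_features=df["gender"])
    #   eod = equalized_odds_difference(y_true, y_pred, sensitive_features=df["gender"])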

  - id: review_results
    type: human
    purpose: ML engineer reviews metrics and bias report, and decides whether to promote the model
    role: ml_engineer
    inputs: [evaluation_report, bias_report, training_metrics]
    approval_gate:
      required_approvers: 1
      timeout_min: 1440  # 24-hour review window

  - id: register_model
    type: api
    purpose: Register approved model version in MLflow Model Registry
    runtime:
      endpoint: /api/2.0/mlflow/registered-models/create
      method: POST
      url: https://mlflow.internal
    inputs: [model_checkpoint, mlflow_run_id, evaluation_report]
    outputs: [model_version, registry_uri]
    security:
      auth: bearer_token
      secret_ref: MLFLOW_TRACKING_TOKEN
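
    # Sketch (assumption; "churn-model" is a placeholder name): the MLflow 2.x REST API
    # expects a JSON body like
    #   {"name": "churn-model"}
    # and a follow-up POST to /api/2.0/mlflow/model-versions/create such as
    #   {"name": "churn-model", "source": "${model_checkpoint}", "run_id": "${mlflow_run_id}"}
    # to produce the model_version output.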

  - id: deploy_model
    type: cli
    purpose: Deploy registered model to serving infrastructure via Seldon Core
    runtime:
      command: |
        kubectl apply -f - <<MANIFEST
        apiVersion: machinelearning.seldon.io/v1
        kind: SeldonDeployment
        metadata:
          name: ${MODEL_NAME}
          namespace: ml-serving
        spec:
          predictors:
            - name: default
              graph:
                name: model
                implementation: MLFLOW_SERVER  # prepackaged MLflow server (assumed; swap for a custom image if needed)
                modelUri: ${registry_uri}
              traffic: 100
              replicas: 3
        MANIFEST
    inputs: [registry_uri, model_version]
    outputs: [serving_endpoint, deployment_status]
    timeout_sec: 300
    security:
      credentials: [KUBECONFIG]
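
    # Sketch (assumption): a post-deploy readiness check could poll the SeldonDeployment
    # until its status reports Available, e.g.
    #   kubectl -n ml-serving get seldondeployment ${MODEL_NAME} -o jsonpath='{.status.state}'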

edges:
  - from: prepare_data
    to: train_model
    mode: sequential

  - from: train_model
    to: evaluate_model
    mode: sequential

  - from: train_model
    to: bias_audit
    mode: sequential

  - from: evaluate_model
    to: review_results
    mode: conditional
    condition: "f1_score >= 0.85 && auc_roc >= 0.90"

  - from: bias_audit
    to: review_results
    mode: sequential

  - from: review_results
    to: register_model
    mode: sequential

  - from: register_model
    to: deploy_model
    mode: sequential

  - from: evaluate_model
    to: prepare_data
    mode: fallback
    label: "Metrics below threshold, revisit data preparation"
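
# Note (assumption, inferred from the conditional edge above): condition expressions
# reference upstream node outputs by name and use C-style boolean operators, e.g.
#   condition: "accuracy >= 0.90 && calibration_error <= 0.05"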