Modular Pipeline with Sub-Workflows

v1.1 Feature

Demonstrates v1.1 sub-workflows — compose complex pipelines from reusable building blocks.

6 nodes · 6 edgesv1.1 features
v1.1sub-workflowcompositiondata-pipeline
Visual
extract_salessystem
paralleltransform
extract_inventorysystem
paralleltransform
transformsystem
sequentialquality_gate
quality_gatesystem
conditionalload_warehouse
fallbacknotify_team
load_warehousedb
sequentialnotify_team
notify_teamapi
ex-sub-workflow-composition.osop.yaml
osop_version: "1.1"
id: "sub-workflow-composition"
name: "Modular Pipeline with Sub-Workflows"
description: "Demonstrates v1.1 sub-workflows — compose complex pipelines from reusable building blocks."
tags: [v1.1, sub-workflow, composition, data-pipeline]

imports:
  - "./etl-extract.osop"
  - "./etl-transform.osop"
  - "./quality-check.osop"

timeout_sec: 7200  # 2 hour global deadline

nodes:
  - id: "extract_sales"
    type: "system"
    purpose: "Extract sales data from source systems."
    workflow_ref: "etl-extract"
    workflow_inputs:
      source: "salesforce"
      date_range: "${inputs.date_range}"

  - id: "extract_inventory"
    type: "system"
    purpose: "Extract inventory data from warehouse system."
    workflow_ref: "etl-extract"
    workflow_inputs:
      source: "warehouse_api"
      date_range: "${inputs.date_range}"

  - id: "transform"
    type: "system"
    purpose: "Clean and normalize extracted data."
    workflow_ref: "etl-transform"
    workflow_inputs:
      sales_data: "${outputs.extract_sales.data}"
      inventory_data: "${outputs.extract_inventory.data}"

  - id: "quality_gate"
    type: "system"
    purpose: "Run data quality checks."
    workflow_ref: "quality-check"
    workflow_inputs:
      dataset: "${outputs.transform.result}"
      rules: "strict"

  - id: "load_warehouse"
    type: "db"
    purpose: "Load validated data into analytics warehouse."
    runtime:
      engine: "bigquery"
      table: "analytics.daily_report"

  - id: "notify_team"
    type: "api"
    purpose: "Send completion notification to Slack."
    runtime:
      endpoint: "https://hooks.slack.com/services/..."
      method: "POST"

edges:
  - from: "extract_sales"
    to: "transform"
    mode: "parallel"
  - from: "extract_inventory"
    to: "transform"
    mode: "parallel"
    join_mode: "wait_all"
  - from: "transform"
    to: "quality_gate"
    mode: "sequential"
  - from: "quality_gate"
    to: "load_warehouse"
    mode: "conditional"
    when: "outputs.quality_gate.passed == true"
  - from: "load_warehouse"
    to: "notify_team"
    mode: "sequential"
  - from: "quality_gate"
    to: "notify_team"
    mode: "fallback"
    label: "Alert team on quality failure"