# ETL Data Pipeline
# Data: 6 nodes · 6 edges
# File: ex-etl-pipeline.osop.yaml
# ETL Pipeline
# Extract from external API, transform with Spark, validate, load to warehouse.
---
# Schema version of the osop pipeline format this document targets.
osop_version: "2.0"
# Stable machine identifier and human-readable display name for the pipeline.
id: etl-pipeline
name: ETL Data Pipeline

# Node definitions. Each node declares its type, runtime configuration, and the
# named data artifacts it consumes (inputs) and produces (outputs). Artifact
# names link nodes together; the execution order itself is defined under edges.
nodes:
  # Extract: pull raw data from the vendor analytics API.
  - id: extract_api
    type: api
    purpose: Extract raw data from third-party analytics API
    runtime:
      endpoint: /v2/export
      method: GET
      url: https://analytics.vendor.com
    security:
      auth: bearer_token
      # Secret referenced by name only — resolved by the runtime, never inline.
      secret_ref: ANALYTICS_API_KEY
    outputs:
      - raw_data
    # NOTE(review): backoff_sec is presumably the delay between retries —
    # fixed vs. exponential is engine-defined; confirm against the osop schema.
    retry_policy:
      max_retries: 3
      backoff_sec: 10
    timeout_sec: 120

  # Transform, step 1: deduplicate, null-handle, and fix encodings via Spark.
  - id: clean_data
    type: data
    purpose: Remove duplicates, handle nulls, and fix encoding issues
    runtime:
      engine: spark
      config:
        app_name: etl-clean
        master: yarn
    inputs:
      - raw_data
    outputs:
      - cleaned_data

  # Transform, step 2: field normalization, type conversion, business rules.
  - id: normalize
    type: data
    purpose: Normalize fields, convert types, and apply business rules
    runtime:
      engine: spark
      config:
        app_name: etl-normalize
        master: yarn
    inputs:
      - cleaned_data
    outputs:
      - normalized_data

  # Validate: run a Great Expectations checkpoint over the normalized data.
  # Emits validation_result, which gates the load step (see edges).
  - id: validate_schema
    type: cli
    purpose: Validate transformed data against expected schema
    runtime:
      command: "great_expectations checkpoint run etl_validation"
    inputs:
      - normalized_data
    outputs:
      - validation_result

  # Load: write normalized data into the BigQuery warehouse table.
  # Consumes normalized_data (not validation_result); gating on validation
  # happens via the conditional edge, per the explain below.
  - id: load_warehouse
    type: db
    purpose: Load validated data into the analytics warehouse
    runtime:
      engine: bigquery
      connection: project.analytics.fact_events
    inputs:
      - normalized_data
    outputs:
      - load_receipt
    explain: "Only loads if validation passed; otherwise pipeline halts."

  # Post-load: trigger an internal dashboard refresh once loading succeeds.
  - id: update_dashboard
    type: api
    purpose: Trigger dashboard refresh after successful data load
    runtime:
      endpoint: /api/refresh
      method: POST
      url: https://dashboard.internal
    inputs:
      - load_receipt
    outputs:
      - refresh_status

# Execution graph:
#   extract_api -> clean_data -> normalize -> validate_schema
#     -> (conditional, on validation success) load_warehouse -> update_dashboard
#     -> (error, on validation failure) back to extract_api
edges:
  - from: extract_api
    to: clean_data
    mode: sequential

  - from: clean_data
    to: normalize
    mode: sequential

  - from: normalize
    to: validate_schema
    mode: sequential

  # Gate: only proceed to the warehouse load when validation succeeded.
  - from: validate_schema
    to: load_warehouse
    mode: conditional
    condition: "validation_result.success == true"

  - from: load_warehouse
    to: update_dashboard
    mode: sequential

  # NOTE(review): this error edge forms a cycle (validate_schema -> extract_api).
  # If re-extraction keeps producing invalid data, the pipeline could loop
  # indefinitely — confirm the engine enforces an iteration/retry cap for
  # mode: error edges, or that extract_api's retry_policy also bounds this path.
  - from: validate_schema
    to: extract_api
    mode: error
    condition: "validation_result.success == false"
    explain: "On validation failure, re-extract with corrective parameters."