CDC Streaming Pipeline
7 nodes · 8 edges
ex-cdc-streaming.osop.yaml
# Change Data Capture Streaming Pipeline
# Capture database changes, filter, transform, apply to downstream systems, verify consistency
osop_version: "2.0"
id: cdc-streaming
name: CDC Streaming Pipeline
nodes:
  - id: capture_changes
    type: cli
    purpose: Start Debezium connector to capture row-level changes from source database WAL
    runtime:
      command: |
        curl -X POST https://kafka-connect.internal/connectors \
          -H "Content-Type: application/json" \
          -d '{
            "name": "source-db-cdc",
            "config": {
              "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
              "database.hostname": "${SOURCE_DB_HOST}",
              "database.port": "5432",
              "database.dbname": "production",
              "database.server.name": "source-db",
              "table.include.list": "public.orders,public.customers,public.inventory",
              "slot.name": "cdc_slot",
              "plugin.name": "pgoutput",
              "publication.name": "cdc_publication"
            }
          }'
    outputs: [connector_name, captured_tables, initial_snapshot_complete]
    timeout_sec: 300
    security:
      credentials: [SOURCE_DB_REPLICATION_USER, KAFKA_CONNECT_AUTH]
    explain: |
      Uses PostgreSQL logical replication via the pgoutput plugin.
      The initial snapshot captures current state, then streams incremental changes.
  - id: filter_events
    type: cli
    purpose: Apply a Kafka Streams filter to drop internal/system changes and scrub PII
    runtime:
      command: |
        java -jar cdc-filter.jar \
          --input-topics source-db.public.orders,source-db.public.customers,source-db.public.inventory \
          --output-topic cdc-filtered \
          --filter-config filter-rules.yaml \
          --pii-scrub email,phone,ssn
    inputs: [connector_name]
    outputs: [filtered_topic, events_dropped_count, pii_scrubbed_count]
    timeout_sec: 60
    explain: |
      Filters out: DELETE events on audit tables, internal flag updates,
      and test account changes. PII fields are hashed before downstream delivery.
  - id: transform_events
    type: cli
    purpose: Apply schema transformations and business logic enrichment via Flink job
    runtime:
      command: |
        flink run \
          --class com.company.cdc.TransformJob \
          --parallelism 4 \
          cdc-transform.jar \
          --source-topic cdc-filtered \
          --sink-topic cdc-transformed \
          --schema-registry https://schema-registry.internal
    inputs: [filtered_topic]
    outputs: [transformed_topic, schema_version, transform_errors]
    timeout_sec: 120
    explain: |
      Flink stateful stream processing: joins order events with the customer dimension,
      computes derived fields, and validates against the Avro schema registry.
  - id: apply_to_warehouse
    type: db
    purpose: Apply transformed change events to analytics data warehouse
    runtime:
      engine: bigquery
      connection: project.analytics.cdc_sink
    inputs: [transformed_topic]
    outputs: [warehouse_applied_count, warehouse_lag_ms]
    timeout_sec: 60
    explain: "BigQuery streaming insert via Kafka Connect BigQuery sink connector."
  - id: apply_to_cache
    type: cli
    purpose: Update Redis cache with latest entity state from change events
    runtime:
      command: |
        python apply_to_cache.py \
          --topic cdc-transformed \
          --redis-cluster redis-cluster.internal:6379 \
          --key-pattern "entity:{table}:{id}" \
          --ttl 3600
    inputs: [transformed_topic]
    outputs: [cache_updated_count, cache_misses]
    timeout_sec: 60
    security:
      credentials: [REDIS_AUTH_TOKEN]
  - id: verify_consistency
    type: cli
    purpose: Run periodic consistency checks between source, warehouse, and cache
    runtime:
      command: |
        python verify_cdc_consistency.py \
          --source-db postgresql://source:5432/production \
          --warehouse bigquery://project.analytics \
          --cache redis-cluster.internal:6379 \
          --tables orders,customers,inventory \
          --sample-rate 0.01 \
          --max-lag-seconds 30
    inputs: [warehouse_applied_count, cache_updated_count]
    outputs: [consistency_passed, lag_seconds, mismatched_records]
    timeout_sec: 300
    retry_policy:
      max_retries: 3
      backoff_sec: 60
  - id: alert_on_failure
    type: api
    purpose: Send PagerDuty alert if consistency check fails or lag exceeds threshold
    runtime:
      endpoint: /v2/enqueue
      method: POST
      url: https://events.pagerduty.com
    inputs: [consistency_passed, lag_seconds, mismatched_records]
    outputs: [alert_id]
    security:
      auth: bearer_token
      secret_ref: PAGERDUTY_ROUTING_KEY
edges:
  - from: capture_changes
    to: filter_events
    mode: sequential
  - from: filter_events
    to: transform_events
    mode: sequential
  - from: transform_events
    to: apply_to_warehouse
    mode: parallel
  - from: transform_events
    to: apply_to_cache
    mode: parallel
  - from: apply_to_warehouse
    to: verify_consistency
    mode: sequential
  - from: apply_to_cache
    to: verify_consistency
    mode: sequential
  - from: verify_consistency
    to: alert_on_failure
    mode: conditional
    condition: "consistency_passed == false || lag_seconds > 30"
  - from: verify_consistency
    to: verify_consistency
    mode: loop
    when: "true"
    label: "Continuous consistency monitoring every 5 minutes"
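The filter_events node notes that PII fields are hashed before downstream delivery. A minimal sketch of that scrubbing step (field names come from the `--pii-scrub` flag; the salted SHA-256 scheme, the `SALT` constant, and the flat event shape are assumptions, not the actual behavior of cdc-filter.jar):

```python
import hashlib

PII_FIELDS = {"email", "phone", "ssn"}  # from --pii-scrub email,phone,ssn
SALT = "pipeline-salt"  # hypothetical; a real deployment would pull this from a secret store

def scrub_event(event: dict) -> dict:
    """Replace PII field values with a salted SHA-256 digest, leaving other fields intact."""
    scrubbed = dict(event)
    for field in PII_FIELDS & event.keys():
        value = event[field]
        if value is not None:
            scrubbed[field] = hashlib.sha256((SALT + str(value)).encode("utf-8")).hexdigest()
    return scrubbed

event = {"id": 42, "email": "a@example.com", "status": "active"}
out = scrub_event(event)
```

Hashing (rather than dropping) keeps the field usable as a stable join key downstream while removing the raw value.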
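The apply_to_cache node writes each entity's latest state under the `entity:{table}:{id}` key pattern. A sketch of the per-event upsert/delete decision, using a plain dict in place of a Redis client so it reads without a running cluster (the `table`/`op`/`after` event shape is a simplification of the real Debezium envelope, and TTL handling is omitted):

```python
import json

KEY_PATTERN = "entity:{table}:{id}"  # from --key-pattern
TTL_SECONDS = 3600                   # from --ttl (not enforced by this dict-backed sketch)

def apply_change(cache: dict, change: dict) -> str:
    """Apply one change event to a cache mapping; return the affected key."""
    row = change.get("after") or change.get("before") or {}
    key = KEY_PATTERN.format(table=change["table"], id=row["id"])
    if change["op"] == "d":
        cache.pop(key, None)          # delete event: drop the cached entity
    else:
        cache[key] = json.dumps(row)  # create/update: overwrite with latest state
    return key

cache = {}
apply_change(cache, {"table": "orders", "op": "c", "after": {"id": 7, "total": 99}})
apply_change(cache, {"table": "orders", "op": "d", "before": {"id": 7}})
```

Because each event overwrites the full entity state, replaying events in order leaves the cache at the latest row image, which is what makes last-writer-wins safe here.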
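The verify_consistency node compares a 1% sample of rows across source, warehouse, and cache. One way to make such sampling deterministic, so every check run inspects the same rows, is to hash the primary key into a bucket; this is an illustrative approach, not necessarily what verify_cdc_consistency.py does:

```python
import zlib

SAMPLE_RATE = 0.01  # from --sample-rate

def in_sample(pk, rate: float = SAMPLE_RATE) -> bool:
    """Deterministically select ~rate of primary keys by hashing into 10,000 buckets."""
    bucket = zlib.crc32(str(pk).encode("utf-8")) % 10_000
    return bucket < rate * 10_000

def find_mismatches(source: dict, replica: dict, rate: float = SAMPLE_RATE) -> list:
    """Compare sampled rows between source and a downstream copy; return mismatched keys."""
    mismatched = []
    for pk, row in source.items():
        if not in_sample(pk, rate):
            continue
        if replica.get(pk) != row:
            mismatched.append(pk)
    return mismatched

source = {1: "a", 2: "b", 3: "c"}
replica = {1: "a", 2: "X", 3: "c"}
```

Hash-based sampling avoids the flapping that random sampling would cause, where a mismatch seen on one run disappears from the next run's sample before the retry policy can confirm it.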