multiAI Summary Pending
Data Engineering Command Center
Complete methodology for designing, building, operating, and scaling data pipelines and infrastructure. Zero dependencies — pure agent skill.
3,556 stars
byopenclaw
Installation
Claude Code / Cursor / Codex
$curl -o ~/.claude/skills/afrexai-data-engineering/SKILL.md --create-dirs "https://raw.githubusercontent.com/openclaw/skills/main/skills/1kalin/afrexai-data-engineering/SKILL.md"
Manual Installation
- Download SKILL.md from GitHub
- Place it in
.claude/skills/afrexai-data-engineering/SKILL.mdinside your project - Restart your AI agent — it will auto-discover the skill
How Data Engineering Command Center Compares
| Feature / Agent | Data Engineering Command Center | Standard Approach |
|---|---|---|
| Platform Support | multi | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
Frequently Asked Questions
What does this skill do?
Complete methodology for designing, building, operating, and scaling data pipelines and infrastructure. Zero dependencies — pure agent skill.
Which AI agents support this skill?
This skill is compatible with multi.
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
SKILL.md Source
# Data Engineering Command Center
Complete methodology for designing, building, operating, and scaling data pipelines and infrastructure. Zero dependencies — pure agent skill.
---
## Phase 1: Data Architecture Assessment
Before building anything, understand the landscape.
### Architecture Brief
```yaml
project_name: ""
business_context: ""
data_consumers:
- team: ""
use_case: "" # analytics | ML | operational | reporting | reverse-ETL
latency_requirement: "" # real-time (<1s) | near-real-time (<5min) | batch (hourly+)
query_pattern: "" # ad-hoc | scheduled | API | dashboard
current_state:
sources: [] # list every system producing data
storage: [] # where data lives today
pain_points: [] # what's broken, slow, unreliable
data_volume:
current_gb_per_day: 0
growth_rate_percent: 0
retention_months: 0
constraints:
budget_monthly_usd: 0
team_size: 0
skill_level: "" # junior | mid | senior | mixed
compliance: [] # GDPR, HIPAA, SOX, PCI, none
cloud_provider: "" # AWS | GCP | Azure | multi | on-prem
```
### Architecture Pattern Decision Matrix
| Signal | Pattern | When to Use |
|--------|---------|-------------|
| All consumers need data hourly+ | **Batch ETL** | Reporting, warehousing, most analytics |
| Some need <5 min latency | **Micro-batch** | Dashboard freshness, near-real-time analytics |
| Events need <1s processing | **Streaming** | Fraud detection, real-time pricing, alerts |
| Need both batch + streaming | **Lambda** | When batch accuracy + real-time speed both matter |
| Want to simplify Lambda | **Kappa** | When you can reprocess from stream replay |
| Data lake + warehouse combined | **Lakehouse** | When you need both cheap storage + fast SQL |
| Sources change independently | **Data Mesh** | Large orgs, domain-owned data, >5 teams |
| ML is primary consumer | **Feature Store** | ML-heavy orgs with feature reuse needs |
### Technology Selection Guide
#### Orchestration
| Tool | Best For | Avoid When |
|------|----------|------------|
| **Airflow** | Complex DAGs, Python-native teams, mature ecosystem | Simple pipelines (<5 tasks) |
| **Dagster** | Software-defined assets, strong typing, dev experience | Legacy team resistant to new paradigms |
| **Prefect** | Dynamic workflows, cloud-native, Python-first | Need on-prem with no cloud dependency |
| **dbt** | SQL transformations, ELT, analytics engineering | Non-SQL transforms, streaming |
| **Temporal** | Long-running workflows, retry-heavy, microservices | Simple ETL, small teams |
| **Cron + scripts** | <3 pipelines, solo engineer, simple schedules | Anything with dependencies or retries |
#### Processing
| Tool | Best For | Avoid When |
|------|----------|------------|
| **Spark** | >100GB, complex transforms, ML pipelines | <10GB (overkill), real-time streaming |
| **DuckDB** | Local analytics, <100GB, SQL on files | Distributed processing, production streaming |
| **Polars** | Single-node, Rust-speed, <50GB, DataFrames | Distributed, need Spark ecosystem |
| **Pandas** | <1GB, quick analysis, prototyping | Production pipelines, anything >5GB |
| **Flink** | True streaming, event-time processing | Batch-only, small team (steep learning curve) |
| **SQL (warehouse)** | ELT in Snowflake/BigQuery/Redshift | Complex ML transforms, binary data |
#### Storage
| Tool | Best For | Avoid When |
|------|----------|------------|
| **Snowflake** | Analytics, separation of compute/storage, multi-cloud | Tight budget, real-time OLTP |
| **BigQuery** | GCP-native, serverless, large-scale analytics | Multi-cloud, need fine-grained cost control |
| **Redshift** | AWS-native, existing AWS ecosystem | Elastic scaling needs, multi-cloud |
| **Databricks** | ML + analytics unified, Spark-native, lakehouse | Pure SQL analytics, small data |
| **PostgreSQL** | OLTP + light analytics, <500GB, budget-conscious | >1TB analytics, real-time dashboards on large data |
| **S3/GCS/ADLS** | Raw data lake, cheap storage, any format | Direct SQL queries (need compute layer) |
| **Delta Lake/Iceberg** | Table format on data lake, ACID on files | Simple file storage, no lakehouse need |
---
## Phase 2: Data Modeling
### Modeling Methodology Decision
| Approach | Best For | Key Concept |
|----------|----------|-------------|
| **Kimball (Dimensional)** | BI/reporting, star schemas | Facts + Dimensions, business-process-centric |
| **Inmon (3NF)** | Enterprise data warehouse, single source of truth | Normalized, subject-area-centric |
| **Data Vault 2.0** | Agile warehousing, auditability, multiple sources | Hubs + Links + Satellites, insert-only |
| **One Big Table (OBT)** | Simple analytics, few joins, dashboard performance | Pre-joined, denormalized, fast queries |
| **Activity Schema** | Event analytics, product analytics | Entity + Activity + Feature columns |
### Dimensional Model Template
```yaml
fact_table:
name: "fact_[business_process]"
grain: "" # one row = one [what]?
grain_statement: "One row per [transaction/event/snapshot] at [time grain]"
measures:
- name: ""
type: "" # additive | semi-additive | non-additive
aggregation: "" # SUM | AVG | COUNT | MIN | MAX | COUNT DISTINCT
business_definition: ""
degenerate_dimensions: [] # IDs stored in fact (order_number, invoice_id)
foreign_keys: [] # links to dimension tables
dimensions:
- name: "dim_[entity]"
type: "" # Type 1 (overwrite) | Type 2 (history) | Type 3 (previous value)
natural_key: "" # business key from source
surrogate_key: "" # warehouse-generated key
attributes:
- name: ""
source: ""
scd_type: "" # 1 | 2 | 3
hierarchy: [] # e.g., [country, region, city, store]
```
### SCD Type Decision Guide
| Scenario | SCD Type | Implementation |
|----------|----------|----------------|
| Don't care about history | **Type 1** | UPDATE in place |
| Need full history | **Type 2** | New row + valid_from/valid_to + is_current flag |
| Only need previous value | **Type 3** | Add previous_[column] |
| Track changes with timestamps | **Type 4** | Mini-dimension (history table) |
| Hybrid: some attrs Type 1, some Type 2 | **Type 6** | Combine 1+2+3 in one table |
**Default recommendation:** Type 2 for anything business-critical (customer status, product price, employee department). Type 1 for everything else.
### Naming Conventions
| Object | Convention | Example |
|--------|-----------|---------|
| Raw/staging tables | `raw_[source]_[table]` | `raw_stripe_payments` |
| Staging models | `stg_[source]__[entity]` | `stg_stripe__payments` |
| Intermediate models | `int_[entity]_[verb]` | `int_orders_pivoted` |
| Mart/fact tables | `fct_[business_process]` | `fct_orders` |
| Dimension tables | `dim_[entity]` | `dim_customers` |
| Metrics/aggregates | `mrt_[domain]_[metric]` | `mrt_sales_daily` |
| Snapshots | `snp_[entity]_[grain]` | `snp_inventory_daily` |
| Columns: boolean | `is_[state]` or `has_[thing]` | `is_active`, `has_subscription` |
| Columns: timestamp | `[event]_at` | `created_at`, `shipped_at` |
| Columns: date | `[event]_date` | `order_date` |
| Columns: ID | `[entity]_id` | `customer_id` |
| Columns: amount | `[thing]_amount` | `order_amount` |
| Columns: count | `[thing]_count` | `line_item_count` |
---
## Phase 3: Pipeline Design Patterns
### Universal Pipeline Template
```yaml
pipeline:
name: ""
owner: ""
schedule: "" # cron expression
sla_minutes: 0 # max acceptable runtime
tier: "" # 1 (critical) | 2 (important) | 3 (nice-to-have)
extract:
source_system: ""
connection: ""
strategy: "" # full | incremental | CDC | log-based
incremental_key: "" # column for incremental (e.g., updated_at)
watermark_storage: "" # where to persist last-extracted position
transform:
engine: "" # SQL | Spark | Python | dbt
stages:
- name: "clean"
operations: [] # dedupe, null handling, type casting, trimming
- name: "conform"
operations: [] # standardize codes, currencies, timezones
- name: "enrich"
operations: [] # lookups, calculations, derived fields
- name: "aggregate"
operations: [] # rollups, pivots, window functions
load:
target_system: ""
strategy: "" # append | upsert | merge | truncate-reload | partition-swap
merge_keys: []
partition_key: ""
clustering_keys: []
quality_gates:
pre_load: [] # checks before writing
post_load: [] # checks after writing
error_handling:
strategy: "" # fail-fast | dead-letter | retry | skip-and-alert
max_retries: 3
retry_delay_seconds: 300
alert_channels: []
```
### Extraction Strategy Decision Tree
```
Is the source database?
├── Yes → Does it support CDC?
│ ├── Yes → Use CDC (Debezium, AWS DMS, Fivetran)
│ │ Best for: high-volume, low-latency, minimal source impact
│ └── No → Does it have a reliable updated_at column?
│ ├── Yes → Incremental extraction on updated_at
│ │ ⚠️ Won't catch hard deletes — add periodic full reconciliation
│ └── No → Full extraction
│ Only viable for small tables (<1M rows)
├── Is it an API?
│ ├── Supports webhooks? → Event-driven ingestion
│ ├── Has cursor/pagination? → Incremental with cursor bookmark
│ └── No pagination? → Full pull with rate-limit handling
├── Is it files (S3, SFTP, email)?
│ └── Event-triggered (S3 notification, file watcher)
│ Validate: schema, completeness, filename pattern
└── Is it streaming (Kafka, Kinesis, Pub/Sub)?
└── Consumer group with offset management
Key decisions: at-least-once vs exactly-once, consumer lag alerting
```
### Load Strategy Decision
| Strategy | When | Trade-off |
|----------|------|-----------|
| **Append** | Event/log data, immutable facts | Simple but grows forever — partition + retain |
| **Upsert/Merge** | Dimension updates, SCD Type 1 | Handles updates but slower on large tables |
| **Truncate-Reload** | Small tables (<1M), reference data | Simple but window of missing data |
| **Partition Swap** | Large fact tables, daily loads | Atomic, fast, but needs partition alignment |
| **Soft Delete** | Need audit trail of deletions | Adds complexity to every downstream query |
### Idempotency Rules (NON-NEGOTIABLE)
Every pipeline MUST be re-runnable without side effects:
1. **Use MERGE/UPSERT, never blind INSERT** for mutable data
2. **Partition-swap for immutable data** — drop partition + reload
3. **Store watermarks externally** — not in the pipeline code
4. **Dedup at ingestion** — use source natural keys
5. **Test by running twice** — output must be identical both times
---
## Phase 4: Data Quality Framework
### Quality Dimensions
| Dimension | Definition | Example Check |
|-----------|-----------|---------------|
| **Completeness** | No missing values where required | `NOT NULL` on required fields, row count within range |
| **Uniqueness** | No unexpected duplicates | Primary key uniqueness, natural key uniqueness |
| **Validity** | Values within expected domain | Enum checks, range checks, regex patterns |
| **Accuracy** | Data matches real-world truth | Cross-system reconciliation, manual spot checks |
| **Freshness** | Data arrives on time | `MAX(loaded_at) > NOW() - INTERVAL '2 hours'` |
| **Consistency** | Same data agrees across systems | Sum reconciliation between source and target |
### Quality Check Templates
```sql
-- Completeness: Required fields not null
SELECT COUNT(*) AS null_violations
FROM {table}
WHERE {required_column} IS NULL;
-- Threshold: 0
-- Uniqueness: No duplicate primary keys
SELECT {pk_column}, COUNT(*) AS dupe_count
FROM {table}
GROUP BY {pk_column}
HAVING COUNT(*) > 1;
-- Threshold: 0
-- Freshness: Data arrived within SLA
SELECT CASE
WHEN MAX({timestamp_col}) > CURRENT_TIMESTAMP - INTERVAL '{sla_hours} hours'
THEN 'PASS' ELSE 'FAIL'
END AS freshness_check
FROM {table};
-- Volume: Row count within expected range
SELECT CASE
WHEN COUNT(*) BETWEEN {min_expected} AND {max_expected}
THEN 'PASS' ELSE 'FAIL'
END AS volume_check
FROM {table}
WHERE {partition_col} = '{run_date}';
-- Referential: FK integrity
SELECT COUNT(*) AS orphan_count
FROM {fact_table} f
LEFT JOIN {dim_table} d ON f.{fk} = d.{pk}
WHERE d.{pk} IS NULL;
-- Threshold: 0
-- Distribution: No unexpected skew
SELECT {column}, COUNT(*) AS cnt,
ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) AS pct
FROM {table}
GROUP BY {column}
ORDER BY cnt DESC;
-- Alert if any single value > {max_pct}%
-- Cross-system reconciliation
SELECT
(SELECT SUM(amount) FROM source_system.orders WHERE date = '{date}') AS source_total,
(SELECT SUM(amount) FROM warehouse.fct_orders WHERE order_date = '{date}') AS target_total,
ABS(source_total - target_total) AS variance;
-- Threshold: variance < 0.01 * source_total (1%)
```
### Data Contract Template
```yaml
contract:
name: ""
version: ""
owner: "" # team responsible for producing this data
consumers: [] # teams consuming this data
sla:
freshness_hours: 0
availability_percent: 99.9
support_hours: "" # business-hours | 24x7
schema:
- column: ""
type: ""
nullable: false
description: ""
business_definition: ""
pii: false
checks:
- type: "" # not_null | unique | range | enum | regex | custom
params: {}
breaking_change_policy: "" # notify-30-days | version-bump | never-break
notification_channel: ""
```
### Quality Severity Levels
| Level | Definition | Response |
|-------|-----------|----------|
| **P0 — Critical** | Data corruption, wrong numbers in production dashboards, compliance data wrong | Stop pipeline, alert immediately, rollback if possible |
| **P1 — High** | Missing data for key reports, SLA breach, >5% of records affected | Alert team, fix within 4 hours, post-mortem required |
| **P2 — Medium** | Non-critical field quality, <1% records affected, no downstream impact | Fix in next sprint, add monitoring to prevent recurrence |
| **P3 — Low** | Cosmetic issues, edge cases, non-critical data | Backlog, fix when convenient |
---
## Phase 5: Performance Optimization
### SQL Optimization Checklist
| Problem | Fix | Impact |
|---------|-----|--------|
| Full table scan | Add/use partition pruning | 10-100x faster |
| Large joins | Pre-aggregate before joining | 5-50x faster |
| SELECT * | Select only needed columns | 2-10x faster (columnar stores) |
| Correlated subquery | Rewrite as JOIN or window function | 10-100x faster |
| DISTINCT on large result | Fix upstream duplication instead | 2-5x faster |
| ORDER BY without LIMIT | Add LIMIT or remove if not needed | Prevents memory spills |
| String operations in WHERE | Pre-compute, use lookup table | Enables index usage |
| Multiple passes over same data | Combine with CASE WHEN + GROUP BY | 2-5x faster |
| NOT IN with NULLs | Use NOT EXISTS or LEFT JOIN IS NULL | Correctness + performance |
### Spark Optimization Guide
| Problem | Solution |
|---------|----------|
| Shuffle-heavy joins | Broadcast small table (`broadcast(df)`) if <100MB |
| Data skew | Salt the skewed key: add random prefix, join on salted key, aggregate |
| Small files | Coalesce output: `.coalesce(target_files)` or use adaptive query execution |
| Too many partitions | `spark.sql.shuffle.partitions` = 2-3x cluster cores |
| OOM errors | Increase `spark.executor.memory`, reduce partition size, spill to disk |
| Slow writes | Use Parquet with snappy, partition by date, avoid small writes |
| Repeated computation | `.cache()` or `.persist()` DataFrames used >1 time |
| Complex transformations | Push down predicates, filter early, select early |
### Partitioning Strategy
| Data Type | Partition Key | Why |
|-----------|--------------|-----|
| Transactional/event | Date (daily or monthly) | Most queries filter by time range |
| Multi-tenant | Tenant ID + date | Isolate tenant queries, time-range pruning |
| Geospatial | Region + date | Regional queries are common |
| Log data | Date + hour | High volume needs finer partitions |
| Reference/dimension | Don't partition | Too small, full scan is fine |
**Rules:**
- Target 100MB-1GB per partition (compressed)
- <10,000 total partitions per table
- Never partition on high-cardinality columns (user_id)
- Always include partition key in WHERE clauses
---
## Phase 6: Data Governance & Cataloging
### Data Classification
| Level | Examples | Controls |
|-------|---------|----------|
| **Public** | Product catalog, published stats | No restrictions |
| **Internal** | Aggregated metrics, non-PII analytics | Auth required, audit logging |
| **Confidential** | Customer PII, financial records, HR data | Encryption, column-level access, masking |
| **Restricted** | SSN, payment cards, health records, passwords | Encryption at rest + transit, tokenization, audit every access, retention limits |
### PII Handling Rules
1. **Identify:** Scan all sources for PII columns (name, email, phone, SSN, DOB, address, IP)
2. **Classify:** Tag each with sensitivity level
3. **Minimize:** Only ingest PII you actually need
4. **Protect:**
- Hash or tokenize in staging (SHA-256 with salt for pseudonymization)
- Dynamic masking for non-privileged users
- Column-level encryption for restricted data
5. **Retain:** Auto-delete after retention period
6. **Audit:** Log every query touching PII columns
7. **Right to delete:** Build a deletion pipeline that propagates across all derived tables
### Data Catalog Entry Template
```yaml
dataset:
name: ""
description: ""
owner_team: ""
steward: "" # person responsible for quality
domain: "" # sales | marketing | finance | product | engineering
tier: "" # gold (trusted) | silver (cleaned) | bronze (raw)
lineage:
sources: [] # upstream datasets/systems
transformations: "" # brief description of key transforms
downstream: [] # who consumes this
refresh:
schedule: ""
sla_hours: 0
last_successful_run: ""
quality:
tests: [] # list of quality checks
last_score: 0 # 0-100
known_issues: []
access:
classification: "" # public | internal | confidential | restricted
pii_columns: []
access_request_process: "" # how to get access
usage:
avg_daily_queries: 0
top_consumers: []
cost_monthly_usd: 0
```
---
## Phase 7: Pipeline Monitoring & Alerting
### Pipeline Health Dashboard
```yaml
dashboard:
pipeline_metrics:
- metric: "Pipeline Success Rate"
formula: "successful_runs / total_runs * 100"
target: ">99%"
alert_threshold: "<95%"
- metric: "Average Runtime"
formula: "avg(end_time - start_time) over 7 days"
target: "<SLA"
alert_threshold: ">80% of SLA"
- metric: "Data Freshness"
formula: "NOW() - MAX(loaded_at)"
target: "<SLA hours"
alert_threshold: ">SLA"
- metric: "Data Volume Variance"
formula: "abs(today_rows - avg_7d_rows) / avg_7d_rows * 100"
target: "<20%"
alert_threshold: ">50%"
- metric: "Quality Check Pass Rate"
formula: "passed_checks / total_checks * 100"
target: "100%"
alert_threshold: "<95%"
- metric: "Failed Pipeline Count"
formula: "count where status = failed in last 24h"
target: "0"
alert_threshold: ">0"
- metric: "Backfill Queue"
formula: "count of pending backfill requests"
target: "0"
alert_threshold: ">5"
- metric: "Infrastructure Cost"
formula: "compute + storage + egress"
target: "<budget"
alert_threshold: ">110% budget"
```
### Alert Severity
| Severity | Condition | Response Time | Example |
|----------|-----------|---------------|---------|
| **P0** | Revenue/compliance impacting | 15 min | Payment pipeline down, regulatory report delayed |
| **P1** | Business-critical dashboard stale | 1 hour | Executive dashboard >4h stale |
| **P2** | Non-critical pipeline failed | 4 hours | Marketing attribution delayed |
| **P3** | Warning/degradation | Next business day | Pipeline 80% of SLA, minor quality drift |
### Structured Logging Standard
Every pipeline run MUST log:
```json
{
"pipeline_name": "",
"run_id": "",
"started_at": "",
"completed_at": "",
"status": "success|failed|partial",
"stage": "",
"rows_extracted": 0,
"rows_transformed": 0,
"rows_loaded": 0,
"rows_rejected": 0,
"quality_checks_passed": 0,
"quality_checks_failed": 0,
"duration_seconds": 0,
"error_message": "",
"watermark_before": "",
"watermark_after": ""
}
```
---
## Phase 8: Testing Strategy
### Pipeline Test Pyramid
| Layer | What to Test | How | When |
|-------|-------------|-----|------|
| **Unit** | Individual transforms, business logic | pytest with fixtures, dbt unit tests | Every PR |
| **Integration** | Source connectivity, schema compatibility | Test against staging/dev environment | Daily + PR |
| **Contract** | Schema hasn't changed, data types stable | Schema registry, contract tests | Every pipeline run |
| **Data Quality** | Completeness, uniqueness, freshness, validity | Quality framework checks | Every pipeline run |
| **E2E** | Full pipeline produces correct output | Golden dataset comparison | Weekly + release |
| **Performance** | Runtime within SLA, no regression | Benchmark against historical runs | Weekly |
### dbt Testing Checklist
```yaml
# For every model, define at minimum:
models:
- name: fct_orders
columns:
- name: order_id
tests:
- unique
- not_null
- name: customer_id
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_id
- name: order_amount
tests:
- not_null
- dbt_utils.accepted_range:
min_value: 0
max_value: 1000000
- name: order_status
tests:
- accepted_values:
values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled']
- name: ordered_at
tests:
- not_null
- dbt_utils.recency:
datepart: day
field: ordered_at
interval: 2
```
### Backfill Protocol
When you need to reprocess historical data:
1. **Scope:** Define exact date range and affected tables
2. **Impact assessment:** What downstream models/dashboards will be affected?
3. **Communication:** Notify consumers of temporary data inconsistency
4. **Isolation:** Run backfill in separate compute to avoid impacting current pipelines
5. **Validation:** Compare row counts and key metrics pre/post backfill
6. **Execution:** Process in reverse-chronological order (most recent first)
7. **Monitoring:** Watch for resource spikes, duplicate creation
8. **Verification:** Reconcile against source after completion
9. **Documentation:** Log what was backfilled, why, and any anomalies found
---
## Phase 9: Cost Optimization
### Cloud Cost Reduction Strategies
| Strategy | Savings | Effort |
|----------|---------|--------|
| Right-size compute (auto-scaling) | 20-40% | Low |
| Use spot/preemptible instances for batch | 60-80% | Medium |
| Compress data (Parquet + Snappy/Zstd) | 50-80% storage | Low |
| Lifecycle policies (hot → warm → cold → archive) | 40-70% storage | Low |
| Eliminate unused tables/pipelines | 10-30% | Low |
| Optimize query patterns (partition pruning) | 30-60% compute | Medium |
| Reserved capacity for steady-state | 30-50% | Medium |
| Cache expensive queries | 20-50% compute | Medium |
### Cost Allocation Template
```yaml
cost_tracking:
by_pipeline:
- pipeline: ""
compute_monthly_usd: 0
storage_monthly_usd: 0
egress_monthly_usd: 0
total: 0
cost_per_row: 0 # total / rows_processed
business_value: "" # what revenue/decision does this enable?
roi_justified: true # is the cost worth it?
optimization_opportunities:
- description: ""
estimated_savings_usd: 0
effort: "" # low | medium | high
priority: 0 # 1 = do now
```
### Cost Red Flags
- Single pipeline >30% of total spend
- Cost per row increasing month-over-month
- Tables with 0 queries in 30 days
- Dev/staging environments running 24/7
- Full table scans on >1TB tables
- Uncompressed data in cloud storage
- Cross-region data transfer
---
## Phase 10: Operational Runbooks
### Pipeline Failure Triage
```
Pipeline failed →
1. Check error message in logs
├── Connection timeout → Check source availability, network, credentials
├── Schema mismatch → Source schema changed → update extract + notify
├── Data quality check failed → Investigate source data, check for anomalies
├── Out of memory → Increase resources or optimize query
├── Permission denied → Check IAM roles, token expiry
├── Duplicate key violation → Check idempotency, investigate source dupes
└── Timeout (SLA breach) → Check data volume spike, query plan, cluster health
2. Determine impact
├── What dashboards/reports are affected?
├── What's the data freshness SLA?
└── Who needs to be notified?
3. Fix
├── Transient (network, timeout) → Retry
├── Data issue → Fix source data, re-run with quality gate override if safe
├── Schema change → Update pipeline, backfill if needed
└── Infrastructure → Scale up, file ticket with cloud provider
4. Post-fix
├── Verify data correctness
├── Update runbook with new failure mode
└── Add monitoring/alerting to catch earlier next time
```
### Schema Change Management
When a source system changes schema:
1. **Detect:** Schema comparison check in extraction pipeline (hash schema, compare to registered)
2. **Classify:**
- **Additive** (new column): Usually safe — add to pipeline, backfill if needed
- **Rename**: Map old → new in transform, update downstream
- **Type change**: Assess compatibility, may need cast or historical rebuild
- **Column removed**: Critical — breaks queries, need immediate attention
3. **Test:** Run pipeline in dry-run mode with new schema
4. **Deploy:** Update transforms, quality checks, documentation
5. **Communicate:** Notify downstream consumers via data contract channel
### Disaster Recovery
| Scenario | RPO | RTO | Recovery Steps |
|----------|-----|-----|----------------|
| Pipeline code lost | 0 (git) | 1h | Redeploy from git, restore orchestrator state |
| Warehouse data corrupted | Varies | 4h | Restore from Time Travel/snapshot, re-run affected pipelines |
| Source system down | N/A | Wait | Queue extractions, catch up with incremental once restored |
| Cloud region outage | 24h | 8h | Failover to DR region if configured, else wait |
| Credential compromise | 0 | 2h | Rotate all credentials, audit access logs, re-run affected pipelines |
---
## Phase 11: Advanced Patterns
### Slowly Changing Dimension Type 2 (SQL Template)
```sql
-- Merge pattern for SCD Type 2
MERGE INTO dim_customer AS target
USING (
SELECT * FROM stg_customers
WHERE updated_at > (SELECT MAX(valid_from) FROM dim_customer)
) AS source
ON target.customer_natural_key = source.customer_id
AND target.is_current = TRUE
-- Update: close old record
WHEN MATCHED AND (
target.customer_name != source.name OR
target.customer_status != source.status
-- list all Type 2 tracked columns
) THEN UPDATE SET
is_current = FALSE,
valid_to = CURRENT_TIMESTAMP
-- Insert: new record (both new customers and changed ones)
WHEN NOT MATCHED THEN INSERT (
customer_natural_key, customer_name, customer_status,
valid_from, valid_to, is_current
) VALUES (
source.customer_id, source.name, source.status,
CURRENT_TIMESTAMP, '9999-12-31', TRUE
);
-- Then insert new versions of changed records
INSERT INTO dim_customer (
customer_natural_key, customer_name, customer_status,
valid_from, valid_to, is_current
)
SELECT customer_id, name, status,
CURRENT_TIMESTAMP, '9999-12-31', TRUE
FROM stg_customers s
WHERE EXISTS (
SELECT 1 FROM dim_customer d
WHERE d.customer_natural_key = s.customer_id
AND d.is_current = FALSE
AND d.valid_to = CURRENT_TIMESTAMP
);
```
### CDC with Debezium (Architecture Pattern)
```
Source DB → Debezium Connector → Kafka Topic →
├── Stream processor (Flink/Spark Streaming) → Target DB
├── S3 sink connector → Data Lake (raw)
└── Elasticsearch sink → Search index
```
Key decisions:
- **Topic per table** or **single topic**: Per table (easier routing, independent scaling)
- **Schema registry**: Always use (Confluent Schema Registry or AWS Glue)
- **Serialization**: Avro (compact + schema evolution) or Protobuf (strict + fast)
- **Offset management**: Connector manages; monitor consumer lag
### Feature Store Pattern
```yaml
feature_store:
entity: "customer"
entity_key: "customer_id"
features:
- name: "total_orders_30d"
description: "Total orders in last 30 days"
type: "INT"
source: "fct_orders"
computation: "batch" # batch | streaming | on-demand
freshness: "daily"
ttl_hours: 48
- name: "avg_order_value_90d"
description: "Average order value last 90 days"
type: "FLOAT"
source: "fct_orders"
computation: "batch"
freshness: "daily"
ttl_hours: 48
- name: "last_login_minutes_ago"
description: "Minutes since last login event"
type: "INT"
source: "events_stream"
computation: "streaming"
freshness: "real-time"
ttl_hours: 1
serving:
online: true # low-latency feature serving (Redis/DynamoDB)
offline: true # batch feature retrieval for training
point_in_time_correct: true # prevent feature leakage in ML training
```
### Data Mesh Principles
If operating at scale (>5 data teams):
1. **Domain ownership**: Each business domain owns its data products (not central data team)
2. **Data as a product**: Treat datasets like products — SLAs, documentation, discoverability
3. **Self-serve platform**: Central team builds the platform, domains build on top
4. **Federated governance**: Standards and interoperability maintained centrally, implementation decentralized
**When NOT to use Data Mesh:**
- <5 data producers/consumers
- Small team (<20 engineers total)
- Single business domain
- Early-stage company (over-engineering)
---
## Quality Scoring Rubric (0-100)
| Dimension | Weight | Scoring |
|-----------|--------|---------|
| **Pipeline Reliability** | 20 | 0=frequent failures, 10=some failures with manual recovery, 20=99.5%+ success rate with auto-retry |
| **Data Quality** | 20 | 0=no checks, 10=basic null/unique checks, 20=comprehensive quality framework with contracts |
| **Performance** | 15 | 0=regularly breaches SLA, 8=meets SLA, 15=well under SLA with optimization |
| **Documentation** | 10 | 0=none, 5=basic README, 10=full catalog entries with lineage and business definitions |
| **Monitoring** | 15 | 0=no alerts, 8=failure alerts only, 15=proactive monitoring with dashboards and anomaly detection |
| **Testing** | 10 | 0=no tests, 5=basic smoke tests, 10=full test pyramid (unit+integration+contract+E2E) |
| **Cost Efficiency** | 10 | 0=no cost tracking, 5=tracked, 10=optimized with ROI justification per pipeline |
**Scoring guide:**
- 0-40: Critical gaps — prioritize pipeline reliability and data quality
- 41-60: Functional but fragile — add monitoring, testing, documentation
- 61-80: Solid — optimize performance, cost, governance
- 81-100: Excellent — maintain, innovate, mentor
---
## Edge Cases & Gotchas
### Timezone Traps
- Store everything in UTC. Convert only at presentation layer
- Event timestamps: use event time, not processing time
- Daylight saving: `TIMESTAMP WITH TIME ZONE`, never `WITHOUT`
- Late-arriving data: watermark strategy + allowed lateness window
### Late-Arriving Data
- Define maximum acceptable lateness per source
- Reprocess affected partitions when late data arrives
- Track late arrival rate as a quality metric
- Consider separate "late data" pipeline that patches in
### Exactly-Once Processing
- True exactly-once is expensive. Most systems need at-least-once + idempotent writes
- Use transaction IDs or natural keys for deduplication
- Kafka: use idempotent producer + transactional consumer
- Database: MERGE/UPSERT on natural key
### Schema Evolution
- **Forward compatible**: New code reads old data (safe to deploy new readers first)
- **Backward compatible**: Old code reads new data (safe to deploy new writers first)
- **Full compatible**: Both directions (safest, most restrictive)
- Use Avro or Protobuf with schema registry for streaming data
### Multi-Tenant Data
- Tenant ID in every table, every query, every log
- Row-level security in warehouse
- Separate compute per tenant (or at least isolation)
- Never join across tenants without explicit business reason
- Tenant-aware backfill (don't rebuild all tenants for one tenant's issue)
### Data Lake Anti-Patterns
- "Data Swamp": ingesting everything with no organization or catalog → only ingest what has a known consumer
- Small files: thousands of <1MB files → compact regularly (target 100MB-1GB)
- No table format: raw Parquet/CSV without Delta/Iceberg → loses ACID, schema evolution, time travel
- No access controls: single bucket, everyone admin → implement IAM per domain/team
---
## Natural Language Commands
Say any of these to activate specific workflows:
1. **"Design a data pipeline for [source] to [target]"** → Full pipeline template with extraction strategy, transforms, load pattern, quality checks
2. **"Model [entity/domain] for analytics"** → Dimensional model with fact/dimension tables, grain, measures, SCD types
3. **"Optimize this query/pipeline"** → Performance analysis with specific recommendations
4. **"Set up data quality for [table/pipeline]"** → Quality framework with checks, contracts, monitoring
5. **"Audit our data infrastructure"** → Full assessment using scoring rubric
6. **"Help with [Spark/Airflow/dbt/Kafka] issue"** → Troubleshooting with technology-specific guidance
7. **"Design a data catalog for our org"** → Catalog template with governance, classification, lineage
8. **"Plan a data migration from [old] to [new]"** → Migration plan with validation, rollback, parallel-run
9. **"Set up monitoring for our pipelines"** → Dashboard template with alerts, logging standards, runbooks
10. **"Review our data costs"** → Cost analysis with optimization strategies and ROI framework
11. **"Handle schema change in [source]"** → Change management protocol with impact assessment
12. **"Backfill [table] for [date range]"** → Backfill protocol with validation and communication plan