airflow-dag-analyzer

Analyzes, validates, and optimizes Apache Airflow DAGs for reliability, performance, and best practices adherence.

Best use case

airflow-dag-analyzer is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Teams using airflow-dag-analyzer should expect more consistent output, faster repeated execution, and less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$ curl -o ~/.claude/skills/airflow-dag-analyzer/SKILL.md --create-dirs "https://raw.githubusercontent.com/a5c-ai/babysitter/main/library/specializations/data-engineering-analytics/skills/airflow-dag-analyzer/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/airflow-dag-analyzer/SKILL.md inside your project
  3. Restart your AI agent so that it auto-discovers the skill

How airflow-dag-analyzer Compares

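| Feature / Agent | airflow-dag-analyzer | Standard Approach |
|-----------------|----------------------|-------------------|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |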

Frequently Asked Questions

Where can I find the source code?

The source code is on GitHub in the a5c-ai/babysitter repository, under library/specializations/data-engineering-analytics/skills/airflow-dag-analyzer.

SKILL.md Source

# Airflow DAG Analyzer

Analyzes, validates, and optimizes Apache Airflow DAGs for reliability and performance.

## Overview

This skill examines Apache Airflow DAG definitions to identify performance bottlenecks, reliability issues, and best practice violations. It provides recommendations for task dependency optimization, parallelism configuration, error handling, and resource management.

## Capabilities

- **DAG structure analysis and validation** - Parse and validate DAG structure
- **Task dependency optimization** - Identify bottlenecks and suggest parallel execution
- **Parallelism and concurrency recommendations** - Optimize pool and slot allocation
- **SLA and timeout configuration** - Recommend appropriate timeouts and SLAs
- **Retry and failure handling patterns** - Validate retry logic and alerting
- **Sensor optimization** - Deferrable operators and reschedule mode (smart sensors were deprecated in Airflow 2.2.4 and removed in 2.4)
- **Resource pool allocation** - Optimize pool usage and worker distribution
- **DAG scheduling optimization** - Catchup, backfill, and schedule interval tuning
- **Cross-DAG dependency detection** - Identify external dependencies and triggers

## Input Schema

```json
{
  "dagCode": {
    "type": "string",
    "description": "The Python DAG definition code",
    "required": true
  },
  "dagId": {
    "type": "string",
    "description": "The DAG identifier"
  },
  "executionHistory": {
    "type": "object",
    "description": "Historical execution metrics",
    "properties": {
      "runs": {
        "type": "array",
        "items": {
          "dagRunId": "string",
          "executionDate": "string",
          "duration": "number",
          "state": "string",
          "taskDurations": "object"
        }
      }
    }
  },
  "clusterConfig": {
    "type": "object",
    "properties": {
      "workerCount": "number",
      "executorType": "string",
      "poolConfigs": "object",
      "airflowVersion": "string"
    }
  },
  "analysisScope": {
    "type": "array",
    "items": {
      "type": "string",
      "enum": ["structure", "performance", "reliability", "resources", "security"]
    },
    "default": ["structure", "performance", "reliability"]
  }
}
```
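
As a sketch of how a caller might assemble a request that matches this schema, the helper below builds the payload, applies the default `analysisScope`, and rejects scope values outside the enum. The function name `build_request` is hypothetical; only the field names and allowed values come from the schema above.

```python
import json

# Values copied from the `analysisScope` enum and default in the input schema.
ALLOWED_SCOPES = {"structure", "performance", "reliability", "resources", "security"}
DEFAULT_SCOPE = ["structure", "performance", "reliability"]


def build_request(dag_code, dag_id=None, analysis_scope=None):
    """Build an input payload. `build_request` is an illustrative helper name."""
    if not isinstance(dag_code, str) or not dag_code.strip():
        raise ValueError("dagCode is required and must be a non-empty string")
    scope = list(analysis_scope) if analysis_scope is not None else list(DEFAULT_SCOPE)
    unknown = sorted(set(scope) - ALLOWED_SCOPES)
    if unknown:
        raise ValueError(f"unknown analysisScope values: {unknown}")
    payload = {"dagCode": dag_code, "analysisScope": scope}
    if dag_id is not None:
        payload["dagId"] = dag_id
    return payload


request = build_request("from airflow import DAG\n", dag_id="daily_etl_pipeline")
print(json.dumps(request, indent=2))
```

Validating the scope on the caller's side keeps typos such as `"perf"` from silently narrowing the analysis.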

## Output Schema

```json
{
  "validationResults": {
    "errors": {
      "type": "array",
      "items": {
        "code": "string",
        "message": "string",
        "line": "number",
        "severity": "error"
      }
    },
    "warnings": {
      "type": "array",
      "items": {
        "code": "string",
        "message": "string",
        "line": "number",
        "severity": "warning"
      }
    }
  },
  "optimizations": {
    "type": "array",
    "items": {
      "category": "string",
      "current": "string",
      "recommended": "string",
      "impact": "high|medium|low",
      "effort": "string",
      "codeChange": "string"
    }
  },
  "recommendedConfig": {
    "type": "object",
    "properties": {
      "poolSize": "number",
      "maxActiveRuns": "number",
      "concurrency": "number",
      "defaultRetries": "number",
      "executionTimeout": "string"
    }
  },
  "dependencyGraph": {
    "type": "object",
    "properties": {
      "nodes": "array",
      "edges": "array",
      "criticalPath": "array",
      "parallelGroups": "array"
    }
  },
  "metrics": {
    "taskCount": "number",
    "maxDepth": "number",
    "parallelizationRatio": "number",
    "estimatedDuration": "string"
  },
  "securityFindings": {
    "type": "array",
    "items": {
      "severity": "high|medium|low",
      "finding": "string",
      "recommendation": "string"
    }
  }
}
```
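
One way to consume a result shaped like this schema is to pull out the blocking error codes and rank the optimizations by their `impact` value. The sketch below assumes the result arrives as a plain dictionary whose `errors` and `optimizations` entries are lists of objects; the `summarize` helper and the sample data are invented for illustration.

```python
# Order used to rank the "high|medium|low" impact values from the output schema.
IMPACT_ORDER = {"high": 0, "medium": 1, "low": 2}


def summarize(result):
    """Return blocking error codes and optimization categories sorted by impact."""
    validation = result.get("validationResults", {})
    error_codes = [item["code"] for item in validation.get("errors", [])]
    ranked = sorted(
        result.get("optimizations", []),
        # Unknown or missing impact values sort last.
        key=lambda item: IMPACT_ORDER.get(item.get("impact"), len(IMPACT_ORDER)),
    )
    return {"blocking": error_codes, "ranked": [item["category"] for item in ranked]}


# Hypothetical result, invented for illustration only.
sample_result = {
    "validationResults": {
        "errors": [{"code": "SEC-001", "message": "Hardcoded credentials",
                    "line": 12, "severity": "error"}],
        "warnings": [],
    },
    "optimizations": [
        {"category": "sensors", "impact": "medium"},
        {"category": "parallelism", "impact": "high"},
    ],
}

print(summarize(sample_result))
```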

## Usage Examples

### Basic DAG Analysis

```json
{
  "dagCode": "from airflow import DAG\nfrom airflow.operators.python import PythonOperator\n...",
  "dagId": "daily_etl_pipeline"
}
```

### With Execution History

```json
{
  "dagCode": "...",
  "dagId": "daily_etl_pipeline",
  "executionHistory": {
    "runs": [
      {
        "dagRunId": "manual__2024-01-15",
        "duration": 3600,
        "state": "success",
        "taskDurations": {
          "extract": 600,
          "transform": 1800,
          "load": 1200
        }
      }
    ]
  }
}
```
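
Task durations like these are what make a critical-path estimate possible. The sketch below computes the longest-running chain of tasks from a duration map and a list of dependency edges. It is not the skill's own implementation, and the linear `extract -> transform -> load` edges are an assumption, since the example above gives durations but no dependencies.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def critical_path(durations, edges):
    """Return (path, total_seconds) for the longest-running chain of tasks.

    durations: task_id -> seconds (every task in `edges` needs an entry).
    edges: (upstream, downstream) pairs.
    """
    upstream = {task: set() for task in durations}
    for src, dst in edges:
        upstream[dst].add(src)

    finish = {}    # earliest finish time of each task, in seconds
    previous = {}  # upstream task that determines each task's start
    for task in TopologicalSorter(upstream).static_order():
        start, previous[task] = 0, None
        for parent in sorted(upstream[task]):
            if finish[parent] > start:
                start, previous[task] = finish[parent], parent
        finish[task] = start + durations[task]

    last = max(finish, key=finish.get)
    total, path = finish[last], []
    while last is not None:
        path.append(last)
        last = previous[last]
    return path[::-1], total


# Durations from the example above; the linear edges are an assumption.
durations = {"extract": 600, "transform": 1800, "load": 1200}
edges = [("extract", "transform"), ("transform", "load")]
print(critical_path(durations, edges))
```

With these inputs the path covers all three tasks and totals 3600 seconds, which equals the run's reported `duration`.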

### Full Analysis with Cluster Config

```json
{
  "dagCode": "...",
  "dagId": "complex_ml_pipeline",
  "clusterConfig": {
    "workerCount": 8,
    "executorType": "KubernetesExecutor",
    "poolConfigs": {
      "default_pool": {"slots": 128},
      "ml_pool": {"slots": 32}
    },
    "airflowVersion": "2.8.0"
  },
  "analysisScope": ["structure", "performance", "reliability", "resources", "security"]
}
```

## Validation Rules

### DAG Definition Rules

| Rule | Severity | Description |
|------|----------|-------------|
| DAG-001 | Error | Missing DAG default_args |
| DAG-002 | Error | Invalid schedule_interval (`schedule` in Airflow 2.4+) |
| DAG-003 | Warning | Catchup enabled for long-running DAG |
| DAG-004 | Warning | No email on failure configured |
| DAG-005 | Info | Consider using @dag decorator |
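
A rule such as DAG-001 can be checked statically, without importing Airflow, by walking the syntax tree of `dagCode`. The sketch below is one possible approach, not the skill's actual implementation; the function name is hypothetical, and it deliberately ignores DAGs declared with the `@dag` decorator.

```python
import ast


def check_default_args(dag_code):
    """Return DAG-001 findings for DAG(...) calls that omit `default_args`."""
    findings = []
    for node in ast.walk(ast.parse(dag_code)):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # Match both `DAG(...)` and attribute forms such as `models.DAG(...)`.
        name = func.id if isinstance(func, ast.Name) else getattr(func, "attr", None)
        if name != "DAG":
            continue
        keywords = {kw.arg for kw in node.keywords}
        # A `None` keyword means `**kwargs` unpacking, which cannot be resolved statically.
        if "default_args" not in keywords and None not in keywords:
            findings.append({
                "code": "DAG-001",
                "message": "Missing DAG default_args",
                "line": node.lineno,
                "severity": "error",
            })
    return findings


source = "from airflow import DAG\n\nwith DAG('daily_etl_pipeline') as dag:\n    pass\n"
print(check_default_args(source))
```

The findings use the same `code`, `message`, `line`, and `severity` fields as the `validationResults` entries in the output schema.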

### Task Definition Rules

| Rule | Severity | Description |
|------|----------|-------------|
| TSK-001 | Error | Task has no upstream or downstream |
| TSK-002 | Warning | Task missing retries configuration |
| TSK-003 | Warning | Execution timeout not set |
| TSK-004 | Warning | PythonOperator with no pool |
| TSK-005 | Info | Consider TaskGroup for related tasks |
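
TSK-001 reduces to a set comparison once the task IDs and dependency edges are known. The sketch below reads the rule as "the task has neither an upstream nor a downstream task", which is an interpretation rather than something the table spells out; the helper name and sample tasks are hypothetical.

```python
def find_isolated_tasks(task_ids, edges):
    """Return TSK-001 findings for tasks that appear in no dependency edge."""
    connected = {task for edge in edges for task in edge}
    return [
        {
            "code": "TSK-001",
            "message": f"Task '{task}' has no upstream or downstream",
            "severity": "error",
        }
        for task in task_ids
        if task not in connected
    ]


# Illustrative DAG: `cleanup` was defined but never wired into the chain.
tasks = ["extract", "transform", "load", "cleanup"]
edges = [("extract", "transform"), ("transform", "load")]
print(find_isolated_tasks(tasks, edges))
```

A DAG with a single task would be flagged by this check, so a real analyzer would probably exempt that case.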

### Sensor Rules

| Rule | Severity | Description |
|------|----------|-------------|
| SEN-001 | Warning | Sensor in poke mode (use reschedule) |
| SEN-002 | Warning | Sensor missing timeout |
| SEN-003 | Info | Consider deferrable operator |
| SEN-004 | Warning | External sensor without soft_fail |

### Security Rules

| Rule | Severity | Description |
|------|----------|-------------|
| SEC-001 | Error | Hardcoded credentials |
| SEC-002 | Warning | Using Variable.get without default |
| SEC-003 | Warning | Connection ID not parameterized |
| SEC-004 | Info | Consider Secrets Backend |
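
A check in the spirit of SEC-001 can look for string literals bound to credential-like names. The sketch below is a heuristic under stated assumptions: the name pattern is illustrative, it will miss secrets stored under neutral names, and it can flag harmless names that merely contain a word such as `token`.

```python
import ast
import re

# Illustrative pattern; a real rule set would likely be broader and configurable.
SENSITIVE_NAME = re.compile(r"(password|passwd|secret|token|api_?key)", re.IGNORECASE)


def find_hardcoded_credentials(dag_code):
    """Return findings for string literals assigned to credential-like names."""
    findings = []
    for node in ast.walk(ast.parse(dag_code)):
        if isinstance(node, ast.Assign):
            names = [target.id for target in node.targets if isinstance(target, ast.Name)]
            value = node.value
        elif isinstance(node, ast.keyword) and node.arg:
            names, value = [node.arg], node.value
        else:
            continue
        is_literal = (
            isinstance(value, ast.Constant)
            and isinstance(value.value, str)
            and value.value != ""
        )
        flagged = [name for name in names if SENSITIVE_NAME.search(name)]
        if is_literal and flagged:
            findings.append({
                "severity": "high",
                "finding": f"String literal assigned to '{flagged[0]}' on line {value.lineno}",
                "recommendation": "Load the value from an Airflow Connection or a secrets backend",
            })
    return findings


source = (
    "db_password = 'hunter2'\n"
    "client = connect(api_key='abc123', host='db.internal')\n"
    "safe = Variable.get('db_password')\n"
)
for finding in find_hardcoded_credentials(source):
    print(finding["finding"])
```

The findings reuse the `severity`, `finding`, and `recommendation` fields from `securityFindings` in the output schema.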

## Optimization Patterns

### Parallelization

```python
# Before: Sequential execution
task1 >> task2 >> task3 >> task4

# After: task2 and task3 run in parallel. This is only valid when task3
# does not depend on task2's output.
task1 >> [task2, task3] >> task4
```
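
To see which tasks a dependency structure allows to run side by side, the edges can be layered topologically: every task in a layer has all of its upstream tasks in earlier layers. The sketch below uses the standard-library `graphlib` module and is only an illustration of the idea; the function name is hypothetical, and the skill's own `parallelGroups` output may be computed differently.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def parallel_groups(edges):
    """Group tasks into layers whose members have no dependencies on each other."""
    graph = {}
    for upstream, downstream in edges:
        graph.setdefault(upstream, set())
        graph.setdefault(downstream, set()).add(upstream)

    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the edges contain a cycle
    groups = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        groups.append(ready)
        sorter.done(*ready)
    return groups


# Edges for `task1 >> task2 >> task3 >> task4`
sequential = [("task1", "task2"), ("task2", "task3"), ("task3", "task4")]
# Edges for `task1 >> [task2, task3] >> task4`
fanned_out = [("task1", "task2"), ("task1", "task3"), ("task2", "task4"), ("task3", "task4")]

print(parallel_groups(sequential))
print(parallel_groups(fanned_out))
```

The sequential form yields four single-task layers, while the fanned-out form yields three, with `task2` and `task3` sharing the middle layer.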

### Sensor Optimization

```python
from airflow.sensors.filesystem import FileSensor

# Before: Poke mode (occupies a worker slot for the whole wait)
FileSensor(
    task_id='wait_for_file',
    filepath='/data/input.csv',
    mode='poke'  # Bad
)

# After: Reschedule mode (frees the worker slot between checks)
FileSensor(
    task_id='wait_for_file',
    filepath='/data/input.csv',
    mode='reschedule',  # Good
    poke_interval=300   # seconds between checks
)

# Best: Deferrable. Deferrable operators were introduced in Airflow 2.2 and
# need a running triggerer; confirm that FileSensor accepts `deferrable`
# in your Airflow version.
FileSensor(
    task_id='wait_for_file',
    filepath='/data/input.csv',
    deferrable=True
)
```

### TaskGroups

```python
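from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

# Before: Flat task structure
extract_orders >> transform_orders >> load_orders
extract_products >> transform_products >> load_products

# After: TaskGroups for organization (inside a DAG context). Tasks must be
# created inside the group; EmptyOperator is a placeholder for real operators.
# Task IDs get the group prefix, for example `orders.extract`.
with TaskGroup('orders') as orders_group:
    extract = EmptyOperator(task_id='extract')
    transform = EmptyOperator(task_id='transform')
    load = EmptyOperator(task_id='load')
    extract >> transform >> load

with TaskGroup('products') as products_group:
    ...  # same pattern as the orders group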
```

### Dynamic Task Mapping (Airflow 2.3+)

```python
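from airflow.decorators import task
from airflow.operators.python import PythonOperator

# Before: Static task generation (task count fixed when the DAG file is parsed)
for i in range(10):
    PythonOperator(task_id=f'process_{i}', python_callable=...)  # callable elided

# After: Dynamic task mapping (one mapped task instance per list element at run time)
@task
def process_item(item):
    return item * 2

process_item.expand(item=[1, 2, 3, 4, 5])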
```

## Configuration Recommendations

### Default Args Template

```python
from datetime import timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
    'execution_timeout': timedelta(hours=2),
    'sla': timedelta(hours=1),
}
```
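
To get a feel for what `retry_delay`, `retry_exponential_backoff`, and `max_retry_delay` imply together, the sketch below lists idealized wait times that double on each retry and are capped at the maximum. This is a simplification: Airflow's own calculation adds jitter and may differ in detail, so treat the numbers as rough guidance. The function name is hypothetical.

```python
from datetime import timedelta


def idealized_retry_delays(retries, retry_delay, max_retry_delay):
    """Doubling delays capped at max_retry_delay; ignores Airflow's jitter."""
    return [min(retry_delay * 2 ** attempt, max_retry_delay) for attempt in range(retries)]


# Values taken from the template above.
delays = idealized_retry_delays(3, timedelta(minutes=5), timedelta(minutes=30))
print([int(delay.total_seconds() // 60) for delay in delays])  # [5, 10, 20]
```

Under this simplification the 30-minute cap is not reached with three retries; it would start to apply from a fourth retry onward.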

### Pool Configuration

| Workload Type | Recommended Pool Size |
|---------------|----------------------|
| Heavy compute | 2-4 per worker |
| I/O bound | 8-16 per worker |
| API calls | Rate limit based |
| Sensors | Separate pool, high slots |

## Integration Points

### MCP Server Integration

- **yangkyeongmo/mcp-server-apache-airflow** - Airflow REST API integration
- **Dagster MCP** - Alternative orchestration patterns
- **Prefect MCP** - Modern orchestration comparison

### Related Skills

- dbt Project Analyzer (SK-DEA-003) - dbt operator optimization
- Data Lineage Mapper (SK-DEA-010) - Task lineage extraction

### Applicable Processes

- ETL/ELT Pipeline (`etl-elt-pipeline.js`)
- A/B Testing Pipeline (`ab-testing-pipeline.js`)
- Pipeline Migration (`pipeline-migration.js`)
- Data Quality Framework (`data-quality-framework.js`)

## References

- [Airflow Best Practices](https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html)
- [Airflow DAG Writing Best Practices](https://airflow.apache.org/docs/apache-airflow/stable/howto/dag-best-practices.html)
- [Astronomer Guides](https://docs.astronomer.io/learn)
- [Airflow MCP Server](https://github.com/yangkyeongmo/mcp-server-apache-airflow)

## Version History

- **1.0.0** - Initial release with Airflow 2.x support
