airflow-dag
Create Apache Airflow DAGs for construction data pipelines. Orchestrate ETL, validation, and reporting workflows.
Best use case
airflow-dag is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Teams using airflow-dag should expect more consistent output, faster repeated execution, and less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in .claude/skills/airflow-dag/SKILL.md inside your project
- Restart your AI agent; it will auto-discover the skill
How airflow-dag Compares
| Feature / Agent | airflow-dag | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
Frequently Asked Questions
What does this skill do?
Create Apache Airflow DAGs for construction data pipelines. Orchestrate ETL, validation, and reporting workflows.
SKILL.md Source
# Apache Airflow DAG for Construction
## Overview
Apache Airflow orchestrates complex data pipelines. This skill generates Airflow DAG files (Python source) for construction ETL processes, from BIM extraction to cost reports.
## Python Implementation
```python
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from enum import Enum


class TaskStatus(Enum):
    """Task execution status."""
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class DAGTask:
    """Single task in DAG."""
    task_id: str
    operator: str
    params: Dict[str, Any]
    upstream: List[str] = field(default_factory=list)
    downstream: List[str] = field(default_factory=list)


@dataclass
class DAGConfig:
    """DAG configuration."""
    dag_id: str
    schedule: str
    start_date: datetime
    catchup: bool
    default_args: Dict[str, Any]
    tags: List[str]


class ConstructionDAGBuilder:
    """Build Airflow DAGs for construction pipelines."""

    # Default DAG arguments, rendered into the generated file's default_args
    DEFAULT_ARGS = {
        'owner': 'ddc',
        'depends_on_past': False,
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'execution_timeout': timedelta(hours=2)
    }

    def __init__(self, dag_id: str,
                 schedule: str = '@daily',
                 tags: Optional[List[str]] = None,
                 callables_module: Optional[str] = None):
        self.dag_id = dag_id
        self.schedule = schedule
        self.tags = tags or ['construction', 'ddc']
        # Optional module to import python_callable functions from; if None,
        # the callables must be defined in the generated DAG file by hand.
        self.callables_module = callables_module
        self.tasks: Dict[str, DAGTask] = {}

    def _add_task(self, task_id: str, operator: str,
                  params: Dict[str, Any], upstream: Optional[List[str]]) -> str:
        """Register a task; upstream tasks must already be defined."""
        if task_id in self.tasks:
            raise ValueError(f"Duplicate task_id: {task_id}")
        unknown = [u for u in (upstream or []) if u not in self.tasks]
        if unknown:
            raise ValueError(f"{task_id}: unknown upstream task(s) {unknown}")
        self.tasks[task_id] = DAGTask(task_id=task_id, operator=operator,
                                      params=params, upstream=list(upstream or []))
        self._update_downstream(task_id, upstream)
        return task_id

    def add_bash_task(self, task_id: str,
                      command: str,
                      upstream: Optional[List[str]] = None) -> str:
        """Add bash command task."""
        return self._add_task(task_id, 'BashOperator',
                              {'bash_command': command}, upstream)

    def add_python_task(self, task_id: str,
                        python_callable: str,
                        op_kwargs: Optional[Dict[str, Any]] = None,
                        upstream: Optional[List[str]] = None) -> str:
        """Add Python callable task (python_callable is the function's name)."""
        return self._add_task(task_id, 'PythonOperator',
                              {'python_callable': python_callable,
                               'op_kwargs': op_kwargs or {}}, upstream)

    def add_sensor_task(self, task_id: str,
                        filepath: str,
                        upstream: Optional[List[str]] = None) -> str:
        """Add file sensor task (checks every 5 min, times out after 1 h)."""
        return self._add_task(task_id, 'FileSensor',
                              {'filepath': filepath,
                               'poke_interval': 300,
                               'timeout': 3600}, upstream)

    def add_branch_task(self, task_id: str,
                        python_callable: str,
                        upstream: Optional[List[str]] = None) -> str:
        """Add branching task; the callable returns the task_id(s) to follow."""
        return self._add_task(task_id, 'BranchPythonOperator',
                              {'python_callable': python_callable}, upstream)

    def _update_downstream(self, task_id: str, upstream: Optional[List[str]]):
        """Update downstream references."""
        for up_task in upstream or []:
            if up_task in self.tasks:
                self.tasks[up_task].downstream.append(task_id)

    @staticmethod
    def _render_value(value: Any) -> str:
        """Render a default_args value as Python source for the DAG file."""
        if isinstance(value, timedelta):
            # repr() would give 'datetime.timedelta(...)', which the generated
            # file's imports do not define
            return f"timedelta(seconds={int(value.total_seconds())})"
        return repr(value)

    def generate_dag_code(self) -> str:
        """Generate Airflow DAG Python code (Airflow 2.4+: uses `schedule=`)."""
        lines = [
            'from datetime import datetime, timedelta',
            '',
            'from airflow import DAG',
            'from airflow.operators.bash import BashOperator',
            'from airflow.operators.python import PythonOperator, BranchPythonOperator',
            'from airflow.sensors.filesystem import FileSensor',
        ]
        callables = sorted({t.params['python_callable'] for t in self.tasks.values()
                            if 'python_callable' in t.params})
        if self.callables_module and callables:
            lines.append(f"from {self.callables_module} import {', '.join(callables)}")
        lines += ['', 'default_args = {']
        lines += [f"    {key!r}: {self._render_value(val)},"
                  for key, val in self.DEFAULT_ARGS.items()]
        lines += [
            '}',
            '',
            'with DAG(',
            f"    dag_id={self.dag_id!r},",
            '    default_args=default_args,',
            f"    schedule={self.schedule!r},",
            '    start_date=datetime(2024, 1, 1),',
            '    catchup=False,',
            f"    tags={self.tags!r},",
            ') as dag:',
        ]
        # Generate task definitions (indented inside the `with DAG` block)
        for task in self.tasks.values():
            lines.append(self._generate_task_code(task))
        if not self.tasks:
            lines.append('    pass')
        # Generate dependencies
        lines += ['', '    # Task dependencies']
        for task in self.tasks.values():
            for upstream in task.upstream:
                lines.append(f"    {upstream} >> {task.task_id}")
        return '\n'.join(lines) + '\n'

    def _generate_task_code(self, task: DAGTask) -> str:
        """Generate code for a single task; repr() keeps quoted values valid."""
        p = task.params
        if task.operator == 'BashOperator':
            args = [f"bash_command={p['bash_command']!r}"]
        elif task.operator == 'PythonOperator':
            kwargs = p.get('op_kwargs') or {}
            args = [f"python_callable={p['python_callable']}",
                    f"op_kwargs={kwargs!r}"]
        elif task.operator == 'FileSensor':
            args = [f"filepath={p['filepath']!r}",
                    f"poke_interval={p['poke_interval']}",
                    f"timeout={p['timeout']}"]
        elif task.operator == 'BranchPythonOperator':
            args = [f"python_callable={p['python_callable']}"]
        else:
            raise ValueError(f"Unsupported operator: {task.operator}")
        body = ''.join(f"        {arg},\n" for arg in args)
        return (f"    {task.task_id} = {task.operator}(\n"
                f"        task_id={task.task_id!r},\n"
                f"{body}"
                f"    )")

    def save_dag(self, output_path: str) -> str:
        """Save DAG to file."""
        code = self.generate_dag_code()
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(code)
        return output_path


class ConstructionPipelineTemplates:
    """Pre-built construction pipeline templates."""

    @staticmethod
    def bim_validation_pipeline(dag_id: str = 'bim_validation') -> ConstructionDAGBuilder:
        """Create BIM validation pipeline."""
        builder = ConstructionDAGBuilder(dag_id, schedule='@daily',
                                         tags=['bim', 'validation'])
        # Wait for file
        builder.add_sensor_task('wait_for_model', '/data/input/*.ifc')
        # Convert to Excel
        builder.add_bash_task(
            'convert_ifc',
            'IfcExporter.exe /data/input/*.ifc bbox',
            upstream=['wait_for_model']
        )
        # Validate data
        builder.add_python_task(
            'validate_data',
            'validate_bim_data',
            {'rules_file': '/config/validation_rules.xlsx'},
            upstream=['convert_ifc']
        )
        # Branch based on validation
        builder.add_branch_task(
            'check_validation',
            'check_validation_result',
            upstream=['validate_data']
        )
        # Success path
        builder.add_python_task(
            'generate_report',
            'generate_validation_report',
            upstream=['check_validation']
        )
        # Failure path
        builder.add_python_task(
            'send_alert',
            'send_validation_alert',
            upstream=['check_validation']
        )
        return builder

    @staticmethod
    def cost_estimation_pipeline(dag_id: str = 'cost_estimation') -> ConstructionDAGBuilder:
        """Create cost estimation pipeline."""
        builder = ConstructionDAGBuilder(dag_id, schedule='@weekly',
                                         tags=['cost', 'estimation'])
        # Extract BIM data
        builder.add_bash_task('extract_bim', 'RvtExporter.exe /data/model.rvt complete bbox')
        # Generate QTO
        builder.add_python_task(
            'generate_qto',
            'generate_quantity_takeoff',
            upstream=['extract_bim']
        )
        # Match with cost database
        builder.add_python_task(
            'match_costs',
            'match_cwicr_costs',
            upstream=['generate_qto']
        )
        # Calculate estimate
        builder.add_python_task(
            'calculate_estimate',
            'calculate_project_estimate',
            upstream=['match_costs']
        )
        # Generate report
        builder.add_python_task(
            'create_report',
            'create_cost_report',
            upstream=['calculate_estimate']
        )
        return builder

    @staticmethod
    def batch_conversion_pipeline(dag_id: str = 'batch_convert') -> ConstructionDAGBuilder:
        """Create batch CAD conversion pipeline."""
        builder = ConstructionDAGBuilder(dag_id, schedule='0 2 * * *',  # 2 AM daily
                                         tags=['conversion', 'batch'])
        # Scan for new files
        builder.add_python_task('scan_files', 'scan_input_folder')
        # BashOperator runs bash (not cmd.exe), so the loops use bash syntax;
        # nullglob skips the loop when no file matches the pattern.
        # Convert Revit files
        builder.add_bash_task(
            'convert_rvt',
            'shopt -s nullglob; for f in /data/input/*.rvt; do RvtExporter.exe "$f" standard; done',
            upstream=['scan_files']
        )
        # Convert IFC files
        builder.add_bash_task(
            'convert_ifc',
            'shopt -s nullglob; for f in /data/input/*.ifc; do IfcExporter.exe "$f"; done',
            upstream=['scan_files']
        )
        # Convert DWG files
        builder.add_bash_task(
            'convert_dwg',
            'shopt -s nullglob; for f in /data/input/*.dwg; do DwgExporter.exe "$f"; done',
            upstream=['scan_files']
        )
        # Consolidate results
        builder.add_python_task(
            'consolidate',
            'consolidate_conversion_results',
            upstream=['convert_rvt', 'convert_ifc', 'convert_dwg']
        )
        # Archive input files
        builder.add_python_task(
            'archive',
            'archive_processed_files',
            upstream=['consolidate']
        )
        return builder
```
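Before a DAG file is written, it can help to confirm that every upstream reference points at a defined task and that the graph has no cycles, since Airflow refuses to load a DAG that contains a cycle. The sketch below is one way to run that pre-flight check with the standard library's `graphlib.TopologicalSorter` (Python 3.9+) on a plain `{task_id: [upstream_ids]}` mapping. The helper name `check_task_graph` is hypothetical and is not part of the skill.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


def check_task_graph(upstream_map: Dict[str, List[str]]) -> List[str]:
    """Return task ids in an order that respects dependencies (hypothetical helper).

    Raises ValueError for undefined upstream ids or dependency cycles.
    """
    for task_id, upstreams in upstream_map.items():
        missing = [u for u in upstreams if u not in upstream_map]
        if missing:
            raise ValueError(f"{task_id} depends on undefined task(s): {missing}")
    try:
        # TopologicalSorter expects {node: predecessors}, i.e. the upstream map as-is
        return list(TopologicalSorter(upstream_map).static_order())
    except CycleError as exc:
        # exc.args[1] holds the nodes that form the cycle
        raise ValueError(f"Dependency cycle: {exc.args[1]}") from exc


# Example: the linear chain used by the cost estimation template
cost_pipeline = {
    'extract_bim': [],
    'generate_qto': ['extract_bim'],
    'match_costs': ['generate_qto'],
    'calculate_estimate': ['match_costs'],
    'create_report': ['calculate_estimate'],
}
print(check_task_graph(cost_pipeline))
```

With a builder instance, the mapping can be built as `{t.task_id: t.upstream for t in builder.tasks.values()}`.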
## Quick Start
```python
# Create custom pipeline
builder = ConstructionDAGBuilder('my_pipeline', schedule='@daily')
# Add tasks
builder.add_bash_task('convert', 'RvtExporter.exe model.rvt')
builder.add_python_task('analyze', 'analyze_data', upstream=['convert'])
builder.add_python_task('report', 'create_report', upstream=['analyze'])
# Generate DAG code
code = builder.generate_dag_code()
print(code)
# Save to file
builder.save_dag('/airflow/dags/my_pipeline.py')
```
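Because the DAG file is produced by string templating, a quoting slip in a command or path only shows up when the scheduler parses the file. One hedged safeguard is to parse the generated source with Python's `ast` module before saving it and to list the `task_id` values it declares. `inspect_dag_source` is a hypothetical helper. It checks syntax only, not whether Airflow can import the file or resolve the callables.

```python
import ast
from typing import List


def inspect_dag_source(code: str) -> List[str]:
    """Parse generated DAG source and return the task_id keyword values found.

    Raises SyntaxError if the generated code is not valid Python.
    """
    tree = ast.parse(code)
    task_ids = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Call):
            for kw in node.keywords:
                # Only string literals count, e.g. task_id='convert'
                if (kw.arg == 'task_id' and isinstance(kw.value, ast.Constant)
                        and isinstance(kw.value.value, str)):
                    task_ids.append(kw.value.value)
    return task_ids
```

For example, call `inspect_dag_source(builder.generate_dag_code())` and compare the result with `builder.tasks` before calling `save_dag`.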
## Pipeline Templates
### 1. BIM Validation
```python
templates = ConstructionPipelineTemplates()
validation_dag = templates.bim_validation_pipeline()
validation_dag.save_dag('/airflow/dags/bim_validation.py')
```
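The template wires `check_validation` as a `BranchPythonOperator`, whose callable must return the `task_id` (or a list of task_ids) to follow. Airflow then skips the other downstream tasks. The skill does not define `check_validation_result` or `validate_bim_data`, so the sketch below is hypothetical. It assumes `validate_bim_data` returns a dict with an `error_count` key, which `PythonOperator` pushes to XCom as its return value.

```python
from typing import Any


def check_validation_result(**context: Any) -> str:
    """Choose the branch after validation (hypothetical implementation).

    Assumes the upstream 'validate_data' task returned {'error_count': int}.
    """
    result = context['ti'].xcom_pull(task_ids='validate_data') or {}
    # A missing result is treated as a failure, so an alert is sent
    if result.get('error_count', 1) == 0:
        return 'generate_report'
    return 'send_alert'
```

The function has to be defined in the DAG file, or imported into it, so that the generated `python_callable=check_validation_result` reference resolves.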
### 2. Cost Estimation
```python
cost_dag = templates.cost_estimation_pipeline()
cost_dag.save_dag('/airflow/dags/cost_estimation.py')
```
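The cost pipeline chains quantity takeoff, cost matching, and estimate calculation. The core arithmetic of the matching and calculation steps is quantity times unit rate, summed over the matched items. The sketch below uses hypothetical data shapes (a takeoff list of `code`/`quantity` dicts and a `{code: rate}` lookup) and does not reflect the CWICR database schema. Unmatched items are returned separately rather than being silently priced at zero.

```python
from typing import Dict, List, Tuple


def estimate_from_takeoff(qto: List[dict],
                          unit_rates: Dict[str, float]) -> Tuple[float, List[str]]:
    """Sum quantity x unit rate for each takeoff line (hypothetical data shapes).

    qto:        [{'code': 'WALL-01', 'quantity': 12.5}, ...]
    unit_rates: {'WALL-01': 140.0, ...}  (cost per unit of measure)
    Returns (total, codes with no matching rate).
    """
    total = 0.0
    unmatched = []
    for line in qto:
        rate = unit_rates.get(line['code'])
        if rate is None:
            unmatched.append(line['code'])  # flag for review instead of pricing at 0
            continue
        total += line['quantity'] * rate
    return round(total, 2), unmatched
```

Returning the unmatched codes lets a later task (for example the report step) list items that still need a price instead of understating the estimate.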
### 3. Batch Conversion
```python
batch_dag = templates.batch_conversion_pipeline()
batch_dag.save_dag('/airflow/dags/batch_convert.py')
```
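The batch template starts with a `scan_files` task whose callable, `scan_input_folder`, is not defined by the skill. A minimal hypothetical version could group the input files by extension so the run can log what each converter will receive. It assumes the `/data/input` layout used in the template and returns plain lists of strings, which keeps the XCom payload JSON-serializable.

```python
from pathlib import Path
from typing import Dict, List

SUPPORTED = ('.rvt', '.ifc', '.dwg')  # formats handled by the three convert tasks


def scan_input_folder(input_dir: str = '/data/input') -> Dict[str, List[str]]:
    """Group files in input_dir by supported extension (hypothetical callable)."""
    found: Dict[str, List[str]] = {ext: [] for ext in SUPPORTED}
    for path in sorted(Path(input_dir).iterdir()):
        # Case-sensitive match, like the bash globs in the convert tasks
        if path.is_file() and path.suffix in found:
            found[path.suffix].append(str(path))
    return found
```

Directories are skipped even when their names end in a supported extension.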
## Resources
- **DDC Book**: Chapter 4.2 - Apache Airflow Orchestration
- **Airflow Docs**: https://airflow.apache.org/docs/
Related Skills
apache-airflow-orchestration
Complete guide for Apache Airflow orchestration including DAGs, operators, sensors, XComs, task dependencies, dynamic workflows, and production deployment
airflow-workflows
Apache Airflow DAG design, operators, and scheduling best practices.
airflow-expert
Expert-level Apache Airflow orchestration, DAGs, operators, sensors, XComs, task dependencies, and scheduling
airflow-etl
Generate Apache Airflow ETL pipelines for government websites and document sources. Explores websites to find downloadable documents, verifies commercial use licenses, and creates complete Airflow DAG assets with daily scheduling. Use when user wants to create ETL pipelines, scrape government documents, or automate document collection workflows.
airflow-dag-patterns
Build production Apache Airflow DAGs with best practices for operators, sensors, testing, and deployment. Use when creating data pipelines, orchestrating workflows, or scheduling batch jobs.
airflow-3x-migration
Comprehensive guide and patterns for migrating Apache Airflow 2.x workflows to Airflow 3.x, covering import changes, deprecated features, and new paradigms like Asset scheduling and TaskFlow API.
ahu-airflow
Fan Selection & Airflow Analysis Agent
airflow
Python DAG workflow orchestration using Apache Airflow for data pipelines, ETL processes, and scheduled task automation
bgo
Automates the complete Blender build-go workflow, from building and packaging your extension/add-on to removing old versions, installing, enabling, and launching Blender for quick testing and iteration.
large-data-with-dask
Specific optimization strategies for Python scripts working with larger-than-memory datasets via Dask.
langsmith-fetch
Debug LangChain and LangGraph agents by fetching execution traces from LangSmith Studio. Use when debugging agent behavior, investigating errors, analyzing tool calls, checking memory operations, or examining agent performance. Automatically fetches recent traces and analyzes execution patterns. Requires langsmith-fetch CLI installed.
langchain-tool-calling
How chat models call tools - includes bind_tools, tool choice strategies, parallel tool calling, and tool message handling