ml-data-pipeline-architecture

Patterns for efficient ML data pipelines using Polars, Arrow, and ClickHouse. TRIGGERS - data pipeline, polars vs pandas, arrow format, clickhouse ml, efficient loading, zero-copy, memory optimization.

29 stars

Best use case

ml-data-pipeline-architecture is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Patterns for efficient ML data pipelines using Polars, Arrow, and ClickHouse. TRIGGERS - data pipeline, polars vs pandas, arrow format, clickhouse ml, efficient loading, zero-copy, memory optimization.

Teams using ml-data-pipeline-architecture should expect a more consistent output, faster repeated execution, less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$curl -o ~/.claude/skills/ml-data-pipeline-architecture/SKILL.md --create-dirs "https://raw.githubusercontent.com/terrylica/cc-skills/main/plugins/devops-tools/skills/ml-data-pipeline-architecture/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/ml-data-pipeline-architecture/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How ml-data-pipeline-architecture Compares

Feature / Agentml-data-pipeline-architectureStandard Approach
Platform SupportNot specifiedLimited / Varies
Context Awareness High Baseline
Installation ComplexityUnknownN/A

Frequently Asked Questions

What does this skill do?

Patterns for efficient ML data pipelines using Polars, Arrow, and ClickHouse. TRIGGERS - data pipeline, polars vs pandas, arrow format, clickhouse ml, efficient loading, zero-copy, memory optimization.

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# ML Data Pipeline Architecture

Patterns for efficient ML data pipelines using Polars, Arrow, and ClickHouse.

**ADR**: [2026-01-22-polars-preference-hook](/docs/adr/2026-01-22-polars-preference-hook.md) (efficiency preferences framework)

> **Note**: A PreToolUse hook enforces Polars preference. To use Pandas, add `# polars-exception: <reason>` at file top.

> **Self-Evolving Skill**: This skill improves through use. If instructions are wrong, parameters drifted, or a workaround was needed — fix this file immediately, don't defer. Only update for real, reproducible issues.

## When to Use This Skill

Use this skill when:

- Deciding between Polars and Pandas for a data pipeline
- Optimizing memory usage with zero-copy Arrow patterns
- Loading data from ClickHouse into PyTorch DataLoaders
- Implementing lazy evaluation for large datasets
- Migrating existing Pandas code to Polars

---

## 1. Decision Tree: Polars vs Pandas

```
Dataset size?
├─ < 1M rows → Pandas OK (simpler API, richer ecosystem)
├─ 1M-10M rows → Consider Polars (2-5x faster, less memory)
└─ > 10M rows → Use Polars (required for memory efficiency)

Operations?
├─ Simple transforms → Either works
├─ Group-by aggregations → Polars 5-10x faster
├─ Complex joins → Polars with lazy evaluation
└─ Streaming/chunked → Polars scan_* functions

Integration?
├─ scikit-learn heavy → Pandas (better interop)
├─ PyTorch/custom → Polars + Arrow (zero-copy to tensor)
└─ ClickHouse source → Arrow stream → Polars (optimal)
```

---

## 2. Zero-Copy Pipeline Architecture

### The Problem with Pandas

```python
# BAD: 3 memory copies
df = pd.read_sql(query, conn)     # Copy 1: DB → pandas
X = df[features].values           # Copy 2: pandas → numpy
tensor = torch.from_numpy(X)      # Copy 3: numpy → tensor
# Peak memory: 3x data size
```

### The Solution with Arrow

```python
# GOOD: 0-1 memory copies
import clickhouse_connect
import polars as pl
import torch

client = clickhouse_connect.get_client(...)
arrow_table = client.query_arrow("SELECT * FROM bars")  # Arrow in DB memory
df = pl.from_arrow(arrow_table)                          # Zero-copy view
X = df.select(features).to_numpy()                       # Single allocation
tensor = torch.from_numpy(X)                             # View (no copy)
# Peak memory: 1.2x data size
```

---

## 3. ClickHouse Integration Patterns

### Pattern A: Arrow Stream (Recommended)

```python
def query_arrow(client, query: str) -> pl.DataFrame:
    """ClickHouse → Arrow → Polars (zero-copy chain)."""
    arrow_table = client.query_arrow(f"{query} FORMAT ArrowStream")
    return pl.from_arrow(arrow_table)

# Usage
df = query_arrow(client, "SELECT * FROM bars WHERE ts >= '2024-01-01'")
```

### Pattern B: Polars Native (Simpler)

```python
# Polars has native ClickHouse support (see pola.rs for version requirements)
df = pl.read_database_uri(
    query="SELECT * FROM bars",
    uri="clickhouse://user:pass@host/db"
)
```

### Pattern C: Parquet Export (Batch Jobs)

```python
# For reproducible batch processing
client.query("SELECT * FROM bars INTO OUTFILE 'data.parquet' FORMAT Parquet")
df = pl.scan_parquet("data.parquet")  # Lazy, memory-mapped
```

---

## 4. PyTorch DataLoader Integration

### Minimal Change Pattern

```python
from torch.utils.data import TensorDataset, DataLoader

# Accept both pandas and polars
def prepare_data(df) -> tuple[torch.Tensor, torch.Tensor]:
    if isinstance(df, pd.DataFrame):
        df = pl.from_pandas(df)

    X = df.select(features).to_numpy()
    y = df.select(target).to_numpy()

    return (
        torch.from_numpy(X).float(),
        torch.from_numpy(y).float()
    )

X, y = prepare_data(df)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, pin_memory=True)
```

### Custom PolarsDataset (Large Data)

```python
class PolarsDataset(torch.utils.data.Dataset):
    """Memory-efficient dataset from Polars DataFrame."""

    def __init__(self, df: pl.DataFrame, features: list[str], target: str):
        self.arrow = df.to_arrow()  # Arrow backing for zero-copy slicing
        self.features = features
        self.target = target

    def __len__(self) -> int:
        return self.arrow.num_rows

    def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]:
        row = self.arrow.slice(idx, 1)
        x = torch.tensor([row[f][0].as_py() for f in self.features], dtype=torch.float32)
        y = torch.tensor(row[self.target][0].as_py(), dtype=torch.float32)
        return x, y
```

---

## 5. Lazy Evaluation Patterns

### Pipeline Composition

```python
# Define transformations lazily (no computation yet)
pipeline = (
    pl.scan_parquet("raw_data.parquet")
    .filter(pl.col("timestamp") >= start_date)
    .with_columns([
        (pl.col("close").pct_change()).alias("returns"),
        (pl.col("volume").log()).alias("log_volume"),
    ])
    .select(features + [target])
)

# Execute only when needed
train_df = pipeline.filter(pl.col("timestamp") < split_date).collect()
test_df = pipeline.filter(pl.col("timestamp") >= split_date).collect()
```

### Streaming for Large Files

```python
# Process file in chunks (never loads full file)
def process_large_file(path: str, chunk_size: int = 100_000):
    reader = pl.scan_parquet(path)

    for batch in reader.iter_batches(n_rows=chunk_size):
        # Process each chunk
        features = compute_features(batch)
        yield features.to_numpy()
```

---

## 6. Schema Validation

### Pydantic for Config

```python
from pydantic import BaseModel, field_validator

class FeatureConfig(BaseModel):
    features: list[str]
    target: str
    seq_len: int = 15

    @field_validator("features")
    @classmethod
    def validate_features(cls, v):
        required = {"returns_vs", "momentum_z", "atr_pct"}
        missing = required - set(v)
        if missing:
            raise ValueError(f"Missing required features: {missing}")
        return v
```

### DataFrame Schema Validation

```python
def validate_schema(df: pl.DataFrame, required: list[str], stage: str) -> None:
    """Fail-fast schema validation."""
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(
            f"[{stage}] Missing columns: {missing}\n"
            f"Available: {sorted(df.columns)}"
        )
```

---

## 7. Performance Benchmarks

| Operation      | Pandas | Polars | Speedup |
| -------------- | ------ | ------ | ------- |
| Read CSV (1GB) | 45s    | 4s     | 11x     |
| Filter rows    | 2.1s   | 0.4s   | 5x      |
| Group-by agg   | 3.8s   | 0.3s   | 13x     |
| Sort           | 5.2s   | 0.4s   | 13x     |
| Memory peak    | 10GB   | 2.5GB  | 4x      |

_Benchmark: 50M rows, 20 columns, MacBook M2_

---

## 8. Migration Checklist

### Phase 1: Add Arrow Support

- [ ] Add `polars = "<version>"` to dependencies (see [PyPI](https://pypi.org/project/polars/))
- [ ] Implement `query_arrow()` in data client
- [ ] Verify zero-copy with memory profiler

### Phase 2: Polars at Entry Points

- [ ] Add `pl.from_pandas()` wrapper at trainer entry
- [ ] Update `prepare_sequences()` to accept both types
- [ ] Add schema validation after conversion

### Phase 3: Full Lazy Evaluation

- [ ] Convert file reads to `pl.scan_*`
- [ ] Compose transformations lazily
- [ ] Call `.collect()` only before `.to_numpy()`

---

## 9. Anti-Patterns to Avoid

### DON'T: Mix APIs Unnecessarily

```python
# BAD: Convert back and forth
df_polars = pl.from_pandas(df_pandas)
df_pandas_again = df_polars.to_pandas()  # Why?
```

### DON'T: Collect Too Early

```python
# BAD: Defeats lazy evaluation
df = pl.scan_parquet("data.parquet").collect()  # Full load
filtered = df.filter(...)  # After the fact

# GOOD: Filter before collect
df = pl.scan_parquet("data.parquet").filter(...).collect()
```

### DON'T: Ignore Memory Pressure

```python
# BAD: Loads entire file
df = pl.read_parquet("huge_file.parquet")

# GOOD: Stream in chunks
for batch in pl.scan_parquet("huge_file.parquet").iter_batches():
    process(batch)
```

---

## References

- [Polars User Guide](https://docs.pola.rs/)
- [Polars Migration Guide](https://docs.pola.rs/user-guide/migration/pandas/)
- [Apache Arrow Python](https://arrow.apache.org/docs/python/)
- [ClickHouse Python Client](https://clickhouse.com/docs/integrations/python)
- [PyTorch Data Loading](https://pytorch.org/tutorials/beginner/data_loading_tutorial.html)
- [Polars Preference Hook ADR](/docs/adr/2026-01-22-polars-preference-hook.md)

---

## Troubleshooting

| Issue                       | Cause                            | Solution                                             |
| --------------------------- | -------------------------------- | ---------------------------------------------------- |
| Memory spike during load    | Collecting too early             | Use lazy evaluation, call collect() only when needed |
| Arrow conversion fails      | Unsupported data type            | Check for object columns, convert to native types    |
| ClickHouse connection error | Wrong port or credentials        | Verify host:8123 (HTTP) or host:9000 (native)        |
| Zero-copy not working       | Intermediate pandas conversion   | Remove to_pandas() calls, stay in Arrow/Polars       |
| Polars hook blocking code   | Pandas used without exception    | Add `# polars-exception: reason` comment at file top |
| Slow group-by operations    | Using pandas for large datasets  | Migrate to Polars for 5-10x speedup                  |
| Schema validation failure   | Column names case-sensitive      | Verify exact column names from source                |
| PyTorch DataLoader OOM      | Loading full dataset into memory | Use PolarsDataset with Arrow backing for lazy access |
| Parquet scan performance    | Not using predicate pushdown     | Add filters before collect() for lazy evaluation     |
| Type mismatch in tensor     | Float64 vs Float32 mismatch      | Explicitly cast with .cast(pl.Float32) before numpy  |


## Post-Execution Reflection

After this skill completes, reflect before closing the task:

0. **Locate yourself.** — Find this SKILL.md's canonical path before editing.
1. **What failed?** — Fix the instruction that caused it.
2. **What worked better than expected?** — Promote to recommended practice.
3. **What drifted?** — Fix any script, reference, or dependency that no longer matches reality.
4. **Log it.** — Evolution-log entry with trigger, fix, and evidence.

Do NOT defer. The next invocation inherits whatever you leave behind.

Related Skills

skill-architecture

29
from terrylica/cc-skills

Create new skills, modify existing skills, and understand skill architecture. Use when users want to create a skill from scratch, learn YAML frontmatter standards, validate skill structure, understand progressive disclosure patterns, or choose between structural patterns (workflow, task, reference, capabilities, suite). Also use for troubleshooting skills that don't trigger correctly, optimizing skill descriptions, or learning best practices for writing effective skill instructions.

realtime-audio-architecture

29
from terrylica/cc-skills

Real-time audio playback patterns for macOS Apple Silicon. TRIGGERS - audio jitter, tts choppy, sounddevice, afplay jitter, audio architecture, playback glitch, GIL contention audio, launchd audio priority, wrong audio device, airpods, bluetooth audio, device switching.

voice-quality-audition

29
from terrylica/cc-skills

Audition Kokoro TTS voices to compare quality and grade. TRIGGERS - audition voices, kokoro voices, voice comparison, tts voice, voice quality, compare voices.

settings-and-tuning

29
from terrylica/cc-skills

Configure TTS voices, speed, timeouts, queue depth, and bot settings. TRIGGERS - configure tts, change voice, tts speed, queue depth, tts timeout, bot config, tune settings, adjust parameters.

full-stack-bootstrap

29
from terrylica/cc-skills

One-time bootstrap for Kokoro TTS engine, Telegram bot, and BotFather setup. TRIGGERS - setup tts, install kokoro, botfather, bootstrap tts-tg-sync, configure telegram bot, full stack setup.

diagnostic-issue-resolver

29
from terrylica/cc-skills

Diagnose and resolve TTS and Telegram bot issues. TRIGGERS - tts not working, bot not responding, kokoro error, audio not playing, lock stuck, telegram bot troubleshoot, diagnose issue.

component-version-upgrade

29
from terrylica/cc-skills

Upgrade Kokoro model, bot dependencies, or TTS components. TRIGGERS - upgrade kokoro, update model, upgrade bot, update dependencies, version bump, component update.

clean-component-removal

29
from terrylica/cc-skills

Remove TTS and Telegram sync components cleanly. TRIGGERS - uninstall tts, remove telegram bot, uninstall kokoro, clean tts, teardown, component removal.

send-message

29
from terrylica/cc-skills

Use when user wants to send a text message on Telegram as their personal account via MTProto, text someone, or message a contact by username, phone, or chat ID.

send-media

29
from terrylica/cc-skills

Use when user wants to send or upload a file, photo, video, voice note, or document on Telegram via their personal account.

search-messages

29
from terrylica/cc-skills

Use when user wants to search for messages across all Telegram chats or within a specific chat, find old messages by text, or look up Telegram message history filtered by sender.

pin-message

29
from terrylica/cc-skills

Use when user wants to pin or unpin a message in a Telegram chat, group, or channel, or manage pinned messages.