ohlcv-processing
Market data preparation including OHLCV resampling, gap handling, anomaly detection, normalization, and multi-source merging
Best use case
ohlcv-processing is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Market data preparation including OHLCV resampling, gap handling, anomaly detection, normalization, and multi-source merging
Teams using ohlcv-processing should expect a more consistent output, faster repeated execution, less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in
.claude/skills/ohlcv-processing/SKILL.mdinside your project - Restart your AI agent — it will auto-discover the skill
How ohlcv-processing Compares
| Feature / Agent | ohlcv-processing | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
Frequently Asked Questions
What does this skill do?
Market data preparation including OHLCV resampling, gap handling, anomaly detection, normalization, and multi-source merging
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
SKILL.md Source
# OHLCV Processing — Market Data Preparation
Clean, consistent OHLCV data is the foundation of every trading analysis. Garbage in, garbage out — a single anomalous candle can trigger false signals, corrupt indicator calculations, and produce misleading backtest results. This skill covers the full data preparation pipeline: validation, cleaning, resampling, normalization, and multi-source merging.
**Why this matters**: Crypto OHLCV data is messier than traditional markets. 24/7 trading means no official close, DEX aggregators disagree on prices, low-liquidity tokens produce impossible candles, and API outages create gaps. Every analysis workflow should start with this pipeline.
## Quick Start
### 1. Install Dependencies
```bash
uv pip install pandas numpy httpx
```
### 2. Standard OHLCV DataFrame Format
All processing functions expect this canonical format:
```python
import pandas as pd
# Canonical OHLCV DataFrame
# - DatetimeIndex in UTC
# - Columns: open, high, low, close, volume (lowercase)
# - Sorted ascending by timestamp
# - No duplicate timestamps
df = pd.DataFrame({
"open": [1.10, 1.12, 1.11],
"high": [1.15, 1.14, 1.13],
"low": [1.08, 1.10, 1.09],
"close": [1.12, 1.11, 1.12],
"volume": [50000, 48000, 52000],
}, index=pd.to_datetime([
"2025-01-01 00:00:00",
"2025-01-01 00:01:00",
"2025-01-01 00:02:00",
], utc=True))
df.index.name = "timestamp"
```
### 3. Full Processing Pipeline
```python
import pandas as pd
import numpy as np
def process_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
"""Run complete OHLCV processing pipeline."""
df = standardize_columns(df)
df = validate_ohlcv(df)
df = handle_gaps(df, method="ffill")
df = detect_and_flag_anomalies(df)
return df
```
## Data Validation
### Column Checks
```python
REQUIRED_COLUMNS = {"open", "high", "low", "close", "volume"}
def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
"""Normalize column names to lowercase standard."""
df.columns = df.columns.str.lower().str.strip()
# Common renames
rename_map = {"vol": "volume", "v": "volume", "o": "open",
"h": "high", "l": "low", "c": "close"}
df = df.rename(columns=rename_map)
missing = REQUIRED_COLUMNS - set(df.columns)
if missing:
raise ValueError(f"Missing columns: {missing}")
return df[["open", "high", "low", "close", "volume"]]
```
### Structural Validation
```python
def validate_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
"""Validate OHLCV structural integrity."""
# Ensure DatetimeIndex in UTC
if not isinstance(df.index, pd.DatetimeIndex):
df.index = pd.to_datetime(df.index, utc=True)
if df.index.tz is None:
df.index = df.index.tz_localize("UTC")
# Sort and deduplicate
df = df.sort_index()
dupes = df.index.duplicated(keep="last")
if dupes.any():
print(f"Warning: Removed {dupes.sum()} duplicate timestamps")
df = df[~dupes]
# Type enforcement
for col in ["open", "high", "low", "close", "volume"]:
df[col] = pd.to_numeric(df[col], errors="coerce")
return df
```
### Impossible Candle Detection
```python
def find_impossible_candles(df: pd.DataFrame) -> pd.DataFrame:
"""Find candles that violate OHLC constraints."""
issues = pd.DataFrame(index=df.index)
issues["high_lt_low"] = df["high"] < df["low"]
issues["high_lt_open"] = df["high"] < df["open"]
issues["high_lt_close"] = df["high"] < df["close"]
issues["low_gt_open"] = df["low"] > df["open"]
issues["low_gt_close"] = df["low"] > df["close"]
issues["negative_price"] = (df[["open", "high", "low", "close"]] < 0).any(axis=1)
issues["negative_volume"] = df["volume"] < 0
issues["any_issue"] = issues.any(axis=1)
return issues[issues["any_issue"]]
```
## Gap Handling
Crypto trades 24/7, but gaps still occur from API outages, low liquidity, or aggregator downtime.
### Detect Gaps
```python
def detect_gaps(df: pd.DataFrame, expected_freq: str = "1min") -> pd.Series:
"""Find missing timestamps based on expected frequency."""
full_index = pd.date_range(
start=df.index.min(), end=df.index.max(), freq=expected_freq, tz="UTC"
)
missing = full_index.difference(df.index)
return missing
```
### Fill Gaps
```python
def handle_gaps(
df: pd.DataFrame,
freq: str = "1min",
method: str = "ffill",
max_gap: int = 5,
) -> pd.DataFrame:
"""Fill gaps in OHLCV data.
Args:
df: OHLCV DataFrame with DatetimeIndex.
freq: Expected bar frequency.
method: 'ffill' (forward fill) or 'interpolate'.
max_gap: Maximum consecutive bars to fill. Larger gaps are left as NaN.
"""
full_index = pd.date_range(
start=df.index.min(), end=df.index.max(), freq=freq, tz="UTC"
)
df = df.reindex(full_index)
df.index.name = "timestamp"
# Mark which bars were filled
df["is_filled"] = df["close"].isna()
if method == "ffill":
# Forward fill OHLC (flat candle), zero volume
df[["open", "high", "low", "close"]] = (
df[["open", "high", "low", "close"]].ffill(limit=max_gap)
)
df["volume"] = df["volume"].fillna(0)
elif method == "interpolate":
df[["open", "high", "low", "close"]] = (
df[["open", "high", "low", "close"]].interpolate(
method="time", limit=max_gap
)
)
df["volume"] = df["volume"].fillna(0)
return df
```
## Anomaly Detection
See `references/data_quality.md` for the complete anomaly taxonomy.
### Price Spike Detection
```python
def detect_price_spikes(
df: pd.DataFrame, window: int = 20, threshold: float = 3.0
) -> pd.Series:
"""Flag bars where return exceeds threshold * rolling std."""
returns = df["close"].pct_change()
rolling_std = returns.rolling(window, min_periods=5).std()
spike = returns.abs() > (threshold * rolling_std)
return spike.fillna(False)
```
### Zero Volume Detection
```python
def detect_zero_volume(df: pd.DataFrame, min_volume: float = 0) -> pd.Series:
"""Flag bars with zero or below-minimum volume."""
return df["volume"] <= min_volume
```
### Composite Anomaly Flagging
```python
def flag_anomalies(df: pd.DataFrame) -> pd.DataFrame:
"""Add anomaly flag columns to DataFrame."""
df["anomaly_spike"] = detect_price_spikes(df)
df["anomaly_zero_vol"] = detect_zero_volume(df)
impossible = find_impossible_candles(df)
df["anomaly_impossible"] = False
if not impossible.empty:
df.loc[impossible.index, "anomaly_impossible"] = True
df["anomaly_any"] = (
df["anomaly_spike"] | df["anomaly_zero_vol"] | df["anomaly_impossible"]
)
return df
```
## Resampling
See `references/resampling_guide.md` for detailed guidance.
### Standard Resample
```python
OHLCV_RESAMPLE_RULES = {
"open": "first",
"high": "max",
"low": "min",
"close": "last",
"volume": "sum",
}
def resample_ohlcv(df: pd.DataFrame, target_freq: str) -> pd.DataFrame:
"""Resample OHLCV to a coarser timeframe.
Args:
df: OHLCV DataFrame (must be finer than target_freq).
target_freq: Pandas frequency string ('5min', '15min', '1h', '4h', '1D').
Returns:
Resampled OHLCV DataFrame with no NaN rows.
"""
ohlcv_cols = ["open", "high", "low", "close", "volume"]
resampled = df[ohlcv_cols].resample(target_freq).agg(OHLCV_RESAMPLE_RULES)
return resampled.dropna(subset=["close"])
```
### Common Timeframe Ladder
```python
TIMEFRAME_LADDER = ["1min", "5min", "15min", "1h", "4h", "1D"]
def resample_ladder(df: pd.DataFrame) -> dict[str, pd.DataFrame]:
"""Resample 1-minute data to all standard timeframes."""
results = {"1min": df.copy()}
for tf in TIMEFRAME_LADDER[1:]:
results[tf] = resample_ohlcv(df, tf)
return results
```
### VWAP Calculation
```python
def compute_vwap(df: pd.DataFrame) -> pd.Series:
"""Compute cumulative VWAP over the DataFrame."""
typical_price = (df["high"] + df["low"] + df["close"]) / 3
cum_vol = df["volume"].cumsum()
cum_tp_vol = (typical_price * df["volume"]).cumsum()
return cum_tp_vol / cum_vol
```
## Normalization
```python
def normalize_prices(
df: pd.DataFrame, method: str = "returns"
) -> pd.DataFrame:
"""Normalize OHLCV price columns.
Methods:
'returns' — Percentage returns (close-to-close).
'log_returns' — Log returns.
'minmax' — Min-max scale to [0, 1].
'zscore' — Z-score normalization.
"""
price_cols = ["open", "high", "low", "close"]
result = df.copy()
if method == "returns":
for col in price_cols:
result[f"{col}_ret"] = result[col].pct_change()
elif method == "log_returns":
for col in price_cols:
result[f"{col}_logret"] = np.log(result[col] / result[col].shift(1))
elif method == "minmax":
for col in price_cols:
cmin, cmax = result[col].min(), result[col].max()
result[f"{col}_norm"] = (result[col] - cmin) / (cmax - cmin)
elif method == "zscore":
for col in price_cols:
result[f"{col}_z"] = (
(result[col] - result[col].mean()) / result[col].std()
)
return result
```
## Multi-Source Merging
When combining data from multiple sources (e.g., Birdeye + DexScreener), timestamps may not align and prices may differ due to different DEX aggregation.
```python
def merge_ohlcv_sources(
primary: pd.DataFrame,
secondary: pd.DataFrame,
tolerance: str = "30s",
) -> pd.DataFrame:
"""Merge two OHLCV sources, preferring the higher-volume source per bar.
Args:
primary: First OHLCV source.
secondary: Second OHLCV source.
tolerance: Maximum time difference for alignment.
"""
merged = pd.merge_asof(
primary.sort_index(),
secondary.sort_index(),
left_index=True, right_index=True,
tolerance=pd.Timedelta(tolerance),
suffixes=("_pri", "_sec"),
)
# Use higher-volume source per bar
use_secondary = merged["volume_sec"] > merged["volume_pri"]
for col in ["open", "high", "low", "close", "volume"]:
merged[col] = np.where(
use_secondary, merged[f"{col}_sec"], merged[f"{col}_pri"]
)
merged["source"] = np.where(use_secondary, "secondary", "primary")
return merged[["open", "high", "low", "close", "volume", "source"]]
```
## Timezone Handling
**Standard**: Always store and process in UTC. Convert only for display.
```python
def ensure_utc(df: pd.DataFrame) -> pd.DataFrame:
"""Ensure DatetimeIndex is UTC."""
if df.index.tz is None:
df.index = df.index.tz_localize("UTC")
elif str(df.index.tz) != "UTC":
df.index = df.index.tz_convert("UTC")
return df
```
## Data Quality Report
```python
def quality_report(df: pd.DataFrame) -> dict:
"""Generate a data quality summary."""
total = len(df)
return {
"total_bars": total,
"date_range": f"{df.index.min()} → {df.index.max()}",
"missing_values": int(df[["open", "high", "low", "close"]].isna().sum().sum()),
"zero_volume_bars": int((df["volume"] == 0).sum()),
"impossible_candles": int((df["high"] < df["low"]).sum()),
"duplicate_timestamps": int(df.index.duplicated().sum()),
"negative_prices": int((df[["open", "high", "low", "close"]] < 0).any(axis=1).sum()),
"completeness_pct": round((1 - df["close"].isna().mean()) * 100, 2),
}
```
## Files
### References
- `references/data_quality.md` — Anomaly types, detection methods, correction strategies, crypto-specific data issues
- `references/resampling_guide.md` — Resample rules, timeframe use cases, partial bar handling, VWAP resampling, multi-timeframe alignment
### Scripts
- `scripts/process_ohlcv.py` — Full processing pipeline: validate, clean, resample, normalize with anomaly reporting (run with `--demo` for synthetic data)
- `scripts/merge_sources.py` — Multi-source OHLCV merging with conflict resolution and discrepancy reporting (run with `--demo`)Related Skills
yield-analysis
DeFi yield evaluation including fee APR, real vs nominal yield, net APY after costs, and yield sustainability analysis
yellowstone-grpc
Real-time Solana transaction and account streaming via Yellowstone gRPC (Geyser plugin)
whale-tracking
Large wallet monitoring, accumulation and distribution detection, and smart money signal generation for Solana tokens
wash-sale-detection
Wash sale detection under 2025 US crypto rules with 61-day window monitoring, disallowed loss tracking, and safe re-entry countdown
wallet-profiling
Behavioral classification, performance analysis, and trading style detection for Solana wallets
walk-forward-validation
Walk-forward validation framework for trading strategies and ML models with time-series-aware splits, overfit detection, and regime-aware validation
volatility-modeling
Volatility estimation, forecasting, and regime classification using GARCH, EWMA, realized volatility, and volatility cones
vectorbt
High-performance vectorized backtesting with parameter optimization, portfolio simulation, and rich performance metrics
trading-visualization
Professional trading charts including candlesticks, equity curves, drawdowns, correlation heatmaps, and return distributions
trade-journal
Structured trade logging, performance review, behavioral pattern detection, and strategy attribution for systematic improvement
trade-accounting
Double-entry bookkeeping for trading operations with ledger management, P&L statements, balance sheets, and cash flow reporting
token-holder-analysis
Token holder distribution, concentration metrics, insider detection, and supply analysis for Solana tokens