ML Data Pipeline Architecture
Patterns for efficient ML data pipelines using Polars, Arrow, and ClickHouse.
ADR: 2026-01-22-polars-preference-hook (efficiency preferences framework)

Note: A PreToolUse hook enforces the Polars preference. To use Pandas, add
`# polars-exception: <reason>`
at the top of the file.
When to Use This Skill
Use this skill when:
- Deciding between Polars and Pandas for a data pipeline
- Optimizing memory usage with zero-copy Arrow patterns
- Loading data from ClickHouse into PyTorch DataLoaders
- Implementing lazy evaluation for large datasets
- Migrating existing Pandas code to Polars
1. Decision Tree: Polars vs Pandas
```
Dataset size?
├─ < 1M rows → Pandas OK (simpler API, richer ecosystem)
├─ 1M-10M rows → Consider Polars (2-5x faster, less memory)
└─ > 10M rows → Use Polars (required for memory efficiency)

Operations?
├─ Simple transforms → Either works
├─ Group-by aggregations → Polars 5-10x faster
├─ Complex joins → Polars with lazy evaluation
└─ Streaming/chunked → Polars scan_* functions

Integration?
├─ scikit-learn heavy → Pandas (better interop)
├─ PyTorch/custom → Polars + Arrow (zero-copy to tensor)
└─ ClickHouse source → Arrow stream → Polars (optimal)
```
2. Zero-Copy Pipeline Architecture
The Problem with Pandas
```python
# BAD: 3 memory copies
df = pd.read_sql(query, conn)   # Copy 1: DB → pandas
X = df[features].values         # Copy 2: pandas → numpy
tensor = torch.from_numpy(X)    # Copy 3: numpy → tensor
```
Peak memory: 3x data size
The Solution with Arrow
```python
# GOOD: 0-1 memory copies
import clickhouse_connect
import polars as pl
import torch

client = clickhouse_connect.get_client(...)
arrow_table = client.query_arrow("SELECT * FROM bars")  # Arrow in DB memory
df = pl.from_arrow(arrow_table)                         # Zero-copy view
X = df.select(features).to_numpy()                      # Single allocation
tensor = torch.from_numpy(X)                            # View (no copy)
```
Peak memory: 1.2x data size
3. ClickHouse Integration Patterns
Pattern A: Arrow Stream (Recommended)
```python
def query_arrow(client, query: str) -> pl.DataFrame:
    """ClickHouse → Arrow → Polars (zero-copy chain)."""
    # clickhouse_connect selects the Arrow output format itself;
    # don't append a FORMAT clause to the query string.
    arrow_table = client.query_arrow(query)
    return pl.from_arrow(arrow_table)
```
```python
df = query_arrow(client, "SELECT * FROM bars WHERE ts >= '2024-01-01'")
```
Pattern B: Polars Native (Simpler)
Polars has native ClickHouse support (see pola.rs for version requirements)
```python
df = pl.read_database_uri(
    query="SELECT * FROM bars",
    uri="clickhouse://user:pass@host/db",
)
```
Pattern C: Parquet Export (Batch Jobs)
For reproducible batch processing
```python
client.query("SELECT * FROM bars INTO OUTFILE 'data.parquet' FORMAT Parquet")
df = pl.scan_parquet("data.parquet")  # Lazy, memory-mapped
```
4. PyTorch DataLoader Integration
Minimal Change Pattern
```python
import pandas as pd
import polars as pl
import torch
from torch.utils.data import TensorDataset, DataLoader

# Accept both pandas and polars
def prepare_data(df) -> tuple[torch.Tensor, torch.Tensor]:
    if isinstance(df, pd.DataFrame):
        df = pl.from_pandas(df)
    X = df.select(features).to_numpy()
    y = df.select(target).to_numpy()
    return (
        torch.from_numpy(X).float(),
        torch.from_numpy(y).float(),
    )

X, y = prepare_data(df)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, pin_memory=True)
```
Custom PolarsDataset (Large Data)
```python
import polars as pl
import torch

class PolarsDataset(torch.utils.data.Dataset):
    """Memory-efficient dataset from a Polars DataFrame."""

    def __init__(self, df: pl.DataFrame, features: list[str], target: str):
        self.arrow = df.to_arrow()  # Arrow backing for zero-copy slicing
        self.features = features
        self.target = target

    def __len__(self) -> int:
        return self.arrow.num_rows

    def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]:
        row = self.arrow.slice(idx, 1)
        x = torch.tensor([row[f][0].as_py() for f in self.features], dtype=torch.float32)
        y = torch.tensor(row[self.target][0].as_py(), dtype=torch.float32)
        return x, y
```
5. Lazy Evaluation Patterns
Define transformations lazily (no computation yet)
```python
pipeline = (
    pl.scan_parquet("raw_data.parquet")
    .filter(pl.col("timestamp") >= start_date)
    .with_columns([
        pl.col("close").pct_change().alias("returns"),
        pl.col("volume").log().alias("log_volume"),
    ])
    .select(features + [target])
)
```
Execute only when needed
```python
train_df = pipeline.filter(pl.col("timestamp") < split_date).collect()
test_df = pipeline.filter(pl.col("timestamp") >= split_date).collect()
```
Streaming for Large Files
Process file in chunks (never loads full file)
```python
import pyarrow.parquet as pq
import polars as pl

def process_large_file(path: str, chunk_size: int = 100_000):
    # Polars LazyFrames have no batch iterator; use pyarrow's ParquetFile
    for batch in pq.ParquetFile(path).iter_batches(batch_size=chunk_size):
        chunk = pl.from_arrow(batch)        # zero-copy per record batch
        features = compute_features(chunk)  # user-defined feature step
        yield features.to_numpy()
```
6. Schema Validation
Pydantic for Config
```python
from pydantic import BaseModel, field_validator

class FeatureConfig(BaseModel):
    features: list[str]
    target: str
    seq_len: int = 15

    @field_validator("features")
    @classmethod
    def validate_features(cls, v):
        required = {"returns_vs", "momentum_z", "atr_pct"}
        missing = required - set(v)
        if missing:
            raise ValueError(f"Missing required features: {missing}")
        return v
```
DataFrame Schema Validation
```python
def validate_schema(df: pl.DataFrame, required: list[str], stage: str) -> None:
    """Fail-fast schema validation."""
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(
            f"[{stage}] Missing columns: {missing}\n"
            f"Available: {sorted(df.columns)}"
        )
```
7. Performance Benchmarks
| Operation | Pandas | Polars | Speedup |
|---|---|---|---|
| Read CSV (1GB) | 45s | 4s | 11x |
| Filter rows | 2.1s | 0.4s | 5x |
| Group-by agg | 3.8s | 0.3s | 13x |
| Sort | 5.2s | 0.4s | 13x |
| Memory peak | 10GB | 2.5GB | 4x |
Benchmark: 50M rows, 20 columns, MacBook M2
8. Migration Checklist
Phase 1: Add Arrow Support
Phase 2: Polars at Entry Points
Phase 3: Full Lazy Evaluation
9. Anti-Patterns to Avoid
DON'T: Mix APIs Unnecessarily
```python
# BAD: Convert back and forth
df_polars = pl.from_pandas(df_pandas)
df_pandas_again = df_polars.to_pandas()  # Why?
```
DON'T: Collect Too Early
```python
# BAD: Defeats lazy evaluation
df = pl.scan_parquet("data.parquet").collect()  # Full load
filtered = df.filter(...)                       # After the fact
```
```python
# GOOD: Filter before collect
df = pl.scan_parquet("data.parquet").filter(...).collect()
```
DON'T: Ignore Memory Pressure
```python
# BAD: Loads entire file
df = pl.read_parquet("huge_file.parquet")
```
```python
# GOOD: Stream in chunks (LazyFrame has no batch iterator; use pyarrow's)
import pyarrow.parquet as pq

for batch in pq.ParquetFile("huge_file.parquet").iter_batches():
    process(pl.from_arrow(batch))
```
10. Troubleshooting

| Issue | Cause | Solution |
|---|---|---|
| Memory spike during load | Collecting too early | Use lazy evaluation, call collect() only when needed |
| Arrow conversion fails | Unsupported data type | Check for object columns, convert to native types |
| ClickHouse connection error | Wrong port or credentials | Verify host:8123 (HTTP) or host:9000 (native) |
| Zero-copy not working | Intermediate pandas conversion | Remove to_pandas() calls, stay in Arrow/Polars |
| Polars hook blocking code | Pandas used without exception | Add `# polars-exception: <reason>` comment at file top |
| Slow group-by operations | Using pandas for large datasets | Migrate to Polars for 5-10x speedup |
| Schema validation failure | Column names case-sensitive | Verify exact column names from source |
| PyTorch DataLoader OOM | Loading full dataset into memory | Use PolarsDataset with Arrow backing for lazy access |
| Parquet scan performance | Not using predicate pushdown | Add filters before collect() for lazy evaluation |
| Type mismatch in tensor | Float64 vs Float32 mismatch | Explicitly cast with .cast(pl.Float32) before numpy |