ml-data-pipeline-architecture
# ML Data Pipeline Architecture
Patterns for efficient ML data pipelines using Polars, Arrow, and ClickHouse.
ADR: 2026-01-22-polars-preference-hook (efficiency preferences framework)
Note: A PreToolUse hook enforces Polars preference. To use Pandas, add `# polars-exception: <reason>` at the top of the file.
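For example, a module that genuinely needs Pandas might start like this (the reason text is illustrative):

```python
# polars-exception: downstream scikit-learn pipeline expects pandas DataFrames
import pandas as pd
```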
## When to Use This Skill
Use this skill when:
- Deciding between Polars and Pandas for a data pipeline
- Optimizing memory usage with zero-copy Arrow patterns
- Loading data from ClickHouse into PyTorch DataLoaders
- Implementing lazy evaluation for large datasets
- Migrating existing Pandas code to Polars
## 1. Decision Tree: Polars vs Pandas
```
Dataset size?
├─ < 1M rows   → Pandas OK (simpler API, richer ecosystem)
├─ 1M-10M rows → Consider Polars (2-5x faster, less memory)
└─ > 10M rows  → Use Polars (required for memory efficiency)

Operations?
├─ Simple transforms     → Either works
├─ Group-by aggregations → Polars 5-10x faster
├─ Complex joins         → Polars with lazy evaluation
└─ Streaming/chunked     → Polars scan_* functions

Integration?
├─ scikit-learn heavy → Pandas (better interop)
├─ PyTorch/custom     → Polars + Arrow (zero-copy to tensor)
└─ ClickHouse source  → Arrow stream → Polars (optimal)
```
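To ground the group-by line above, here is a minimal sketch of the same aggregation in both libraries. The toy frame and the column names `symbol` and `close` are placeholders, and `group_by` is the Polars ≥ 0.19 spelling (older releases use `groupby`):

```python
import pandas as pd
import polars as pl

pdf = pd.DataFrame({"symbol": ["A", "A", "B"], "close": [1.0, 2.0, 3.0]})

# Pandas: eager group-by aggregation
pandas_out = pdf.groupby("symbol")["close"].mean().reset_index()

# Polars: the same aggregation as an expression; going through lazy()
# lets the optimizer fuse this step with upstream filters on large data
polars_out = (
    pl.from_pandas(pdf)
    .lazy()
    .group_by("symbol")
    .agg(pl.col("close").mean())
    .collect()
)
```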
## 2. Zero-Copy Pipeline Architecture
### The Problem with Pandas

```python
# BAD: 3 memory copies
df = pd.read_sql(query, conn)   # Copy 1: DB → pandas
X = df[features].values         # Copy 2: pandas → numpy
tensor = torch.from_numpy(X)    # Copy 3: numpy → tensor
# Peak memory: 3x data size
```
### The Solution with Arrow

```python
# GOOD: 0-1 memory copies
import clickhouse_connect
import polars as pl
import torch

client = clickhouse_connect.get_client(...)
arrow_table = client.query_arrow("SELECT * FROM bars")  # Arrow in DB memory
df = pl.from_arrow(arrow_table)                         # Zero-copy view
X = df.select(features).to_numpy()                      # Single allocation
tensor = torch.from_numpy(X)                            # View (no copy)
# Peak memory: 1.2x data size
```
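To confirm that the final hop really is a view rather than a copy, compare the underlying buffer addresses. A minimal standalone sketch (synthetic array, not the pipeline above):

```python
import numpy as np
import torch

X = np.random.rand(1_000, 8).astype(np.float32)
tensor = torch.from_numpy(X)

# Same buffer address → torch did not copy the data
assert tensor.data_ptr() == X.ctypes.data

# Writes through either handle are visible on the other
X[0, 0] = 42.0
assert tensor[0, 0].item() == 42.0
```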
---
## 3. ClickHouse Integration Patterns
### Pattern A: Arrow Stream (Recommended)

```python
def query_arrow(client, query: str) -> pl.DataFrame:
    """ClickHouse → Arrow → Polars (zero-copy chain)."""
    arrow_table = client.query_arrow(f"{query} FORMAT ArrowStream")
    return pl.from_arrow(arrow_table)
```

Usage:

```python
df = query_arrow(client, "SELECT * FROM bars WHERE ts >= '2024-01-01'")
```
### Pattern B: Polars Native (Simpler)

```python
# Polars has native ClickHouse support (see pola.rs for version requirements)
df = pl.read_database_uri(
    query="SELECT * FROM bars",
    uri="clickhouse://user:pass@host/db"
)
```
### Pattern C: Parquet Export (Batch Jobs)

```python
# For reproducible batch processing
client.query("SELECT * FROM bars INTO OUTFILE 'data.parquet' FORMAT Parquet")
df = pl.scan_parquet("data.parquet")  # Lazy, memory-mapped
```

---

## 4. PyTorch DataLoader Integration
### Minimal Change Pattern

```python
import pandas as pd
import polars as pl
import torch
from torch.utils.data import TensorDataset, DataLoader

# Accept both pandas and polars
def prepare_data(df) -> tuple[torch.Tensor, torch.Tensor]:
    if isinstance(df, pd.DataFrame):
        df = pl.from_pandas(df)
    X = df.select(features).to_numpy()
    y = df.select(target).to_numpy()
    return (
        torch.from_numpy(X).float(),
        torch.from_numpy(y).float(),
    )

X, y = prepare_data(df)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, pin_memory=True)
```
### Custom PolarsDataset (Large Data)

```python
class PolarsDataset(torch.utils.data.Dataset):
    """Memory-efficient dataset from a Polars DataFrame."""

    def __init__(self, df: pl.DataFrame, features: list[str], target: str):
        self.arrow = df.to_arrow()  # Arrow backing for zero-copy slicing
        self.features = features
        self.target = target

    def __len__(self) -> int:
        return self.arrow.num_rows

    def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]:
        row = self.arrow.slice(idx, 1)
        x = torch.tensor([row[f][0].as_py() for f in self.features], dtype=torch.float32)
        y = torch.tensor(row[self.target][0].as_py(), dtype=torch.float32)
        return x, y
```
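A brief usage sketch with hypothetical column names, wiring the dataset into a standard DataLoader:

```python
df = pl.DataFrame({
    "returns_vs": [0.10, -0.20, 0.05, 0.01],
    "momentum_z": [1.2, 0.3, -0.7, 0.4],
    "target":     [1.0, 0.0, 1.0, 0.0],
})
ds = PolarsDataset(df, features=["returns_vs", "momentum_z"], target="target")
loader = torch.utils.data.DataLoader(ds, batch_size=2, shuffle=True)

for x_batch, y_batch in loader:
    ...  # x_batch: (2, 2) float32, y_batch: (2,) float32
```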
## 5. Lazy Evaluation Patterns
### Pipeline Composition

```python
# Define transformations lazily (no computation yet); keep "timestamp" in the
# selection so the train/test split filters below can still see it
pipeline = (
    pl.scan_parquet("raw_data.parquet")
    .filter(pl.col("timestamp") >= start_date)
    .with_columns([
        (pl.col("close").pct_change()).alias("returns"),
        (pl.col("volume").log()).alias("log_volume"),
    ])
    .select(["timestamp"] + features + [target])
)

# Execute only when needed
train_df = pipeline.filter(pl.col("timestamp") < split_date).collect()
test_df = pipeline.filter(pl.col("timestamp") >= split_date).collect()
```
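To verify that these filters are pushed down into the parquet scan rather than applied after a full read, the optimized plan can be printed (`LazyFrame.explain()` is available in recent Polars releases):

```python
# The filter on "timestamp" should appear inside the scan node
# (predicate pushdown), not as a separate step after it
print(pipeline.filter(pl.col("timestamp") < split_date).explain())
```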
### Streaming for Large Files

```python
# Process the file in bounded chunks (never materializes the full file);
# slice pushdown on scan_parquet keeps each pass to chunk_size rows
def process_large_file(path: str, chunk_size: int = 100_000):
    lf = pl.scan_parquet(path)
    offset = 0
    while True:
        batch = lf.slice(offset, chunk_size).collect()
        if batch.height == 0:
            break
        # Process each chunk
        features = compute_features(batch)
        yield features.to_numpy()
        offset += chunk_size
```

---
## 6. Schema Validation
### Pydantic for Config

```python
from pydantic import BaseModel, field_validator


class FeatureConfig(BaseModel):
    features: list[str]
    target: str
    seq_len: int = 15

    @field_validator("features")
    @classmethod
    def validate_features(cls, v):
        required = {"returns_vs", "momentum_z", "atr_pct"}
        missing = required - set(v)
        if missing:
            raise ValueError(f"Missing required features: {missing}")
        return v
```
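A short usage sketch: the first construction passes, the second trips the validator (feature names follow the required set above):

```python
cfg = FeatureConfig(
    features=["returns_vs", "momentum_z", "atr_pct", "log_volume"],
    target="target",
)

try:
    FeatureConfig(features=["returns_vs"], target="target")
except Exception as exc:  # pydantic wraps the ValueError in a ValidationError
    print(exc)
```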
### DataFrame Schema Validation
```python
def validate_schema(df: pl.DataFrame, required: list[str], stage: str) -> None:
    """Fail-fast schema validation."""
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(
            f"[{stage}] Missing columns: {missing}\n"
            f"Available: {sorted(df.columns)}"
        )
```

## 7. Performance Benchmarks
| Operation | Pandas | Polars | Speedup |
|---|---|---|---|
| Read CSV (1GB) | 45s | 4s | 11x |
| Filter rows | 2.1s | 0.4s | 5x |
| Group-by agg | 3.8s | 0.3s | 13x |
| Sort | 5.2s | 0.4s | 13x |
| Memory peak | 10GB | 2.5GB | 4x |
Benchmark: 50M rows, 20 columns, MacBook M2
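These figures are workload- and hardware-dependent; a minimal sketch for reproducing the group-by row on your own data (the parquet path and the `symbol`/`close` columns are placeholders):

```python
import time

import pandas as pd
import polars as pl

pdf = pd.read_parquet("bars.parquet")
pldf = pl.read_parquet("bars.parquet")

t0 = time.perf_counter()
pdf.groupby("symbol")["close"].mean()
print(f"pandas group-by: {time.perf_counter() - t0:.2f}s")

t0 = time.perf_counter()
pldf.group_by("symbol").agg(pl.col("close").mean())
print(f"polars group-by: {time.perf_counter() - t0:.2f}s")
```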
## 8. Migration Checklist
### Phase 1: Add Arrow Support
- Add `polars = "<version>"` to dependencies (see PyPI)
- Implement `query_arrow()` in the data client
- Verify zero-copy with a memory profiler
### Phase 2: Polars at Entry Points
- Add a `pl.from_pandas()` wrapper at the trainer entry point
- Update `prepare_sequences()` to accept both types
- Add schema validation after conversion
### Phase 3: Full Lazy Evaluation
- Convert file reads to `pl.scan_*`
- Compose transformations lazily
- Call `.collect()` only immediately before `.to_numpy()`
## 9. Anti-Patterns to Avoid
### DON'T: Mix APIs Unnecessarily

```python
# BAD: Convert back and forth
df_polars = pl.from_pandas(df_pandas)
df_pandas_again = df_polars.to_pandas()  # Why?
```
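The other anti-pattern blocks pair each BAD example with a GOOD one; a sketch of the corresponding fix here, assuming only one downstream consumer truly needs pandas:

```python
# GOOD: stay in Polars end to end; convert once, at the boundary that needs it
df = pl.read_parquet("data.parquet")
df = df.with_columns(pl.col("close").pct_change().alias("returns"))
sklearn_input = df.to_pandas()  # single, intentional conversion
```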
### DON'T: Collect Too Early

```python
# BAD: Defeats lazy evaluation
df = pl.scan_parquet("data.parquet").collect()  # Full load
filtered = df.filter(...)                       # After the fact

# GOOD: Filter before collect
df = pl.scan_parquet("data.parquet").filter(...).collect()
```
### DON'T: Ignore Memory Pressure

```python
# BAD: Loads entire file
df = pl.read_parquet("huge_file.parquet")

# GOOD: Stream in bounded chunks via lazy slicing
lf = pl.scan_parquet("huge_file.parquet")
offset, chunk_size = 0, 100_000
while (batch := lf.slice(offset, chunk_size).collect()).height > 0:
    process(batch)
    offset += chunk_size
```

---
## References
## Troubleshooting
| Issue | Cause | Solution |
|---|---|---|
| Memory spike during load | Collecting too early | Use lazy evaluation, call collect() only when needed |
| Arrow conversion fails | Unsupported data type | Check for object columns, convert to native types |
| ClickHouse connection error | Wrong port or credentials | Verify host:8123 (HTTP) or host:9000 (native) |
| Zero-copy not working | Intermediate pandas conversion | Remove to_pandas() calls, stay in Arrow/Polars |
| Polars hook blocking code | Pandas used without exception | Add `# polars-exception: <reason>` at file top |
| Slow group-by operations | Using pandas for large datasets | Migrate to Polars for 5-10x speedup |
| Schema validation failure | Column names case-sensitive | Verify exact column names from source |
| PyTorch DataLoader OOM | Loading full dataset into memory | Use PolarsDataset with Arrow backing for lazy access |
| Parquet scan performance | Not using predicate pushdown | Add filters before collect() for lazy evaluation |
| Type mismatch in tensor | Float64 vs Float32 mismatch | Explicitly cast with .cast(pl.Float32) before numpy |
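For the last row in practice, a minimal sketch of casting on the Polars side so the tensor comes out as float32 without a second conversion (the `features` list is the variable used throughout this doc):

```python
X = df.select(pl.col(features).cast(pl.Float32)).to_numpy()
tensor = torch.from_numpy(X)  # already float32, so .float() is a no-op
```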