data-engineering-observability
Pipeline Observability
Tracing and metrics for data pipelines using OpenTelemetry and Prometheus. Instrument code for visibility into performance, errors, and data lineage.
Quick Reference
| Tool | Purpose | What it Measures |
|---|---|---|
| OpenTelemetry | Distributed tracing | Pipeline stages, latency, dependencies |
| Prometheus | Metrics | Throughput, error rates, resource utilization |
| Grafana | Visualization | Dashboards combining traces + metrics |
Why Observable?
- Debugging: Trace failed records through pipeline stages
- Performance: Identify bottlenecks, optimize slow transformations
- Reliability: Set alerts on error rates, SLA breaches
- Cost: Track resource usage, optimize expensive operations
- Compliance: Audit trail of data transformations
Skill Dependencies
- @data-engineering-core - Pipeline structure to instrument
- @data-engineering-orchestration - Prefect/Dagster have built-in observability
- @data-engineering-streaming - Stream processing patterns need tracing
OpenTelemetry Integration
OpenTelemetry (OTel) provides a vendor-neutral standard for distributed tracing, metrics, and logs.
Installation
```bash
pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp
```
Basic Tracing
```python
import logging

import polars as pl  # assumed: the example reads Parquet with Polars
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Setup tracer provider
provider = TracerProvider()
exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("data_pipeline")

def run_pipeline():
    with tracer.start_as_current_span("extract") as span:
        span.set_attribute("source", "sales.parquet")
        span.set_attribute("format", "parquet")
        df = pl.scan_parquet("data/sales.parquet").collect()
        span.set_attribute("rows_read", len(df))

    with tracer.start_as_current_span("transform") as span:
        span.set_attribute("operation", "aggregation")
        result = df.group_by("category").agg(pl.col("value").sum())

    with tracer.start_as_current_span("load") as span:
        span.set_attribute("target", "duckdb.summary")
        # conn: an existing database connection, assumed from context
        result.to_pandas().to_sql("summary", conn, if_exists="replace")
        span.set_attribute("rows_written", len(result))

if __name__ == "__main__":
    run_pipeline()
```
Trace Context Propagation
For multi-service pipelines, pass trace context:
```python
from opentelemetry.propagators.b3 import B3MultiFormat

# Inject trace context into message headers (Kafka, HTTP).
# B3MultiFormat ships in the opentelemetry-propagator-b3 package.
carrier = {}
propagator = B3MultiFormat()
propagator.inject(carrier)  # injects the currently active context

# Send carrier dict with message (e.g., Kafka header)
producer.produce(
    topic="events",
    key=key,
    value=json.dumps(data),
    headers=list(carrier.items())
)
```
```python
# Consumer extracts context from the received headers
context = propagator.extract(carrier=carrier)
with tracer.start_as_current_span("process_message", context=context):
    process(data)
```
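On the consumer side, Kafka clients typically deliver headers as a list of `(key, bytes)` tuples, while the propagator expects a string-to-string mapping. A small conversion helper (hypothetical, not part of any client library) bridges the two:

```python
def headers_to_carrier(headers):
    """Convert Kafka-style [(key, bytes)] headers into the dict carrier
    that OpenTelemetry propagators expect."""
    return {k: v.decode("utf-8") for k, v in (headers or [])}
```

Pass the result to `propagator.extract()` before starting the processing span.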
---
Prometheus Metrics
Prometheus collects numeric time series data. Push or pull metrics from your application.
Installation
```bash
pip install prometheus-client
```
Basic Instrumentation
```python
import psutil
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Define metrics
ROWS_PROCESSED = Counter(
    'etl_rows_processed_total',
    'Total rows processed by ETL',
    ['source', 'stage']
)
PROCESSING_TIME = Histogram(
    'etl_processing_seconds',
    'Time spent processing',
    ['operation'],
    buckets=[0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0]
)
PIPELINE_ERRORS = Counter(
    'etl_errors_total',
    'Total pipeline errors',
    ['stage', 'error_type']
)
MEMORY_USAGE = Gauge(
    'etl_memory_bytes',
    'Process memory usage in bytes'
)

# Start metrics server (Prometheus scrapes this endpoint)
start_http_server(8000)

def process_batch(stage: str, batch_id: int):
    # Use a fixed label value; labeling by batch_id would explode cardinality
    with PROCESSING_TIME.labels(operation="batch").time():
        try:
            rows = extract_and_process(batch_id)
            ROWS_PROCESSED.labels(source="kafka", stage=stage).inc(rows)
            return rows
        except Exception as e:
            PIPELINE_ERRORS.labels(stage=stage, error_type=type(e).__name__).inc()
            raise

# Periodic gauge update
def update_memory():
    process = psutil.Process()
    MEMORY_USAGE.set(process.memory_info().rss)
```
Custom Collector
```python
from prometheus_client import CollectorRegistry, Gauge

registry = CollectorRegistry()

# Custom gauge that computes on demand
queue_size = Gauge(
    'kafka_queue_size',
    'Number of messages in queue',
    registry=registry
)

def collect_queue_size():
    size = kafka_consumer.metrics()['fetch-metrics']['records-lag-max']
    queue_size.set(size)

# Register with push gateway or scrape
```
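The gauge above still has to be refreshed by calling `collect_queue_size()` yourself. An alternative sketch is a true custom collector that computes the value at scrape time, so it is never stale; `get_queue_size` here is a hypothetical stand-in for the Kafka lag lookup:

```python
from prometheus_client import CollectorRegistry, generate_latest
from prometheus_client.core import GaugeMetricFamily

def get_queue_size() -> float:
    # Hypothetical helper; in practice read consumer lag from Kafka here.
    return 42.0

class QueueCollector:
    """Duck-typed collector: the registry calls collect() on every scrape,
    so the reported value is always fresh."""
    def collect(self):
        g = GaugeMetricFamily('kafka_queue_size', 'Number of messages in queue')
        g.add_metric([], get_queue_size())
        yield g

registry = CollectorRegistry()
registry.register(QueueCollector())
```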
---
Integration with Orchestration
Prefect Built-in Observability
Prefect automatically records:
- Task run status (success/failure)
- Duration
- Retry counts
- Parameters
Enable Prefect Cloud/Server for UI:
```bash
prefect cloud login  # or: prefect server start
prefect agent start -q 'default'
```
Dagster Observability
Dagster Dagit UI shows:
- Asset materialization history
- Run duration and status
- Asset lineage graph
- Resource usage
Enable metrics:
```python
from dagster import asset

@asset
def monitored_asset():
    # Dagster automatically records materialization metadata for assets
    pass
```
Dashboards & Alerting
Grafana Dashboard Example
Create dashboard with panels:
- Throughput: `rate(etl_rows_processed_total[5m])`
- Latency: `histogram_quantile(0.95, rate(etl_processing_seconds_bucket[5m]))`
- Error Rate: `rate(etl_errors_total[5m])`
- Memory: `etl_memory_bytes / 1024 / 1024`
Alert Rules (Prometheus Alertmanager)
```yaml
groups:
  - name: etl-alerts
    rules:
      - alert: HighErrorRate
        expr: rate(etl_errors_total[5m]) > 0.1
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "ETL error rate elevated"
          description: "{{ $labels.stage }} stage error rate: {{ $value }} errors/sec"
```
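Alerts only fire if Prometheus is actually scraping the pipeline's metrics endpoint (port 8000 in the instrumentation example above). A minimal scrape config sketch, with the job name assumed:

```yaml
scrape_configs:
  - job_name: 'etl_pipeline'
    scrape_interval: 15s
    static_configs:
      - targets: ['localhost:8000']
```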
Best Practices
Instrumentation
- ✅ Span every pipeline stage - extract, transform, load, validate
- ✅ Add attributes - dataset names, row counts, file paths
- ✅ Propagate context across async boundaries (threads, processes, network)
- ✅ Record errors in spans with `span.record_exception()`
- ✅ Sample judiciously - 100% in dev, lower in prod (set a sampling policy)
Metrics
- ✅ Use counters for events (rows processed, errors)
- ✅ Use histograms for durations (processing time, latency)
- ✅ Use gauges for state (queue size, memory usage)
- ✅ Label dimensions (stage, source, status) but avoid cardinality explosion
- ✅ Export endpoint on separate port (8000) outside app port
Production
- ✅ Centralized logs - send structured logs to ELK/Datadog
- ✅ Correlation IDs - Include trace IDs in log entries
- ✅ Alert on SLA breaches - latency > threshold, error rate > X%
- ✅ Test observability - Simulate failures, verify traces/metrics
- ✅ Document schema - Define metric names and label values in README
References
- OpenTelemetry Python
- Prometheus Python Client
- Grafana Dashboarding
- @data-engineering-orchestration - Prefect/Dagster observability features