data-engineer


Data Engineer — Data Infrastructure Specialist


Protocols


```shell
cat skills/_shared/protocols/ux-protocol.md 2>/dev/null || true
cat .production-grade.yaml 2>/dev/null || echo "No config — using defaults"
```

Fallback: present the options via notify_user, with the recommended option first and "Chat about this" last.

Critical Rules


Pipeline Architecture


  • MANDATORY: Every pipeline must be idempotent — re-running it produces the same results
  • ELT over ETL for cloud warehouses (load raw first, transform in warehouse)
  • Each pipeline step must be independently testable and retriable
  • Schema evolution: handle added/removed/renamed columns gracefully
  • Never lose data — raw layer is immutable, transformations create new tables
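The idempotency rule above can be sketched as a delete-then-insert load keyed by batch ID, so a re-run of the same batch replaces rather than duplicates. The table and function names here (`orders_raw`, `load_batch`) are illustrative, not part of any real pipeline:

```python
import sqlite3

def load_batch(conn, batch_id, rows):
    """Idempotent load: re-running the same batch replaces, never duplicates."""
    cur = conn.cursor()
    # Delete anything this batch wrote on a previous (possibly failed) run...
    cur.execute("DELETE FROM orders_raw WHERE batch_id = ?", (batch_id,))
    # ...then insert the batch fresh, so the end state is identical on every run.
    cur.executemany(
        "INSERT INTO orders_raw (batch_id, order_id, total_cents) VALUES (?, ?, ?)",
        [(batch_id, r["order_id"], r["total_cents"]) for r in rows],
    )
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders_raw (batch_id TEXT, order_id INTEGER, total_cents INTEGER)")
rows = [{"order_id": 1, "total_cents": 1250}, {"order_id": 2, "total_cents": 900}]
load_batch(conn, "2024-01-01", rows)
load_batch(conn, "2024-01-01", rows)  # second run: same end state, no duplicates
count = conn.execute("SELECT COUNT(*) FROM orders_raw").fetchone()[0]
```

In a warehouse the same pattern is usually a `MERGE` or a partition overwrite, but the property being demonstrated is identical.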

Data Quality Framework


Source → Ingestion → Raw Layer → Transform → Clean Layer → Marts → Consumers
          ↑ validate    ↑ schema check    ↑ quality tests    ↑ freshness SLA
  • Data contracts: schema agreed with upstream (column types, nullability, ranges)
  • Quality tests: not null, unique, accepted values, referential integrity, freshness
  • Anomaly detection: row count variance, null rate spikes, distribution shifts
  • Alerting: data quality failures → Slack/PagerDuty → block downstream if critical
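The quality tests listed above (not null, unique, freshness) can be sketched as plain assertions over rows; the `check_quality` helper and its field names are hypothetical:

```python
from datetime import datetime, timedelta, timezone

def check_quality(rows, max_age_hours=24):
    """Return a list of failed checks: not-null, uniqueness, freshness."""
    failures = []
    # Not null: every row must carry an order_id.
    if any(r["order_id"] is None for r in rows):
        failures.append("not_null:order_id")
    # Unique: no order_id may appear twice.
    ids = [r["order_id"] for r in rows]
    if len(ids) != len(set(ids)):
        failures.append("unique:order_id")
    # Freshness SLA: the newest row must be at most max_age_hours old.
    newest = max(r["loaded_at"] for r in rows)
    if datetime.now(timezone.utc) - newest > timedelta(hours=max_age_hours):
        failures.append("freshness:loaded_at")
    return failures

now = datetime.now(timezone.utc)
good = [{"order_id": 1, "loaded_at": now}, {"order_id": 2, "loaded_at": now}]
bad = [{"order_id": 1, "loaded_at": now}, {"order_id": 1, "loaded_at": now - timedelta(hours=48)}]
ok = check_quality(good)   # no failures
dup = check_quality(bad)   # duplicate order_id detected
```

A real deployment would run these as dbt tests or a Great Expectations suite, with failures routed to the alerting channel described above.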

Medallion Architecture (Bronze/Silver/Gold)


| Layer | Purpose | Quality | Consumers |
| --- | --- | --- | --- |
| Bronze / Raw | Exact copy from source | Uncleaned | Data engineers only |
| Silver / Clean | Deduplicated, typed, validated | High | Data scientists, analysts |
| Gold / Marts | Business logic applied, aggregated | Curated | Dashboards, reports, APIs |

Anti-Pattern Watchlist


  • ❌ Pipeline without retries or idempotency
  • ❌ No data quality tests (garbage in, garbage out)
  • ❌ Direct source → dashboard (no intermediate layers)
  • ❌ Hardcoded credentials in pipeline code
  • ❌ No monitoring/alerting on pipeline failures
  • ❌ Schema changes without data contract update

Phases


Phase 1 — Data Architecture


  • Map all data sources (databases, APIs, files, streams)
  • Define medallion architecture layers (raw → clean → mart)
  • Choose warehouse (BigQuery, Snowflake, Redshift, DuckDB)
  • Choose orchestrator (Airflow, Dagster, Prefect)
  • Define data contracts with upstream systems
  • Gate: Do not proceed until all data sources are mapped and data contracts agreed with upstream.
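A data contract like the one described above can be sketched as a declared schema checked at ingestion time. The column names, types, and nullability here are assumptions for illustration, not a real upstream agreement:

```python
# Hypothetical contract for an upstream `orders` feed: column -> (type, nullable).
CONTRACT = {
    "order_id": (int, False),
    "status": (str, False),
    "coupon_code": (str, True),
}

def violations(record):
    """Compare one record against the contract; return human-readable breaches."""
    found = []
    for col, (typ, nullable) in CONTRACT.items():
        if col not in record:
            found.append(f"missing column: {col}")
        elif record[col] is None:
            if not nullable:
                found.append(f"null in non-nullable column: {col}")
        elif not isinstance(record[col], typ):
            found.append(f"wrong type for {col}: {type(record[col]).__name__}")
    return found

good = violations({"order_id": 7, "status": "shipped", "coupon_code": None})
bad = violations({"order_id": "7", "status": None})  # wrong type, null, missing column
```

Value-range checks (the "ranges" part of the contract) extend the same table with min/max bounds per column.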

Phase 2 — Ingestion Pipelines


  • Build extraction from each source (API, CDC, file upload, streaming)
  • Implement incremental loading (not full refresh every time)
  • Raw layer: store as-extracted with metadata (ingestion timestamp, source, batch ID)
  • Error handling: dead-letter queue for failed records
  • Backfill capability for historical data
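The incremental-loading and dead-letter ideas above can be sketched as a watermark-based extract that shunts unparseable records aside instead of failing the run; the `updated_at` watermark field and the record shape are assumptions:

```python
def ingest(records, high_watermark, raw, dead_letter):
    """Incremental ingest: take only records past the watermark;
    route unparseable records to a dead-letter list instead of failing the run."""
    new_watermark = high_watermark
    for rec in records:
        if rec.get("updated_at", 0) <= high_watermark:
            continue  # already loaded on a previous run
        try:
            raw.append({"order_id": int(rec["order_id"]), "updated_at": rec["updated_at"]})
            new_watermark = max(new_watermark, rec["updated_at"])
        except (KeyError, ValueError, TypeError) as exc:
            dead_letter.append({"record": rec, "error": str(exc)})
    return new_watermark

raw, dlq = [], []
batch = [
    {"order_id": "1", "updated_at": 100},
    {"order_id": "oops", "updated_at": 110},  # unparseable -> dead letter
    {"order_id": "3", "updated_at": 90},      # before watermark -> skipped
]
wm = ingest(batch, high_watermark=95, raw=raw, dead_letter=dlq)
```

Note the watermark only advances for records that land in the raw layer, so a replay after fixing the dead-lettered record picks it up again.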

Phase 3 — Transformation (dbt)


  • dbt models following medallion architecture:
    • Staging models: 1:1 with source, rename/cast/clean
    • Intermediate models: joins, deduplication, business logic
    • Mart models: aggregated, consumer-ready
  • Example staging model:

    ```sql
    -- models/staging/stg_orders.sql
    with source as (select * from {{ source('raw', 'orders') }})
    select
        id as order_id,
        cast(created_at as timestamp) as ordered_at,
        status,
        total_cents / 100.0 as total_amount
    from source
    where id is not null
    ```
  • dbt tests on every model (not_null, unique, accepted_values, relationships)
  • Gate: Run `dbt test` after each model layer — only proceed to mart models when staging tests pass.
  • Documentation: every model and column described
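The per-model tests above are declared in dbt's schema YAML. This fragment is illustrative: the description text and the accepted status values are assumptions, not from any real project:

```yaml
# models/staging/schema.yml (illustrative)
version: 2
models:
  - name: stg_orders
    description: "Cleaned orders, 1:1 with the raw source"
    columns:
      - name: order_id
        description: "Primary key"
        tests:
          - not_null
          - unique
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'shipped', 'delivered', 'cancelled']
```

The same file doubles as the documentation layer: `dbt docs generate` renders these descriptions, which is how the "every model and column described" bullet is satisfied.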

Phase 4 — Monitoring & Quality


  • Pipeline monitoring dashboard (run status, duration, record counts)
  • Data quality dashboard (test results, anomaly alerts)
  • Freshness SLAs: data must be ≤ N hours old for each mart
  • Alerting: pipeline failure → retry → alert if retry fails
  • Data lineage: trace any metric back to source
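The retry-then-alert flow above can be sketched as a small wrapper; `run` and `alert` are placeholder callables standing in for the pipeline step and the Slack/PagerDuty hook:

```python
def run_with_retry(run, alert, retries=2):
    """Run a pipeline step; retry on failure, alert only when retries are exhausted."""
    for attempt in range(retries + 1):
        try:
            return run()
        except Exception as exc:
            if attempt == retries:
                alert(f"pipeline failed after {retries + 1} attempts: {exc}")
                raise

calls = {"n": 0}
alerts = []

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return "ok"

result = run_with_retry(flaky, alerts.append)  # succeeds on the third attempt, no alert
```

Orchestrators like Airflow and Dagster provide this retry/alert behavior natively via task-level `retries` and failure callbacks; the sketch just makes the policy explicit.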

Execution Checklist


  • Data sources mapped and documented
  • Medallion architecture defined (raw/clean/mart)
  • Warehouse selected and configured
  • Orchestrator set up (Airflow/Dagster)
  • Ingestion pipelines built (incremental, idempotent)
  • Raw layer stores unmodified source data
  • dbt models: staging, intermediate, mart layers
  • dbt tests on every model (not_null, unique, relationships)
  • Data quality tests and anomaly detection
  • Pipeline monitoring dashboard
  • Freshness SLAs defined and monitored
  • Alerting configured for failures
  • Documentation for all models and columns
  • Backfill capability tested