tracing-upstream-lineage

Upstream Lineage: Sources

Trace the origins of data - answer "Where does this data come from?"

Lineage Investigation

Step 1: Identify the Target Type

Determine what we're tracing:
  • Table: Trace what populates this table
  • Column: Trace where this specific column comes from
  • DAG: Trace what data sources this DAG reads from

Step 2: Find the Producing DAG

Tables are typically populated by Airflow DAGs. Find the connection:
  1. Search DAGs by name: Use `list_dags` and look for DAG names matching the table name
    • `load_customers` -> `customers` table
    • `etl_daily_orders` -> `orders` table
  2. Explore DAG source code: Use `get_dag_source` to read the DAG definition
    • Look for INSERT, MERGE, CREATE TABLE statements
    • Find the target table in the code
  3. Check DAG tasks: Use `list_tasks` to see what operations the DAG performs
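
The name-matching and statement-scanning steps above can be sketched in plain Python. This is a minimal illustration, not part of the skill's tooling: `candidate_dags` and `write_targets` are hypothetical helpers, and the inputs are assumed to be the DAG IDs and source text you already retrieved with `list_dags` and `get_dag_source`.

```python
import re

# Hypothetical pattern: captures the table name after common write statements.
WRITE_TARGET = re.compile(
    r"\b(?:INSERT\s+INTO|MERGE\s+INTO|"
    r"CREATE\s+(?:OR\s+REPLACE\s+)?TABLE(?:\s+IF\s+NOT\s+EXISTS)?)"
    r"\s+([\w.]+)",
    re.IGNORECASE,
)


def candidate_dags(table_name, dag_ids):
    """Return DAG IDs whose name contains every word of the table name."""
    bare = table_name.split(".")[-1].lower()  # drop the schema prefix
    table_tokens = set(re.split(r"[_\W]+", bare)) - {""}
    matches = []
    for dag_id in dag_ids:
        dag_tokens = set(re.split(r"[_\W]+", dag_id.lower()))
        if table_tokens <= dag_tokens:
            matches.append(dag_id)
    return matches


def write_targets(dag_source):
    """Return the lower-cased table names the DAG source appears to write to."""
    return {m.group(1).lower() for m in WRITE_TARGET.finditer(dag_source)}
```

Name matching only produces candidates. Confirm each one by checking that the target table appears in the result of `write_targets` for that DAG's source.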

Step 3: Trace Data Sources

From the DAG code, identify source tables and systems:
**SQL Sources** (look for FROM clauses):

In DAG code:

SELECT * FROM source_schema.source_table  # <- This is an upstream source

**External Sources** (look for connection references):
- `S3Operator` -> S3 bucket source
- `PostgresOperator` -> Postgres database source
- `SalesforceOperator` -> Salesforce API source
- `HttpOperator` -> REST API source

**File Sources**:
- CSV/Parquet files in object storage
- SFTP drops
- Local file paths
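
A rough way to pull these hints out of DAG source text is a pair of pattern scans. The sketch below is an assumption-laden illustration: `find_sources` is a hypothetical helper, the operator-to-source mapping simply mirrors the list above, and a regex will miss dynamically built SQL, subqueries, and templated table names.

```python
import re

# Matches the identifier that follows FROM or JOIN in embedded SQL.
SOURCE_TABLE = re.compile(r"\b(?:FROM|JOIN)\s+([A-Za-z_][\w.]*)", re.IGNORECASE)

# Python "from x import y" lines would otherwise look like SQL FROM clauses.
PY_IMPORT = re.compile(r"^[ \t]*from\s+[\w.]+\s+import\b.*$", re.MULTILINE)

# Mirrors the operator hints listed above (illustrative, not exhaustive).
OPERATOR_HINTS = {
    "S3Operator": "S3 bucket",
    "PostgresOperator": "Postgres database",
    "SalesforceOperator": "Salesforce API",
    "HttpOperator": "REST API",
}


def find_sources(dag_source):
    """Return source tables and external systems referenced in DAG source."""
    sql_text = PY_IMPORT.sub("", dag_source)
    tables = sorted({m.group(1).lower() for m in SOURCE_TABLE.finditer(sql_text)})
    external = sorted(
        {label for name, label in OPERATOR_HINTS.items() if name in dag_source}
    )
    return {"tables": tables, "external": external}
```

Treat the result as a starting list to verify by reading the code, since connection IDs and file paths usually need manual inspection.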

Step 4: Build the Lineage Chain

Recursively trace each source:
TARGET: analytics.orders_daily
    ^
    +-- DAG: etl_daily_orders
            ^
            +-- SOURCE: raw.orders (table)
            |       ^
            |       +-- DAG: ingest_orders
            |               ^
            |               +-- SOURCE: Salesforce API (external)
            |
            +-- SOURCE: dim.customers (table)
                    ^
                    +-- DAG: load_customers
                            ^
                            +-- SOURCE: PostgreSQL (external DB)
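
The recursion behind this chain can be sketched with two lookup tables. The dictionaries below are hypothetical stand-ins for what Steps 2 and 3 would discover, populated with the names from the diagram above. `trace_upstream` and `root_sources` are illustrative helper names.

```python
# Hypothetical findings from Steps 2 and 3, using the example above.
PRODUCED_BY = {
    "analytics.orders_daily": "etl_daily_orders",
    "raw.orders": "ingest_orders",
    "dim.customers": "load_customers",
}
DAG_SOURCES = {
    "etl_daily_orders": ["raw.orders", "dim.customers"],
    "ingest_orders": ["Salesforce API"],
    "load_customers": ["PostgreSQL"],
}


def trace_upstream(target, seen=frozenset()):
    """Build a nested lineage tree, stopping at external sources and cycles."""
    if target in seen:
        return {"name": target, "dag": None, "sources": [], "cycle": True}
    dag = PRODUCED_BY.get(target)
    if dag is None:  # no producing DAG known: treat as an external root
        return {"name": target, "dag": None, "sources": []}
    seen = seen | {target}
    children = [trace_upstream(src, seen) for src in DAG_SOURCES.get(dag, [])]
    return {"name": target, "dag": dag, "sources": children}


def root_sources(node):
    """Collect the leaf names of a lineage tree, in discovery order."""
    if not node["sources"]:
        return [node["name"]]
    roots = []
    for child in node["sources"]:
        roots.extend(root_sources(child))
    return roots
```

The `seen` set guards against circular dependencies, which would otherwise make the recursion run forever.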

Step 5: Check Source Health

For each upstream source:
  • Tables: Check freshness with the checking-freshness skill
  • DAGs: Check recent run status with `get_dag_stats`
  • External systems: Note connection info from DAG code
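
Once a last-updated timestamp is known for each table, flagging stale sources is a simple comparison. The sketch below is only an illustration: `classify_freshness` is a hypothetical helper, and the 24-hour default threshold is an assumption to adjust per source.

```python
from datetime import datetime, timedelta, timezone


def classify_freshness(last_updated, now, max_age=timedelta(hours=24)):
    """Label a source 'stale' when its last update is older than max_age."""
    return "stale" if now - last_updated > max_age else "fresh"


def stale_sources(last_updated_by_source, now, max_age=timedelta(hours=24)):
    """Return the sorted names of sources whose data is older than max_age."""
    return sorted(
        name
        for name, ts in last_updated_by_source.items()
        if classify_freshness(ts, now, max_age) == "stale"
    )
```

Passing `now` explicitly keeps the check reproducible and avoids mixing time zones between the caller and the helper.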

Lineage for Columns

When tracing a specific column:
  1. Find the column in the target table schema
  2. Search DAG source code for references to that column name
  3. Trace through transformations:
    • Direct mappings: `source.col AS target_col`
    • Transformations: `COALESCE(a.col, b.col) AS target_col`
    • Aggregations: `SUM(detail.amount) AS total_amount`
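
To find the expression behind a column alias, a SELECT list can be split on top-level commas so that function calls such as `COALESCE(a.col, b.col)` stay intact. This is a minimal sketch under simplifying assumptions: the helpers `split_top_level` and `column_origin` are hypothetical, every item uses an explicit `AS` alias, and string literals containing commas or parentheses are not handled.

```python
import re


def split_top_level(select_list):
    """Split a SELECT list on commas that are not inside parentheses."""
    parts, depth, current = [], 0, []
    for ch in select_list:
        if ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
        if ch == "," and depth == 0:
            parts.append("".join(current).strip())
            current = []
        else:
            current.append(ch)
    tail = "".join(current).strip()
    if tail:
        parts.append(tail)
    return parts


def column_origin(select_list, target_col):
    """Return the expression aliased to target_col, or None if not found."""
    for item in split_top_level(select_list):
        match = re.fullmatch(r"(.+?)\s+AS\s+(\w+)", item, re.IGNORECASE | re.DOTALL)
        if match and match.group(2).lower() == target_col.lower():
            return match.group(1).strip()
    return None
```

The returned expression names the upstream columns to trace next. An aggregate such as `SUM(...)` also signals that the target column has a coarser grain than its source.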

Output: Lineage Report

Summary

One-line answer: "This table is populated by DAG X from sources Y and Z"
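
If the report is assembled programmatically, the one-line answer can be generated from the traced names. The function below is a hypothetical formatting helper, shown only to make the expected shape of the summary concrete.

```python
def summarize(table, dag, sources):
    """Format the one-line lineage answer for a table."""
    if not sources:
        origin = "no identified sources"
    elif len(sources) == 1:
        origin = f"source {sources[0]}"
    else:
        origin = "sources " + ", ".join(sources[:-1]) + " and " + sources[-1]
    return f"{table} is populated by DAG {dag} from {origin}"
```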

Lineage Diagram

[Salesforce] --> [raw.opportunities] --> [stg.opportunities] --> [fct.sales]
                        |                        |
                   DAG: ingest_sfdc         DAG: transform_sales

Source Details

| Source | Type | Connection | Freshness | Owner |
|--------|------|------------|-----------|-------|
| raw.orders | Table | Internal | 2h ago | data-team |
| Salesforce | API | salesforce_conn | Real-time | sales-ops |

Transformation Chain

Describe how data flows and transforms:
  1. Raw data lands in `raw.orders` via Salesforce API sync
  2. DAG `transform_orders` cleans and dedupes into `stg.orders`
  3. DAG `build_order_facts` joins with dimensions into `fct.orders`

Data Quality Implications

  • Single points of failure?
  • Stale upstream sources?
  • Complex transformation chains that could break?

Related Skills

  • Check source freshness: checking-freshness skill
  • Debug source DAG: debugging-dags skill
  • Trace downstream impacts: tracing-downstream-lineage skill
  • Add manual lineage annotations: annotating-task-lineage skill
  • Build custom lineage extractors: creating-openlineage-extractors skill