senior-data-engineer

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Senior Data Engineer

资深数据工程师

Expert-level data engineering for scalable data systems.
专业级的数据工程内容,面向可扩展的数据系统。

Core Competencies

核心能力

  • Data pipeline development
  • ETL/ELT design
  • Data warehousing
  • Stream processing
  • Data quality and governance
  • Data modeling
  • Performance optimization
  • Cloud data platforms
  • 数据管道开发
  • ETL/ELT 设计
  • 数据仓库构建
  • 流处理
  • 数据质量与治理
  • 数据建模
  • 性能优化
  • 云数据平台

Data Pipeline Architecture

数据管道架构

Batch Pipeline

批处理管道

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Sources   │────▶│  Transform  │────▶│   Target    │
│  (S3, API)  │     │  (Spark)    │     │  (Snowflake)│
└─────────────┘     └─────────────┘     └─────────────┘
       │                   │                   │
       │                   ▼                   │
       │           ┌─────────────┐             │
       └──────────▶│   Quality   │◀────────────┘
                   │   Checks    │
                   └─────────────┘
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Sources   │────▶│  Transform  │────▶│   Target    │
│  (S3, API)  │     │  (Spark)    │     │  (Snowflake)│
└─────────────┘     └─────────────┘     └─────────────┘
       │                   │                   │
       │                   ▼                   │
       │           ┌─────────────┐             │
       └──────────▶│   Quality   │◀────────────┘
                   │   Checks    │
                   └─────────────┘

Streaming Pipeline

流处理管道

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Kafka     │────▶│  Processor  │────▶│  Real-time  │
│   Topics    │     │  (Flink)    │     │    Store    │
└─────────────┘     └─────────────┘     └─────────────┘
                    ┌─────────────┐
                    │   Batch     │
                    │   Layer     │
                    └─────────────┘
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Kafka     │────▶│  Processor  │────▶│  Real-time  │
│   Topics    │     │  (Flink)    │     │    Store    │
└─────────────┘     └─────────────┘     └─────────────┘
                    ┌─────────────┐
                    │   Batch     │
                    │   Layer     │
                    └─────────────┘

Apache Airflow

Apache Airflow

DAG Structure

DAG结构

python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-alerts@company.com'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL for analytics',
    schedule_interval='0 6 * * *',
    start_date=days_ago(1),
    catchup=False,
    tags=['etl', 'daily'],
) as dag:

    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_from_source,
        op_kwargs={'date': '{{ ds }}'},
    )

    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
    )

    quality_check = PythonOperator(
        task_id='quality_check',
        python_callable=run_quality_checks,
    )

    load_task = SnowflakeOperator(
        task_id='load_to_snowflake',
        sql='sql/load_data.sql',
        snowflake_conn_id='snowflake_default',
        params={'date': '{{ ds }}'},
    )

    extract_task >> transform_task >> quality_check >> load_task
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-alerts@company.com'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL for analytics',
    schedule_interval='0 6 * * *',
    start_date=days_ago(1),
    catchup=False,
    tags=['etl', 'daily'],
) as dag:

    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_from_source,
        op_kwargs={'date': '{{ ds }}'},
    )

    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
    )

    quality_check = PythonOperator(
        task_id='quality_check',
        python_callable=run_quality_checks,
    )

    load_task = SnowflakeOperator(
        task_id='load_to_snowflake',
        sql='sql/load_data.sql',
        snowflake_conn_id='snowflake_default',
        params={'date': '{{ ds }}'},
    )

    extract_task >> transform_task >> quality_check >> load_task

Dynamic DAG Generation

动态DAG生成

python
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def dynamic_etl():

    @task
    def get_sources():
        return ['source_a', 'source_b', 'source_c']

    @task
    def extract(source: str):
        print(f'Extracting from {source}')
        return f'{source}_data'

    @task
    def transform(data: str):
        print(f'Transforming {data}')
        return f'{data}_transformed'

    @task
    def load(data: list):
        print(f'Loading {len(data)} datasets')

    sources = get_sources()
    extracted = extract.expand(source=sources)
    transformed = transform.expand(data=extracted)
    load(transformed)

dag = dynamic_etl()
python
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def dynamic_etl():

    @task
    def get_sources():
        return ['source_a', 'source_b', 'source_c']

    @task
    def extract(source: str):
        print(f'Extracting from {source}')
        return f'{source}_data'

    @task
    def transform(data: str):
        print(f'Transforming {data}')
        return f'{data}_transformed'

    @task
    def load(data: list):
        print(f'Loading {len(data)} datasets')

    sources = get_sources()
    extracted = extract.expand(source=sources)
    transformed = transform.expand(data=extracted)
    load(transformed)

dag = dynamic_etl()

Apache Spark

Apache Spark

Spark ETL Job

Spark ETL任务

python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder \
    .appName("ETL Pipeline") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()


def read_source_data(path: str, date: str):
    return spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv(f"{path}/date={date}/*.csv")


def transform_data(df):
    return df \
        .withColumn("amount_clean",
            when(col("amount").isNull(), 0).otherwise(col("amount"))) \
        .withColumn("category_normalized",
            when(col("category").isin(["A", "B"]), col("category"))
            .otherwise("OTHER")) \
        .withColumn("processed_at", current_timestamp()) \
        .filter(col("status") == "active") \
        .dropDuplicates(["id"])


def aggregate_metrics(df):
    return df.groupBy("date", "category") \
        .agg(
            F.count("*").alias("total_count"),
            F.sum("amount").alias("total_amount"),
            F.avg("amount").alias("avg_amount"),
            F.countDistinct("user_id").alias("unique_users")
        )


def write_to_warehouse(df, table_name: str, partition_cols: list):
    df.write \
        .mode("overwrite") \
        .partitionBy(partition_cols) \
        .format("delta") \
        .saveAsTable(table_name)
python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder \
    .appName("ETL Pipeline") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()


def read_source_data(path: str, date: str):
    return spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv(f"{path}/date={date}/*.csv")


def transform_data(df):
    return df \
        .withColumn("amount_clean",
            when(col("amount").isNull(), 0).otherwise(col("amount"))) \
        .withColumn("category_normalized",
            when(col("category").isin(["A", "B"]), col("category"))
            .otherwise("OTHER")) \
        .withColumn("processed_at", current_timestamp()) \
        .filter(col("status") == "active") \
        .dropDuplicates(["id"])


def aggregate_metrics(df):
    return df.groupBy("date", "category") \
        .agg(
            F.count("*").alias("total_count"),
            F.sum("amount").alias("total_amount"),
            F.avg("amount").alias("avg_amount"),
            F.countDistinct("user_id").alias("unique_users")
        )


def write_to_warehouse(df, table_name: str, partition_cols: list):
    df.write \
        .mode("overwrite") \
        .partitionBy(partition_cols) \
        .format("delta") \
        .saveAsTable(table_name)

Main pipeline

Main pipeline

raw_df = read_source_data("s3://bucket/raw/", "2024-01-15") clean_df = transform_data(raw_df) metrics_df = aggregate_metrics(clean_df)
write_to_warehouse(clean_df, "clean.transactions", ["date"]) write_to_warehouse(metrics_df, "analytics.daily_metrics", ["date"])
undefined
raw_df = read_source_data("s3://bucket/raw/", "2024-01-15") clean_df = transform_data(raw_df) metrics_df = aggregate_metrics(clean_df)
write_to_warehouse(clean_df, "clean.transactions", ["date"]) write_to_warehouse(metrics_df, "analytics.daily_metrics", ["date"])
undefined

Performance Optimization

性能优化

python
undefined
python
undefined

Partition tuning

Partition tuning

df = spark.read.parquet("s3://bucket/data/")
.repartition(100, "date") # Repartition by key
df = spark.read.parquet("s3://bucket/data/")
.repartition(100, "date") # Repartition by key

Broadcast join for small tables

Broadcast join for small tables

from pyspark.sql.functions import broadcast
small_df = spark.table("lookup_table") large_df = spark.table("fact_table")
result = large_df.join(broadcast(small_df), "key")
from pyspark.sql.functions import broadcast
small_df = spark.table("lookup_table") large_df = spark.table("fact_table")
result = large_df.join(broadcast(small_df), "key")

Cache intermediate results

Cache intermediate results

cleaned_df = transform(raw_df) cleaned_df.cache()
metrics_1 = cleaned_df.groupBy("category").count() metrics_2 = cleaned_df.groupBy("region").sum("amount")
cleaned_df.unpersist()
cleaned_df = transform(raw_df) cleaned_df.cache()
metrics_1 = cleaned_df.groupBy("category").count() metrics_2 = cleaned_df.groupBy("region").sum("amount")
cleaned_df.unpersist()

Predicate pushdown

Predicate pushdown

df = spark.read.parquet("s3://bucket/data/")
.filter(col("date") >= "2024-01-01") # Pushed to storage layer
undefined
df = spark.read.parquet("s3://bucket/data/")
.filter(col("date") >= "2024-01-01") # Pushed to storage layer
undefined

Data Modeling

数据建模

Dimensional Modeling

维度建模

Star Schema:
sql
-- Fact Table
CREATE TABLE fact_sales (
    sale_id BIGINT PRIMARY KEY,
    date_key INT REFERENCES dim_date(date_key),
    product_key INT REFERENCES dim_product(product_key),
    customer_key INT REFERENCES dim_customer(customer_key),
    store_key INT REFERENCES dim_store(store_key),
    quantity INT,
    unit_price DECIMAL(10,2),
    total_amount DECIMAL(10,2),
    discount_amount DECIMAL(10,2)
);

-- Dimension Tables
CREATE TABLE dim_date (
    date_key INT PRIMARY KEY,
    full_date DATE,
    year INT,
    quarter INT,
    month INT,
    week INT,
    day_of_week INT,
    is_weekend BOOLEAN,
    is_holiday BOOLEAN
);

CREATE TABLE dim_product (
    product_key INT PRIMARY KEY,
    product_id VARCHAR(50),
    product_name VARCHAR(200),
    category VARCHAR(100),
    subcategory VARCHAR(100),
    brand VARCHAR(100),
    -- SCD Type 2 columns
    effective_date DATE,
    end_date DATE,
    is_current BOOLEAN
);
星型模型:
sql
-- Fact Table
CREATE TABLE fact_sales (
    sale_id BIGINT PRIMARY KEY,
    date_key INT REFERENCES dim_date(date_key),
    product_key INT REFERENCES dim_product(product_key),
    customer_key INT REFERENCES dim_customer(customer_key),
    store_key INT REFERENCES dim_store(store_key),
    quantity INT,
    unit_price DECIMAL(10,2),
    total_amount DECIMAL(10,2),
    discount_amount DECIMAL(10,2)
);

-- Dimension Tables
CREATE TABLE dim_date (
    date_key INT PRIMARY KEY,
    full_date DATE,
    year INT,
    quarter INT,
    month INT,
    week INT,
    day_of_week INT,
    is_weekend BOOLEAN,
    is_holiday BOOLEAN
);

CREATE TABLE dim_product (
    product_key INT PRIMARY KEY,
    product_id VARCHAR(50),
    product_name VARCHAR(200),
    category VARCHAR(100),
    subcategory VARCHAR(100),
    brand VARCHAR(100),
    -- SCD Type 2 columns
    effective_date DATE,
    end_date DATE,
    is_current BOOLEAN
);

dbt Models

dbt模型

staging/stg_orders.sql:
sql
{{ config(materialized='view') }}

with source as (
    select * from {{ source('raw', 'orders') }}
),

renamed as (
    select
        order_id,
        customer_id,
        order_date,
        cast(total_amount as decimal(10,2)) as total_amount,
        status,
        _loaded_at
    from source
    where order_date >= '2023-01-01'
)

select * from renamed
marts/fct_orders.sql:
sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge'
) }}

with orders as (
    select * from {{ ref('stg_orders') }}
),

customers as (
    select * from {{ ref('dim_customers') }}
),

final as (
    select
        o.order_id,
        o.customer_id,
        c.customer_segment,
        o.order_date,
        o.total_amount,
        o.status,
        current_timestamp() as processed_at
    from orders o
    left join customers c on o.customer_id = c.customer_id
    {% if is_incremental() %}
    where o._loaded_at > (select max(processed_at) from {{ this }})
    {% endif %}
)

select * from final
staging/stg_orders.sql:
sql
{{ config(materialized='view') }}

with source as (
    select * from {{ source('raw', 'orders') }}
),

renamed as (
    select
        order_id,
        customer_id,
        order_date,
        cast(total_amount as decimal(10,2)) as total_amount,
        status,
        _loaded_at
    from source
    where order_date >= '2023-01-01'
)

select * from renamed
marts/fct_orders.sql:
sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge'
) }}

with orders as (
    select * from {{ ref('stg_orders') }}
),

customers as (
    select * from {{ ref('dim_customers') }}
),

final as (
    select
        o.order_id,
        o.customer_id,
        c.customer_segment,
        o.order_date,
        o.total_amount,
        o.status,
        current_timestamp() as processed_at
    from orders o
    left join customers c on o.customer_id = c.customer_id
    {% if is_incremental() %}
    where o._loaded_at > (select max(processed_at) from {{ this }})
    {% endif %}
)

select * from final

Data Quality

数据质量

Great Expectations

Great Expectations

python
import great_expectations as gx

context = gx.get_context()
python
import great_expectations as gx

context = gx.get_context()

Define expectations

Define expectations

expectation_suite = context.add_expectation_suite("orders_suite")
expectations = [ gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id"), gx.expectations.ExpectColumnValuesToBeUnique(column="order_id"), gx.expectations.ExpectColumnValuesToBeBetween( column="amount", min_value=0, max_value=100000 ), gx.expectations.ExpectColumnValuesToMatchRegex( column="email", regex=r"^[\w.-]+@[\w.-]+.\w+$" ), gx.expectations.ExpectTableRowCountToBeBetween( min_value=1000, max_value=1000000 ), ]
for exp in expectations: expectation_suite.add_expectation(exp)
expectation_suite = context.add_expectation_suite("orders_suite")
expectations = [ gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id"), gx.expectations.ExpectColumnValuesToBeUnique(column="order_id"), gx.expectations.ExpectColumnValuesToBeBetween( column="amount", min_value=0, max_value=100000 ), gx.expectations.ExpectColumnValuesToMatchRegex( column="email", regex=r"^[\w.-]+@[\w.-]+.\w+$" ), gx.expectations.ExpectTableRowCountToBeBetween( min_value=1000, max_value=1000000 ), ]
for exp in expectations: expectation_suite.add_expectation(exp)

Run validation

Run validation

batch = context.get_batch(datasource_name="orders", data_asset_name="daily") results = context.run_validation_operator( "action_list_operator", assets_to_validate=[batch], expectation_suite_name="orders_suite" )
if not results["success"]: raise DataQualityError("Validation failed")
undefined
batch = context.get_batch(datasource_name="orders", data_asset_name="daily") results = context.run_validation_operator( "action_list_operator", assets_to_validate=[batch], expectation_suite_name="orders_suite" )
if not results["success"]: raise DataQualityError("Validation failed")
undefined

Custom Quality Checks

自定义质量检查

python
from dataclasses import dataclass
from typing import Callable

@dataclass
class QualityCheck:
    name: str
    check_fn: Callable
    severity: str  # 'error' or 'warning'


def check_no_nulls(df, column: str) -> bool:
    null_count = df.filter(col(column).isNull()).count()
    return null_count == 0


def check_referential_integrity(df, ref_df, key: str) -> bool:
    orphans = df.join(ref_df, key, "left_anti")
    return orphans.count() == 0


def check_freshness(df, timestamp_col: str, max_hours: int) -> bool:
    from pyspark.sql.functions import max as spark_max, current_timestamp
    max_ts = df.agg(spark_max(timestamp_col)).collect()[0][0]
    hours_old = (datetime.now() - max_ts).total_seconds() / 3600
    return hours_old <= max_hours


def run_quality_checks(df, checks: list[QualityCheck]):
    results = []
    for check in checks:
        passed = check.check_fn(df)
        results.append({
            'check': check.name,
            'passed': passed,
            'severity': check.severity
        })

    errors = [r for r in results if not r['passed'] and r['severity'] == 'error']
    if errors:
        raise DataQualityError(f"Quality checks failed: {errors}")

    return results
python
from dataclasses import dataclass
from typing import Callable

@dataclass
class QualityCheck:
    name: str
    check_fn: Callable
    severity: str  # 'error' or 'warning'


def check_no_nulls(df, column: str) -> bool:
    null_count = df.filter(col(column).isNull()).count()
    return null_count == 0


def check_referential_integrity(df, ref_df, key: str) -> bool:
    orphans = df.join(ref_df, key, "left_anti")
    return orphans.count() == 0


def check_freshness(df, timestamp_col: str, max_hours: int) -> bool:
    from pyspark.sql.functions import max as spark_max, current_timestamp
    max_ts = df.agg(spark_max(timestamp_col)).collect()[0][0]
    hours_old = (datetime.now() - max_ts).total_seconds() / 3600
    return hours_old <= max_hours


def run_quality_checks(df, checks: list[QualityCheck]):
    results = []
    for check in checks:
        passed = check.check_fn(df)
        results.append({
            'check': check.name,
            'passed': passed,
            'severity': check.severity
        })

    errors = [r for r in results if not r['passed'] and r['severity'] == 'error']
    if errors:
        raise DataQualityError(f"Quality checks failed: {errors}")

    return results

Stream Processing

流处理

Kafka + Flink

Kafka + Flink

python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

Define Kafka source

Define Kafka source

t_env.execute_sql(""" CREATE TABLE orders ( order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'orders', 'properties.bootstrap.servers' = 'kafka:9092', 'properties.group.id' = 'order-processor', 'format' = 'json', 'scan.startup.mode' = 'latest-offset' ) """)
t_env.execute_sql(""" CREATE TABLE orders ( order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'orders', 'properties.bootstrap.servers' = 'kafka:9092', 'properties.group.id' = 'order-processor', 'format' = 'json', 'scan.startup.mode' = 'latest-offset' ) """)

Windowed aggregation

Windowed aggregation

t_env.execute_sql(""" CREATE TABLE order_metrics ( window_start TIMESTAMP(3), window_end TIMESTAMP(3), total_orders BIGINT, total_amount DECIMAL(10,2) ) WITH ( 'connector' = 'kafka', 'topic' = 'order-metrics', 'properties.bootstrap.servers' = 'kafka:9092', 'format' = 'json' ) """)
t_env.execute_sql(""" INSERT INTO order_metrics SELECT TUMBLE_START(order_time, INTERVAL '1' MINUTE) as window_start, TUMBLE_END(order_time, INTERVAL '1' MINUTE) as window_end, COUNT(*) as total_orders, SUM(amount) as total_amount FROM orders GROUP BY TUMBLE(order_time, INTERVAL '1' MINUTE) """)
undefined
t_env.execute_sql(""" CREATE TABLE order_metrics ( window_start TIMESTAMP(3), window_end TIMESTAMP(3), total_orders BIGINT, total_amount DECIMAL(10,2) ) WITH ( 'connector' = 'kafka', 'topic' = 'order-metrics', 'properties.bootstrap.servers' = 'kafka:9092', 'format' = 'json' ) """)
t_env.execute_sql(""" INSERT INTO order_metrics SELECT TUMBLE_START(order_time, INTERVAL '1' MINUTE) as window_start, TUMBLE_END(order_time, INTERVAL '1' MINUTE) as window_end, COUNT(*) as total_orders, SUM(amount) as total_amount FROM orders GROUP BY TUMBLE(order_time, INTERVAL '1' MINUTE) """)
undefined

Reference Materials

参考资料

  • references/pipeline_patterns.md
    - Pipeline architecture patterns
  • references/data_modeling.md
    - Dimensional modeling guide
  • references/quality_framework.md
    - Data quality practices
  • references/streaming_guide.md
    - Stream processing patterns
  • references/pipeline_patterns.md
    - 管道架构模式
  • references/data_modeling.md
    - 维度建模指南
  • references/quality_framework.md
    - 数据质量实践规范
  • references/streaming_guide.md
    - 流处理模式

Scripts

脚本

bash
undefined
bash
undefined

Pipeline scaffolder

Pipeline scaffolder

python scripts/pipeline_scaffold.py --name daily_etl --schedule daily
python scripts/pipeline_scaffold.py --name daily_etl --schedule daily

Data quality runner

Data quality runner

python scripts/quality_check.py --suite orders_suite --data s3://bucket/orders/
python scripts/quality_check.py --suite orders_suite --data s3://bucket/orders/

Schema migration

Schema migration

python scripts/schema_migrate.py --target snowflake --version v2
python scripts/schema_migrate.py --target snowflake --version v2

Backfill runner

Backfill runner

python scripts/backfill.py --dag daily_etl --start 2024-01-01 --end 2024-01-31
undefined
python scripts/backfill.py --dag daily_etl --start 2024-01-01 --end 2024-01-31
undefined