data-engineering-study-material

Data Engineering Study Material

Skill by ara.so — Data Skills collection.

Overview

This project is a comprehensive study guide and reference repository for data engineering concepts, tools, and practices. It serves as a centralized resource for learning core data engineering principles, understanding modern data stack components, and preparing for data engineering roles.
The repository covers:
  • Data engineering fundamentals and architecture patterns
  • ETL/ELT pipeline design and implementation
  • Data warehousing and lake architectures
  • Streaming and batch processing frameworks
  • Cloud data platforms (AWS, GCP, Azure)
  • Data quality, governance, and observability
  • Infrastructure as Code and orchestration tools
  • Interview preparation and best practices

Installation

This is a study material repository, not an installable package. Clone it to access the materials:
```bash
git clone https://github.com/Ahmeduddin3403/data-engineering-study-material.git
cd data-engineering-study-material
```

Repository Structure

The materials are typically organized by topic area:
```
data-engineering-study-material/
├── fundamentals/          # Core concepts and principles
├── tools/                 # Tool-specific guides
├── architectures/         # Design patterns and architectures
├── pipelines/             # ETL/ELT examples
├── cloud-platforms/       # Cloud-specific implementations
├── streaming/             # Real-time processing
├── batch-processing/      # Batch job patterns
├── data-quality/          # Testing and validation
├── orchestration/         # Workflow management
├── interview-prep/        # Interview questions and answers
└── projects/              # Hands-on project examples
```

Core Data Engineering Concepts

ETL Pipeline Example (Python)

```python
import logging

import pandas as pd
from sqlalchemy import create_engine

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ETLPipeline:
    """Simple ETL pipeline for extracting, transforming, and loading data"""

    def __init__(self, source_path, target_conn_string):
        self.source_path = source_path
        self.engine = create_engine(target_conn_string)

    def extract(self):
        """Extract data from source"""
        logger.info(f"Extracting data from {self.source_path}")
        df = pd.read_csv(self.source_path)
        logger.info(f"Extracted {len(df)} rows")
        return df

    def transform(self, df):
        """Transform data: clean, deduplicate, enrich"""
        logger.info("Transforming data")

        # Remove duplicates
        df = df.drop_duplicates()

        # Handle missing values
        df = df.fillna({
            'numeric_column': 0,
            'string_column': 'Unknown'
        })

        # Add derived columns
        df['created_date'] = pd.to_datetime(df['timestamp']).dt.date

        # Data validation
        df = df[df['amount'] > 0]

        logger.info(f"Transformed to {len(df)} rows")
        return df

    def load(self, df, table_name):
        """Load data to target database"""
        logger.info(f"Loading data to {table_name}")
        df.to_sql(table_name, self.engine, if_exists='append', index=False)
        logger.info("Load complete")

    def run(self, table_name):
        """Execute full ETL pipeline"""
        try:
            df = self.extract()
            df_transformed = self.transform(df)
            self.load(df_transformed, table_name)
            logger.info("ETL pipeline completed successfully")
        except Exception as e:
            logger.error(f"ETL pipeline failed: {str(e)}")
            raise


# Usage
if __name__ == "__main__":
    pipeline = ETLPipeline(
        source_path='data/raw/sales.csv',
        target_conn_string='postgresql://user:pass@localhost:5432/warehouse'
    )
    pipeline.run('sales_fact')
```
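To see what the `transform` step does without a CSV file or a database, the sketch below applies the same four operations to a small in-memory DataFrame. The standalone `transform` function and the sample rows are illustrative assumptions; the column names (`numeric_column`, `string_column`, `timestamp`, `amount`) are the ones used in the pipeline above.

```python
import pandas as pd


def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Same steps as ETLPipeline.transform, written as a standalone function."""
    df = df.drop_duplicates()
    df = df.fillna({"numeric_column": 0, "string_column": "Unknown"})
    df["created_date"] = pd.to_datetime(df["timestamp"]).dt.date
    return df[df["amount"] > 0]


# Hypothetical sample: one exact duplicate, one row with missing values,
# and one row with a non-positive amount
sample = pd.DataFrame({
    "numeric_column": [1.0, 1.0, None, 4.0],
    "string_column": ["a", "a", None, "d"],
    "timestamp": [
        "2024-01-01 10:00:00",
        "2024-01-01 10:00:00",
        "2024-01-02 11:30:00",
        "2024-01-03 09:15:00",
    ],
    "amount": [10.0, 10.0, 25.0, -5.0],
})

result = transform(sample)
print(result)
```

The duplicate and the negative-amount row are dropped, and the missing values in the remaining row are filled with the defaults.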

Data Quality Checks

```python
import great_expectations as ge

def validate_data_quality(df):
    """Implement data quality checks using Great Expectations"""

    # Convert pandas DataFrame to GE DataFrame
    # (ge.from_pandas belongs to the legacy, pre-1.0 Great Expectations API)
    ge_df = ge.from_pandas(df)

    # Define expectations. In the legacy API they are methods on the dataset
    # and take the column name as their first argument.
    expectations = {
        'id': lambda ds, col: ds.expect_column_values_to_be_unique(col),
        'email': lambda ds, col: ds.expect_column_values_to_match_regex(col, r'^[\w\.-]+@[\w\.-]+\.\w+$'),
        'amount': lambda ds, col: ds.expect_column_values_to_be_between(col, min_value=0, max_value=1000000),
        'created_at': lambda ds, col: ds.expect_column_values_to_not_be_null(col),
        'status': lambda ds, col: ds.expect_column_values_to_be_in_set(col, ['active', 'inactive', 'pending'])
    }

    # Run validations for the columns that are present
    results = []
    for column, expectation_func in expectations.items():
        if column in ge_df.columns:
            result = expectation_func(ge_df, column)
            results.append(result)

    # Check if all validations passed
    all_passed = all(r.success for r in results)

    return all_passed, results
```
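If Great Expectations is not installed, the same kinds of rules can be approximated with plain pandas. The following is a minimal sketch, not a replacement for a validation framework: `check_quality` and the sample data are hypothetical, and unlike Great Expectations this version does not skip null values.

```python
import pandas as pd


def check_quality(df: pd.DataFrame) -> dict:
    """Return {check_name: passed} for a few column-level rules."""
    checks = {}
    if "id" in df.columns:
        checks["id_unique"] = bool(df["id"].is_unique)
    if "email" in df.columns:
        pattern = r"^[\w\.-]+@[\w\.-]+\.\w+$"
        checks["email_format"] = bool(df["email"].astype(str).str.match(pattern).all())
    if "amount" in df.columns:
        checks["amount_range"] = bool(df["amount"].between(0, 1_000_000).all())
    if "status" in df.columns:
        allowed = ["active", "inactive", "pending"]
        checks["status_allowed"] = bool(df["status"].isin(allowed).all())
    return checks


# Hypothetical sample with a duplicated id and a malformed email
sample = pd.DataFrame({
    "id": [1, 2, 2],
    "email": ["a@example.com", "b@example.com", "not-an-email"],
    "amount": [10.0, 250.0, 99.5],
    "status": ["active", "pending", "inactive"],
})

report = check_quality(sample)
print(report)
```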

Apache Airflow DAG Example

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

def extract_from_api(**context):
    """Extract data from external API"""
    import requests

    response = requests.get('https://api.example.com/data')
    response.raise_for_status()  # Fail the task on HTTP errors
    data = response.json()

    # Save to S3
    s3_path = f"s3://my-bucket/raw/{context['ds']}/data.json"
    # Upload logic here

    # The return value is pushed to XCom for the next task
    return s3_path

def transform_data(**context):
    """Transform extracted data"""
    import pandas as pd

    # Read from S3
    s3_path = context['ti'].xcom_pull(task_ids='extract_task')
    df = pd.read_json(s3_path)

    # Transformations
    df_transformed = df.drop_duplicates()
    df_transformed['load_date'] = context['ds']

    # Write back to S3
    output_path = f"s3://my-bucket/processed/{context['ds']}/data.parquet"
    df_transformed.to_parquet(output_path)

    return output_path

with DAG(
    'daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline for data processing',
    schedule_interval='0 2 * * *',  # Run at 2 AM daily (named `schedule` in Airflow 2.4+)
    catchup=False,
    tags=['etl', 'daily']
) as dag:

    # Airflow 2 passes the context to the callable automatically,
    # so provide_context=True is no longer needed
    extract_task = PythonOperator(
        task_id='extract_task',
        python_callable=extract_from_api
    )

    transform_task = PythonOperator(
        task_id='transform_task',
        python_callable=transform_data
    )

    load_task = S3ToRedshiftOperator(
        task_id='load_to_redshift',
        s3_bucket='my-bucket',
        s3_key='processed/{{ ds }}/data.parquet',
        schema='analytics',
        table='fact_table',
        copy_options=['FORMAT AS PARQUET']
    )

    data_quality_check = PostgresOperator(
        task_id='quality_check',
        postgres_conn_id='redshift_conn',
        sql="""
            SELECT COUNT(*) as row_count
            FROM analytics.fact_table
            WHERE load_date = '{{ ds }}';
        """
    )

    extract_task >> transform_task >> load_task >> data_quality_check
```

Spark Batch Processing Example

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, to_date, dense_rank
from pyspark.sql.window import Window

def process_batch_data():
    """Process large-scale batch data with Apache Spark"""

    # Initialize Spark session
    spark = SparkSession.builder \
        .appName("BatchDataProcessing") \
        .config("spark.sql.adaptive.enabled", "true") \
        .getOrCreate()

    # Read data from data lake
    df = spark.read \
        .format("parquet") \
        .load("s3a://data-lake/raw/transactions/")

    # Transformations
    df_transformed = df \
        .withColumn("transaction_date", to_date(col("timestamp"))) \
        .filter(col("amount") > 0) \
        .dropDuplicates(["transaction_id"])

    # Aggregations
    daily_summary = df_transformed.groupBy("transaction_date", "category") \
        .agg(
            count("transaction_id").alias("transaction_count"),
            sum("amount").alias("total_amount"),
            avg("amount").alias("avg_amount")
        )

    # Window functions for ranking: top 10 categories per day by total amount
    window_spec = Window.partitionBy("transaction_date").orderBy(col("total_amount").desc())

    ranked_summary = daily_summary \
        .withColumn("rank", dense_rank().over(window_spec)) \
        .filter(col("rank") <= 10)

    # Write to data warehouse
    ranked_summary.write \
        .format("parquet") \
        .mode("overwrite") \
        .partitionBy("transaction_date") \
        .save("s3a://data-warehouse/analytics/daily_summary/")

    spark.stop()

if __name__ == "__main__":
    process_batch_data()
```
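To check what a dense rank over a per-date window computes without a Spark cluster, here is a rough pandas equivalent on made-up data. The sample values and `top_n` are assumptions for illustration; the column names mirror the Spark job above.

```python
import pandas as pd

# Hypothetical aggregated data, shaped like daily_summary in the Spark job
daily_summary = pd.DataFrame({
    "transaction_date": ["2024-01-01"] * 4 + ["2024-01-02"] * 2,
    "category": ["books", "games", "music", "toys", "books", "games"],
    "total_amount": [300.0, 500.0, 300.0, 100.0, 50.0, 75.0],
})

# Dense rank within each date, highest total first:
# tied totals share a rank and no rank numbers are skipped
daily_summary["rank"] = (
    daily_summary.groupby("transaction_date")["total_amount"]
    .rank(method="dense", ascending=False)
    .astype(int)
)

top_n = 2
ranked = daily_summary[daily_summary["rank"] <= top_n]
print(ranked.sort_values(["transaction_date", "rank"]))
```

Because ties share a rank, a `rank <= N` filter can return more than N rows per date, which is also true of the Spark version.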

Streaming Pipeline Example (Kafka + Spark Structured Streaming)

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, count, sum
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

def create_streaming_pipeline():
    """Real-time data processing with Spark Structured Streaming"""

    spark = SparkSession.builder \
        .appName("RealTimeStreaming") \
        .getOrCreate()

    # Define schema for incoming data
    schema = StructType([
        StructField("event_id", StringType(), True),
        StructField("user_id", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("amount", DoubleType(), True),
        StructField("timestamp", TimestampType(), True)
    ])

    # Read from Kafka
    df_stream = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "latest") \
        .load()

    # Parse JSON data
    df_parsed = df_stream.select(
        from_json(col("value").cast("string"), schema).alias("data")
    ).select("data.*")

    # Windowed aggregations: 5-minute windows sliding every minute,
    # tolerating events that arrive up to 10 minutes late
    df_windowed = df_parsed \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window(col("timestamp"), "5 minutes", "1 minute"),
            col("event_type")
        ) \
        .agg(
            count("event_id").alias("event_count"),
            sum("amount").alias("total_amount")
        )

    # Write to sink
    query = df_windowed.writeStream \
        .outputMode("append") \
        .format("parquet") \
        .option("path", "s3a://streaming-output/events/") \
        .option("checkpointLocation", "s3a://checkpoints/events/") \
        .trigger(processingTime="1 minute") \
        .start()

    query.awaitTermination()

if __name__ == "__main__":
    create_streaming_pipeline()
```
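The `window(col("timestamp"), "5 minutes", "1 minute")` call above assigns each event to every 5-minute window that contains it, with window starts spaced 1 minute apart. The pure-Python sketch below reproduces that assignment for a single event so the overlap is easy to see. The helper name `sliding_windows` and the sample timestamp are illustrative assumptions, and the alignment to the Unix epoch is meant to reflect Spark's default when no start offset is given.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def sliding_windows(event_time, window=timedelta(minutes=5), slide=timedelta(minutes=1)):
    """Return the [start, end) windows that contain event_time, earliest first."""
    # Latest window start at or before the event, aligned to the slide interval
    latest_start = event_time - ((event_time - EPOCH) % slide)

    windows = []
    start = latest_start
    # A window contains the event while start <= event_time < start + window
    while start > event_time - window:
        windows.append((start, start + window))
        start -= slide
    return list(reversed(windows))


event = datetime(2024, 1, 1, 12, 7, 30, tzinfo=timezone.utc)
windows = sliding_windows(event)
for start, end in windows:
    print(start.time(), "->", end.time())
```

With a 5-minute window and a 1-minute slide, each event lands in five overlapping windows, so every event contributes to five aggregate rows.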

Common Patterns

Incremental Data Loading

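```python
import pandas as pd
from sqlalchemy import text

# Assumed to be defined elsewhere: `source_conn` and `target_conn` (SQLAlchemy
# engines or connections) and `save_watermark`, which persists the watermark.

def incremental_load(table_name, watermark_column, last_watermark):
    """Load only new/updated records since last run"""

    # Identifiers cannot be bound as parameters, so table_name and
    # watermark_column must come from trusted configuration. The watermark
    # value itself is passed as a bound parameter.
    query = text(f"""
        SELECT *
        FROM {table_name}
        WHERE {watermark_column} > :last_watermark
        ORDER BY {watermark_column}
    """)

    # Execute query and get new data
    new_data = pd.read_sql(query, source_conn, params={"last_watermark": last_watermark})

    if not new_data.empty:
        # Get new watermark
        new_watermark = new_data[watermark_column].max()

        # Load to target
        new_data.to_sql('target_table', target_conn, if_exists='append', index=False)

        # Update watermark only after the load has succeeded
        save_watermark(table_name, new_watermark)

    return len(new_data)
```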
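The watermark pattern can be tried end to end with an in-memory SQLite database. This is a minimal sketch under assumed names: the tables `source_orders`, `target_orders`, and `watermarks`, and the helper `load_increment`, are hypothetical and stand in for the source, target, and watermark store.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE source_orders (id INTEGER, updated_at TEXT);
    CREATE TABLE target_orders (id INTEGER, updated_at TEXT);
    CREATE TABLE watermarks (table_name TEXT PRIMARY KEY, value TEXT);
    INSERT INTO source_orders VALUES (1, '2024-01-01'), (2, '2024-01-02');
""")


def load_increment(conn):
    """Copy rows newer than the stored watermark, then advance the watermark."""
    row = conn.execute(
        "SELECT value FROM watermarks WHERE table_name = 'source_orders'"
    ).fetchone()
    last = row[0] if row else ""  # empty string sorts before any ISO date

    new_rows = conn.execute(
        "SELECT id, updated_at FROM source_orders "
        "WHERE updated_at > ? ORDER BY updated_at",
        (last,),
    ).fetchall()

    if new_rows:
        conn.executemany("INSERT INTO target_orders VALUES (?, ?)", new_rows)
        conn.execute(
            "INSERT OR REPLACE INTO watermarks VALUES ('source_orders', ?)",
            (new_rows[-1][1],),
        )
        conn.commit()
    return len(new_rows)


first = load_increment(conn)    # loads both existing rows
conn.execute("INSERT INTO source_orders VALUES (3, '2024-01-03')")
second = load_increment(conn)   # loads only the new row
third = load_increment(conn)    # nothing new to load
print(first, second, third)
```

One caveat of a strict `>` comparison: rows that arrive later with a timestamp equal to the stored watermark are skipped, so the watermark column should be fine-grained or the comparison paired with deduplication.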

SCD Type 2 Implementation

```python
import pandas as pd
from sqlalchemy import bindparam, text

# Assumed to be defined elsewhere: `conn` (a SQLAlchemy connection) and
# `row_has_changes(row)`, which compares the `_new` and `_old` columns of a merged row.

def scd_type2_merge(source_df, target_table, business_key, effective_date):
    """Implement Slowly Changing Dimension Type 2"""

    # Read current dimension table
    current_df = pd.read_sql(text(f"SELECT * FROM {target_table} WHERE is_current = 1"), conn)

    # Identify changes
    merged = source_df.merge(
        current_df,
        on=business_key,
        how='left',
        suffixes=('_new', '_old')
    )

    # Records that changed
    changed = merged[merged.apply(row_has_changes, axis=1)]

    # Expire old records
    if not changed.empty:
        # Bind the keys as parameters so that string keys are quoted correctly
        expire_query = text(f"""
            UPDATE {target_table}
            SET is_current = 0,
                end_date = :end_date
            WHERE {business_key} IN :keys
            AND is_current = 1
        """).bindparams(bindparam("keys", expanding=True))
        conn.execute(
            expire_query,
            {"end_date": effective_date, "keys": changed[business_key].tolist()}
        )

    # Insert new versions (copy so that source_df is not modified through a slice)
    new_records = source_df[source_df[business_key].isin(changed[business_key])].copy()
    new_records['start_date'] = effective_date
    new_records['end_date'] = '9999-12-31'
    new_records['is_current'] = 1

    new_records.to_sql(target_table, conn, if_exists='append', index=False)
```
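The expire-then-insert logic of SCD Type 2 can also be followed on two small DataFrames, with no database involved. The sketch below is one possible reading of the pattern, not the repository's implementation: `scd2_apply`, the `tracked` column list, and the sample customers are all hypothetical.

```python
import pandas as pd


def scd2_apply(dim, source, key, tracked, effective_date):
    """Return a new dimension table with changed and brand-new keys versioned."""
    current = dim[dim["is_current"] == 1]
    merged = source.merge(
        current, on=key, how="left", suffixes=("", "_old"), indicator=True
    )

    is_new = merged["_merge"] == "left_only"
    has_diff = pd.Series(False, index=merged.index)
    for col in tracked:
        has_diff |= merged[col] != merged[f"{col}_old"]

    changed_keys = merged.loc[~is_new & has_diff, key]
    insert_keys = merged.loc[is_new | has_diff, key]

    # Expire the current version of every changed key
    dim = dim.copy()
    expire = dim[key].isin(changed_keys) & (dim["is_current"] == 1)
    dim.loc[expire, "end_date"] = effective_date
    dim.loc[expire, "is_current"] = 0

    # Append a fresh current version for changed and brand-new keys
    new_rows = source[source[key].isin(insert_keys)].copy()
    new_rows["start_date"] = effective_date
    new_rows["end_date"] = "9999-12-31"
    new_rows["is_current"] = 1
    return pd.concat([dim, new_rows], ignore_index=True)


# Hypothetical dimension: customer 2 moves, customer 3 is new
dim = pd.DataFrame({
    "customer_id": [1, 2],
    "city": ["Paris", "Rome"],
    "start_date": ["2023-01-01", "2023-01-01"],
    "end_date": ["9999-12-31", "9999-12-31"],
    "is_current": [1, 1],
})
source = pd.DataFrame({"customer_id": [1, 2, 3], "city": ["Paris", "Milan", "Oslo"]})

result = scd2_apply(dim, source, "customer_id", ["city"], "2024-06-01")
print(result)
```

Customer 1 is untouched, customer 2 keeps its old row (now expired) and gains a new current row, and customer 3 is inserted as a first version.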

Configuration

Database Connection Configuration

```python
# config.py
import os

# Environment variables are always strings, so ports are cast to int
# to keep the type the same whether or not the variable is set.
DATABASE_CONFIG = {
    'source': {
        'host': os.getenv('SOURCE_DB_HOST'),
        'port': int(os.getenv('SOURCE_DB_PORT', '5432')),
        'database': os.getenv('SOURCE_DB_NAME'),
        'user': os.getenv('SOURCE_DB_USER'),
        'password': os.getenv('SOURCE_DB_PASSWORD')
    },
    'warehouse': {
        'host': os.getenv('WAREHOUSE_HOST'),
        'port': int(os.getenv('WAREHOUSE_PORT', '5439')),
        'database': os.getenv('WAREHOUSE_DB'),
        'user': os.getenv('WAREHOUSE_USER'),
        'password': os.getenv('WAREHOUSE_PASSWORD')
    }
}

SPARK_CONFIG = {
    'spark.executor.memory': '4g',
    'spark.driver.memory': '2g',
    'spark.sql.adaptive.enabled': 'true',
    'spark.sql.adaptive.coalescePartitions.enabled': 'true'
}

AIRFLOW_CONFIG = {
    'concurrency': 16,
    'max_active_runs': 3,
    'dagbag_import_timeout': 30
}
```
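A configuration dictionary of this shape usually ends up as a connection URL. The sketch below shows one way to assemble it with only the standard library, failing early when a setting is missing. The helper `build_postgres_url` and the example values are assumptions for illustration, not part of the repository.

```python
from urllib.parse import quote_plus


def build_postgres_url(cfg: dict) -> str:
    """Assemble a postgresql:// URL, escaping special characters in credentials."""
    missing = [k for k in ("host", "database", "user", "password") if not cfg.get(k)]
    if missing:
        raise ValueError(f"Missing database settings: {', '.join(missing)}")

    user = quote_plus(cfg["user"])
    password = quote_plus(cfg["password"])
    return f"postgresql://{user}:{password}@{cfg['host']}:{cfg['port']}/{cfg['database']}"


# Hypothetical values standing in for what DATABASE_CONFIG['warehouse'] would hold
example_cfg = {
    "host": "warehouse.internal",
    "port": 5439,
    "database": "analytics",
    "user": "etl_user",
    "password": "p@ss/word",
}

url = build_postgres_url(example_cfg)
print(url)
```

Escaping matters because characters such as `@` or `/` in a password would otherwise be parsed as part of the URL structure.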

Troubleshooting

Common Issues and Solutions

**Issue: Out of Memory in Spark Jobs**
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

# Solution: Optimize memory usage
spark = SparkSession.builder \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.default.parallelism", "200") \
    .getOrCreate()

# Use broadcast joins for small tables
# (large_df and small_df are placeholders for DataFrames defined elsewhere)
result = large_df.join(broadcast(small_df), "key")
```

**Issue: Slow Incremental Loads**
```python
# Solution: Use partition pruning and indexing
# - Add indexes on watermark columns
# - Partition target tables by date
# - Use partition pruning in Spark

# `spark` is an existing SparkSession and `start_date` a date string
# (both are placeholders defined elsewhere)
df = spark.read.parquet("s3://data/table/") \
    .where(f"partition_date >= '{start_date}'")
```

**Issue: Data Quality Failures**
```python
import pandas as pd

# Solution: Implement comprehensive validation
def validate_and_quarantine(df, rules):
    """Separate valid and invalid records"""
    valid_df = df
    invalid_records = []

    # Each rule maps a DataFrame to a boolean mask of rows that pass
    for rule_name, rule_func in rules.items():
        mask = rule_func(valid_df)
        invalid = valid_df[~mask].copy()
        invalid['failed_rule'] = rule_name
        invalid_records.append(invalid)
        valid_df = valid_df[mask]

    # Save invalid records for review
    # (`conn` is a database connection defined elsewhere)
    if invalid_records:
        quarantined = pd.concat(invalid_records)
        if not quarantined.empty:
            quarantined.to_sql(
                'data_quality_quarantine',
                conn,
                if_exists='append'
            )

    return valid_df
```
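The quarantine pattern is easier to follow with concrete rules. The sketch below keeps everything in memory and returns the rejected rows instead of writing them to a database. The function name `split_valid_invalid`, the two rules, and the sample orders are hypothetical.

```python
import pandas as pd


def split_valid_invalid(df: pd.DataFrame, rules: dict):
    """Apply rules in order; return (valid rows, rejected rows tagged with the failed rule)."""
    valid = df
    rejected = []
    for name, rule in rules.items():
        mask = rule(valid)
        bad = valid[~mask].copy()
        bad["failed_rule"] = name
        rejected.append(bad)
        valid = valid[mask]
    quarantine = pd.concat(rejected) if rejected else pd.DataFrame()
    return valid, quarantine


# Each rule takes a DataFrame and returns a boolean mask of passing rows
rules = {
    "amount_positive": lambda d: d["amount"] > 0,
    "status_known": lambda d: d["status"].isin(["active", "inactive", "pending"]),
}

orders = pd.DataFrame({
    "order_id": [1, 2, 3, 4],
    "amount": [20.0, -5.0, 15.0, 30.0],
    "status": ["active", "active", "deleted", "pending"],
})

valid, quarantine = split_valid_invalid(orders, rules)
print(valid)
print(quarantine)
```

Because rules run in sequence on the rows that are still valid, each rejected row is tagged with the first rule it fails.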

Best Practices

  1. Idempotency: Ensure pipelines can be re-run safely
  2. Monitoring: Implement comprehensive logging and alerting
  3. Data Quality: Validate data at every stage
  4. Partitioning: Use appropriate partitioning strategies for performance
  5. Documentation: Document data lineage and transformations
  6. Version Control: Track schema changes and pipeline versions
  7. Testing: Test pipelines with sample data before production
  8. Security: Use IAM roles, encryption, and secure credential management
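Idempotency, the first item in the list, is often achieved by replacing a whole partition in one transaction instead of appending to it. The following is a minimal sketch with an in-memory SQLite database; the `daily_sales` table and the `load_partition` helper are assumptions chosen for illustration.

```python
import sqlite3


def load_partition(conn, load_date, rows):
    """Replace all rows for one load_date inside a single transaction."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute("DELETE FROM daily_sales WHERE load_date = ?", (load_date,))
        conn.executemany(
            "INSERT INTO daily_sales (load_date, product, amount) VALUES (?, ?, ?)",
            [(load_date, product, amount) for product, amount in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_sales (load_date TEXT, product TEXT, amount REAL)")

batch = [("widget", 10.0), ("gadget", 24.5)]
load_partition(conn, "2024-01-01", batch)
load_partition(conn, "2024-01-01", batch)  # re-run of the same day

row_count = conn.execute("SELECT COUNT(*) FROM daily_sales").fetchone()[0]
print(row_count)
```

Running the load twice leaves the table in the same state as running it once, which is what makes retries and backfills safe.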

Interview Preparation

Common data engineering interview topics covered:
  • SQL optimization and query tuning
  • Distributed systems concepts
  • Data modeling (star schema, snowflake, Data Vault)
  • ETL vs ELT trade-offs
  • CAP theorem and consistency models
  • Data quality frameworks
  • Cloud platform services (S3, Redshift, BigQuery, Databricks)
  • Orchestration tools (Airflow, Prefect, Dagster)
  • Streaming architectures (Kafka, Kinesis, Pub/Sub)