data-engineering-study-material
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseData Engineering Study Material
数据工程学习资料
Overview
概述
This project is a comprehensive study guide and reference repository for data engineering concepts, tools, and practices. It serves as a centralized resource for learning core data engineering principles, understanding modern data stack components, and preparing for data engineering roles.
The repository covers:
- Data engineering fundamentals and architecture patterns
- ETL/ELT pipeline design and implementation
- Data warehousing and lake architectures
- Streaming and batch processing frameworks
- Cloud data platforms (AWS, GCP, Azure)
- Data quality, governance, and observability
- Infrastructure as Code and orchestration tools
- Interview preparation and best practices
本项目是一个涵盖数据工程概念、工具与实践的综合性学习指南和参考仓库。它作为集中式资源,用于学习核心数据工程原理、理解现代数据栈组件,以及为数据工程岗位做准备。
本仓库涵盖以下内容:
- 数据工程基础与架构模式
- ETL/ELT管道设计与实现
- 数据仓库与数据湖架构
- 流处理与批处理框架
- 云数据平台(AWS、GCP、Azure)
- 数据质量、治理与可观测性
- 基础设施即代码与编排工具
- 面试准备与最佳实践
Installation
安装说明
This is a study material repository, not an installable package. Clone it to access the materials:
bash
git clone https://github.com/Ahmeduddin3403/data-engineering-study-material.git
cd data-engineering-study-material这是一个学习资料仓库,并非可安装包。克隆仓库即可获取资料:
bash
git clone https://github.com/Ahmeduddin3403/data-engineering-study-material.git
cd data-engineering-study-materialRepository Structure
仓库结构
The materials are typically organized by topic area:
data-engineering-study-material/
├── fundamentals/ # Core concepts and principles
├── tools/ # Tool-specific guides
├── architectures/ # Design patterns and architectures
├── pipelines/ # ETL/ELT examples
├── cloud-platforms/ # Cloud-specific implementations
├── streaming/ # Real-time processing
├── batch-processing/ # Batch job patterns
├── data-quality/ # Testing and validation
├── orchestration/ # Workflow management
├── interview-prep/ # Interview questions and answers
└── projects/ # Hands-on project examples资料通常按主题领域组织:
data-engineering-study-material/
├── fundamentals/ # 核心概念与原理
├── tools/ # 工具专属指南
├── architectures/ # 设计模式与架构
├── pipelines/ # ETL/ELT示例
├── cloud-platforms/ # 云平台专属实现
├── streaming/ # 实时处理
├── batch-processing/ # 批处理作业模式
├── data-quality/ # 测试与验证
├── orchestration/ # 工作流管理
├── interview-prep/ # 面试问题与答案
└── projects/ # 实操项目示例Core Data Engineering Concepts
核心数据工程概念
ETL Pipeline Example (Python)
ETL管道示例(Python)
python
import pandas as pd
from sqlalchemy import create_engine
import loggingpython
import pandas as pd
from sqlalchemy import create_engine
import loggingConfigure logging
Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)
class ETLPipeline:
"""Simple ETL pipeline for extracting, transforming, and loading data"""
def __init__(self, source_path, target_conn_string):
self.source_path = source_path
self.engine = create_engine(target_conn_string)
def extract(self):
"""Extract data from source"""
logger.info(f"Extracting data from {self.source_path}")
df = pd.read_csv(self.source_path)
logger.info(f"Extracted {len(df)} rows")
return df
def transform(self, df):
"""Transform data: clean, deduplicate, enrich"""
logger.info("Transforming data")
# Remove duplicates
df = df.drop_duplicates()
# Handle missing values
df = df.fillna({
'numeric_column': 0,
'string_column': 'Unknown'
})
# Add derived columns
df['created_date'] = pd.to_datetime(df['timestamp']).dt.date
# Data validation
df = df[df['amount'] > 0]
logger.info(f"Transformed to {len(df)} rows")
return df
def load(self, df, table_name):
"""Load data to target database"""
logger.info(f"Loading data to {table_name}")
df.to_sql(table_name, self.engine, if_exists='append', index=False)
logger.info("Load complete")
def run(self, table_name):
"""Execute full ETL pipeline"""
try:
df = self.extract()
df_transformed = self.transform(df)
self.load(df_transformed, table_name)
logger.info("ETL pipeline completed successfully")
except Exception as e:
logger.error(f"ETL pipeline failed: {str(e)}")
raiselogging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)
class ETLPipeline:
"""Simple ETL pipeline for extracting, transforming, and loading data"""
def __init__(self, source_path, target_conn_string):
self.source_path = source_path
self.engine = create_engine(target_conn_string)
def extract(self):
"""Extract data from source"""
logger.info(f"Extracting data from {self.source_path}")
df = pd.read_csv(self.source_path)
logger.info(f"Extracted {len(df)} rows")
return df
def transform(self, df):
"""Transform data: clean, deduplicate, enrich"""
logger.info("Transforming data")
# Remove duplicates
df = df.drop_duplicates()
# Handle missing values
df = df.fillna({
'numeric_column': 0,
'string_column': 'Unknown'
})
# Add derived columns
df['created_date'] = pd.to_datetime(df['timestamp']).dt.date
# Data validation
df = df[df['amount'] > 0]
logger.info(f"Transformed to {len(df)} rows")
return df
def load(self, df, table_name):
"""Load data to target database"""
logger.info(f"Loading data to {table_name}")
df.to_sql(table_name, self.engine, if_exists='append', index=False)
logger.info("Load complete")
def run(self, table_name):
"""Execute full ETL pipeline"""
try:
df = self.extract()
df_transformed = self.transform(df)
self.load(df_transformed, table_name)
logger.info("ETL pipeline completed successfully")
except Exception as e:
logger.error(f"ETL pipeline failed: {str(e)}")
raiseUsage
Usage
if name == "main":
pipeline = ETLPipeline(
source_path='data/raw/sales.csv',
target_conn_string='postgresql://user:pass@localhost:5432/warehouse'
)
pipeline.run('sales_fact')
undefinedif name == "main":
pipeline = ETLPipeline(
source_path='data/raw/sales.csv',
target_conn_string='postgresql://user:pass@localhost:5432/warehouse'
)
pipeline.run('sales_fact')
undefinedData Quality Checks
数据质量检查
python
import great_expectations as ge
def validate_data_quality(df):
"""Implement data quality checks using Great Expectations"""
# Convert pandas DataFrame to GE DataFrame
ge_df = ge.from_pandas(df)
# Define expectations
expectations = {
'id': lambda col: col.expect_column_values_to_be_unique(),
'email': lambda col: col.expect_column_values_to_match_regex(r'^[\w\.-]+@[\w\.-]+\.\w+$'),
'amount': lambda col: col.expect_column_values_to_be_between(min_value=0, max_value=1000000),
'created_at': lambda col: col.expect_column_values_to_not_be_null(),
'status': lambda col: col.expect_column_values_to_be_in_set(['active', 'inactive', 'pending'])
}
# Run validations
results = []
for column, expectation_func in expectations.items():
if column in ge_df.columns:
result = expectation_func(ge_df[column])
results.append(result)
# Check if all validations passed
all_passed = all(r.success for r in results)
return all_passed, resultspython
import great_expectations as ge
def validate_data_quality(df):
"""Implement data quality checks using Great Expectations"""
# Convert pandas DataFrame to GE DataFrame
ge_df = ge.from_pandas(df)
# Define expectations
expectations = {
'id': lambda col: col.expect_column_values_to_be_unique(),
'email': lambda col: col.expect_column_values_to_match_regex(r'^[\w\.-]+@[\w\.-]+\.\w+$'),
'amount': lambda col: col.expect_column_values_to_be_between(min_value=0, max_value=1000000),
'created_at': lambda col: col.expect_column_values_to_not_be_null(),
'status': lambda col: col.expect_column_values_to_be_in_set(['active', 'inactive', 'pending'])
}
# Run validations
results = []
for column, expectation_func in expectations.items():
if column in ge_df.columns:
result = expectation_func(ge_df[column])
results.append(result)
# Check if all validations passed
all_passed = all(r.success for r in results)
return all_passed, resultsApache Airflow DAG Example
Apache Airflow DAG示例
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-engineering',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
def extract_from_api(**context):
"""Extract data from external API"""
import requests
import json
response = requests.get('https://api.example.com/data')
data = response.json()
# Save to S3
s3_path = f"s3://my-bucket/raw/{context['ds']}/data.json"
# Upload logic here
return s3_path
def transform_data(**context):
"""Transform extracted data"""
import pandas as pd
# Read from S3
s3_path = context['ti'].xcom_pull(task_ids='extract_task')
df = pd.read_json(s3_path)
# Transformations
df_transformed = df.drop_duplicates()
df_transformed['load_date'] = context['ds']
# Write back to S3
output_path = f"s3://my-bucket/processed/{context['ds']}/data.parquet"
df_transformed.to_parquet(output_path)
return output_path
with DAG(
'daily_etl_pipeline',
default_args=default_args,
description='Daily ETL pipeline for data processing',
schedule_interval='0 2 * * *', # Run at 2 AM daily
catchup=False,
tags=['etl', 'daily']
) as dag:
extract_task = PythonOperator(
task_id='extract_task',
python_callable=extract_from_api,
provide_context=True
)
transform_task = PythonOperator(
task_id='transform_task',
python_callable=transform_data,
provide_context=True
)
load_task = S3ToRedshiftOperator(
task_id='load_to_redshift',
s3_bucket='my-bucket',
s3_key='processed/{{ ds }}/data.parquet',
schema='analytics',
table='fact_table',
copy_options=['FORMAT AS PARQUET']
)
data_quality_check = PostgresOperator(
task_id='quality_check',
postgres_conn_id='redshift_conn',
sql="""
SELECT COUNT(*) as row_count
FROM analytics.fact_table
WHERE load_date = '{{ ds }}';
"""
)
extract_task >> transform_task >> load_task >> data_quality_checkpython
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-engineering',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
def extract_from_api(**context):
"""Extract data from external API"""
import requests
import json
response = requests.get('https://api.example.com/data')
data = response.json()
# Save to S3
s3_path = f"s3://my-bucket/raw/{context['ds']}/data.json"
# Upload logic here
return s3_path
def transform_data(**context):
"""Transform extracted data"""
import pandas as pd
# Read from S3
s3_path = context['ti'].xcom_pull(task_ids='extract_task')
df = pd.read_json(s3_path)
# Transformations
df_transformed = df.drop_duplicates()
df_transformed['load_date'] = context['ds']
# Write back to S3
output_path = f"s3://my-bucket/processed/{context['ds']}/data.parquet"
df_transformed.to_parquet(output_path)
return output_path
with DAG(
'daily_etl_pipeline',
default_args=default_args,
description='Daily ETL pipeline for data processing',
schedule_interval='0 2 * * *', # Run at 2 AM daily
catchup=False,
tags=['etl', 'daily']
) as dag:
extract_task = PythonOperator(
task_id='extract_task',
python_callable=extract_from_api,
provide_context=True
)
transform_task = PythonOperator(
task_id='transform_task',
python_callable=transform_data,
provide_context=True
)
load_task = S3ToRedshiftOperator(
task_id='load_to_redshift',
s3_bucket='my-bucket',
s3_key='processed/{{ ds }}/data.parquet',
schema='analytics',
table='fact_table',
copy_options=['FORMAT AS PARQUET']
)
data_quality_check = PostgresOperator(
task_id='quality_check',
postgres_conn_id='redshift_conn',
sql="""
SELECT COUNT(*) as row_count
FROM analytics.fact_table
WHERE load_date = '{{ ds }}';
"""
)
extract_task >> transform_task >> load_task >> data_quality_checkSpark Batch Processing Example
Spark批处理示例
python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum, avg, count, to_date
from pyspark.sql.window import Window
def process_batch_data():
"""Process large-scale batch data with Apache Spark"""
# Initialize Spark session
spark = SparkSession.builder \
.appName("BatchDataProcessing") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# Read data from data lake
df = spark.read \
.format("parquet") \
.load("s3a://data-lake/raw/transactions/")
# Transformations
df_transformed = df \
.withColumn("transaction_date", to_date(col("timestamp"))) \
.filter(col("amount") > 0) \
.dropDuplicates(["transaction_id"])
# Aggregations
daily_summary = df_transformed.groupBy("transaction_date", "category") \
.agg(
count("transaction_id").alias("transaction_count"),
sum("amount").alias("total_amount"),
avg("amount").alias("avg_amount")
)
# Window functions for ranking
window_spec = Window.partitionBy("transaction_date").orderBy(col("total_amount").desc())
ranked_summary = daily_summary \
.withColumn("rank", dense_rank().over(window_spec)) \
.filter(col("rank") <= 10)
# Write to data warehouse
ranked_summary.write \
.format("parquet") \
.mode("overwrite") \
.partitionBy("transaction_date") \
.save("s3a://data-warehouse/analytics/daily_summary/")
spark.stop()
if __name__ == "__main__":
process_batch_data()python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum, avg, count, to_date
from pyspark.sql.window import Window
def process_batch_data():
"""Process large-scale batch data with Apache Spark"""
# Initialize Spark session
spark = SparkSession.builder \
.appName("BatchDataProcessing") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# Read data from data lake
df = spark.read \
.format("parquet") \
.load("s3a://data-lake/raw/transactions/")
# Transformations
df_transformed = df \
.withColumn("transaction_date", to_date(col("timestamp"))) \
.filter(col("amount") > 0) \
.dropDuplicates(["transaction_id"])
# Aggregations
daily_summary = df_transformed.groupBy("transaction_date", "category") \
.agg(
count("transaction_id").alias("transaction_count"),
sum("amount").alias("total_amount"),
avg("amount").alias("avg_amount")
)
# Window functions for ranking
window_spec = Window.partitionBy("transaction_date").orderBy(col("total_amount").desc())
ranked_summary = daily_summary \
.withColumn("rank", dense_rank().over(window_spec)) \
.filter(col("rank") <= 10)
# Write to data warehouse
ranked_summary.write \
.format("parquet") \
.mode("overwrite") \
.partitionBy("transaction_date") \
.save("s3a://data-warehouse/analytics/daily_summary/")
spark.stop()
if __name__ == "__main__":
process_batch_data()Streaming Pipeline Example (Kafka + Spark Structured Streaming)
流处理管道示例(Kafka + Spark Structured Streaming)
python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
def create_streaming_pipeline():
"""Real-time data processing with Spark Structured Streaming"""
spark = SparkSession.builder \
.appName("RealTimeStreaming") \
.getOrCreate()
# Define schema for incoming data
schema = StructType([
StructField("event_id", StringType(), True),
StructField("user_id", StringType(), True),
StructField("event_type", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("timestamp", TimestampType(), True)
])
# Read from Kafka
df_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "events") \
.option("startingOffsets", "latest") \
.load()
# Parse JSON data
df_parsed = df_stream.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
# Windowed aggregations
df_windowed = df_parsed \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window(col("timestamp"), "5 minutes", "1 minute"),
col("event_type")
) \
.agg(
count("event_id").alias("event_count"),
sum("amount").alias("total_amount")
)
# Write to sink
query = df_windowed.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "s3a://streaming-output/events/") \
.option("checkpointLocation", "s3a://checkpoints/events/") \
.trigger(processingTime="1 minute") \
.start()
query.awaitTermination()
if __name__ == "__main__":
create_streaming_pipeline()python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
def create_streaming_pipeline():
"""Real-time data processing with Spark Structured Streaming"""
spark = SparkSession.builder \
.appName("RealTimeStreaming") \
.getOrCreate()
# Define schema for incoming data
schema = StructType([
StructField("event_id", StringType(), True),
StructField("user_id", StringType(), True),
StructField("event_type", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("timestamp", TimestampType(), True)
])
# Read from Kafka
df_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "events") \
.option("startingOffsets", "latest") \
.load()
# Parse JSON data
df_parsed = df_stream.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
# Windowed aggregations
df_windowed = df_parsed \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window(col("timestamp"), "5 minutes", "1 minute"),
col("event_type")
) \
.agg(
count("event_id").alias("event_count"),
sum("amount").alias("total_amount")
)
# Write to sink
query = df_windowed.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "s3a://streaming-output/events/") \
.option("checkpointLocation", "s3a://checkpoints/events/") \
.trigger(processingTime="1 minute") \
.start()
query.awaitTermination()
if __name__ == "__main__":
create_streaming_pipeline()Common Patterns
常见模式
Incremental Data Loading
增量数据加载
python
def incremental_load(table_name, watermark_column, last_watermark):
"""Load only new/updated records since last run"""
query = f"""
SELECT *
FROM {table_name}
WHERE {watermark_column} > '{last_watermark}'
ORDER BY {watermark_column}
"""
# Execute query and get new data
new_data = pd.read_sql(query, source_conn)
if not new_data.empty:
# Get new watermark
new_watermark = new_data[watermark_column].max()
# Load to target
new_data.to_sql('target_table', target_conn, if_exists='append', index=False)
# Update watermark
save_watermark(table_name, new_watermark)
return len(new_data)python
def incremental_load(table_name, watermark_column, last_watermark):
"""Load only new/updated records since last run"""
query = f"""
SELECT *
FROM {table_name}
WHERE {watermark_column} > '{last_watermark}'
ORDER BY {watermark_column}
"""
# Execute query and get new data
new_data = pd.read_sql(query, source_conn)
if not new_data.empty:
# Get new watermark
new_watermark = new_data[watermark_column].max()
# Load to target
new_data.to_sql('target_table', target_conn, if_exists='append', index=False)
# Update watermark
save_watermark(table_name, new_watermark)
return len(new_data)SCD Type 2 Implementation
SCD Type 2实现
python
def scd_type2_merge(source_df, target_table, business_key, effective_date):
"""Implement Slowly Changing Dimension Type 2"""
from datetime import datetime
# Read current dimension table
current_df = pd.read_sql(f"SELECT * FROM {target_table} WHERE is_current = 1", conn)
# Identify changes
merged = source_df.merge(
current_df,
on=business_key,
how='left',
suffixes=('_new', '_old')
)
# Records that changed
changed = merged[
(merged.apply(lambda row: row_has_changes(row), axis=1))
]
# Expire old records
if not changed.empty:
expire_query = f"""
UPDATE {target_table}
SET is_current = 0,
end_date = '{effective_date}'
WHERE {business_key} IN ({','.join(map(str, changed[business_key].tolist()))})
AND is_current = 1
"""
conn.execute(expire_query)
# Insert new versions
new_records = source_df[source_df[business_key].isin(changed[business_key])]
new_records['start_date'] = effective_date
new_records['end_date'] = '9999-12-31'
new_records['is_current'] = 1
new_records.to_sql(target_table, conn, if_exists='append', index=False)python
def scd_type2_merge(source_df, target_table, business_key, effective_date):
"""Implement Slowly Changing Dimension Type 2"""
from datetime import datetime
# Read current dimension table
current_df = pd.read_sql(f"SELECT * FROM {target_table} WHERE is_current = 1", conn)
# Identify changes
merged = source_df.merge(
current_df,
on=business_key,
how='left',
suffixes=('_new', '_old')
)
# Records that changed
changed = merged[
(merged.apply(lambda row: row_has_changes(row), axis=1))
]
# Expire old records
if not changed.empty:
expire_query = f"""
UPDATE {target_table}
SET is_current = 0,
end_date = '{effective_date}'
WHERE {business_key} IN ({','.join(map(str, changed[business_key].tolist()))})
AND is_current = 1
"""
conn.execute(expire_query)
# Insert new versions
new_records = source_df[source_df[business_key].isin(changed[business_key])]
new_records['start_date'] = effective_date
new_records['end_date'] = '9999-12-31'
new_records['is_current'] = 1
new_records.to_sql(target_table, conn, if_exists='append', index=False)Configuration
配置
Database Connection Configuration
数据库连接配置
python
undefinedpython
undefinedconfig.py
config.py
import os
DATABASE_CONFIG = {
'source': {
'host': os.getenv('SOURCE_DB_HOST'),
'port': os.getenv('SOURCE_DB_PORT', 5432),
'database': os.getenv('SOURCE_DB_NAME'),
'user': os.getenv('SOURCE_DB_USER'),
'password': os.getenv('SOURCE_DB_PASSWORD')
},
'warehouse': {
'host': os.getenv('WAREHOUSE_HOST'),
'port': os.getenv('WAREHOUSE_PORT', 5439),
'database': os.getenv('WAREHOUSE_DB'),
'user': os.getenv('WAREHOUSE_USER'),
'password': os.getenv('WAREHOUSE_PASSWORD')
}
}
SPARK_CONFIG = {
'spark.executor.memory': '4g',
'spark.driver.memory': '2g',
'spark.sql.adaptive.enabled': 'true',
'spark.sql.adaptive.coalescePartitions.enabled': 'true'
}
AIRFLOW_CONFIG = {
'concurrency': 16,
'max_active_runs': 3,
'dagbag_import_timeout': 30
}
undefinedimport os
DATABASE_CONFIG = {
'source': {
'host': os.getenv('SOURCE_DB_HOST'),
'port': os.getenv('SOURCE_DB_PORT', 5432),
'database': os.getenv('SOURCE_DB_NAME'),
'user': os.getenv('SOURCE_DB_USER'),
'password': os.getenv('SOURCE_DB_PASSWORD')
},
'warehouse': {
'host': os.getenv('WAREHOUSE_HOST'),
'port': os.getenv('WAREHOUSE_PORT', 5439),
'database': os.getenv('WAREHOUSE_DB'),
'user': os.getenv('WAREHOUSE_USER'),
'password': os.getenv('WAREHOUSE_PASSWORD')
}
}
SPARK_CONFIG = {
'spark.executor.memory': '4g',
'spark.driver.memory': '2g',
'spark.sql.adaptive.enabled': 'true',
'spark.sql.adaptive.coalescePartitions.enabled': 'true'
}
AIRFLOW_CONFIG = {
'concurrency': 16,
'max_active_runs': 3,
'dagbag_import_timeout': 30
}
undefinedTroubleshooting
故障排查
Common Issues and Solutions
常见问题与解决方案
Issue: Out of Memory in Spark Jobs
python
undefined问题:Spark作业内存不足
python
undefinedSolution: Optimize memory usage
Solution: Optimize memory usage
spark = SparkSession.builder
.config("spark.executor.memory", "8g")
.config("spark.driver.memory", "4g")
.config("spark.sql.shuffle.partitions", "200")
.config("spark.default.parallelism", "200")
.getOrCreate()
.config("spark.executor.memory", "8g")
.config("spark.driver.memory", "4g")
.config("spark.sql.shuffle.partitions", "200")
.config("spark.default.parallelism", "200")
.getOrCreate()
spark = SparkSession.builder
.config("spark.executor.memory", "8g")
.config("spark.driver.memory", "4g")
.config("spark.sql.shuffle.partitions", "200")
.config("spark.default.parallelism", "200")
.getOrCreate()
.config("spark.executor.memory", "8g")
.config("spark.driver.memory", "4g")
.config("spark.sql.shuffle.partitions", "200")
.config("spark.default.parallelism", "200")
.getOrCreate()
Use broadcast joins for small tables
Use broadcast joins for small tables
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
**Issue: Slow Incremental Loads**
```pythonfrom pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
**问题:增量加载速度慢**
```pythonSolution: Use partition pruning and indexing
Solution: Use partition pruning and indexing
Add indexes on watermark columns
Add indexes on watermark columns
Partition target tables by date
Partition target tables by date
Use partition pruning in Spark
Use partition pruning in Spark
df = spark.read.parquet("s3://data/table/")
.where(f"partition_date >= '{start_date}'")
.where(f"partition_date >= '{start_date}'")
**Issue: Data Quality Failures**
```pythondf = spark.read.parquet("s3://data/table/")
.where(f"partition_date >= '{start_date}'")
.where(f"partition_date >= '{start_date}'")
**问题:数据质量验证失败**
```pythonSolution: Implement comprehensive validation
Solution: Implement comprehensive validation
def validate_and_quarantine(df, rules):
"""Separate valid and invalid records"""
valid_df = df
invalid_records = []
for rule_name, rule_func in rules.items():
mask = rule_func(valid_df)
invalid = valid_df[~mask].copy()
invalid['failed_rule'] = rule_name
invalid_records.append(invalid)
valid_df = valid_df[mask]
# Save invalid records for review
if invalid_records:
pd.concat(invalid_records).to_sql(
'data_quality_quarantine',
conn,
if_exists='append'
)
return valid_dfundefineddef validate_and_quarantine(df, rules):
"""Separate valid and invalid records"""
valid_df = df
invalid_records = []
for rule_name, rule_func in rules.items():
mask = rule_func(valid_df)
invalid = valid_df[~mask].copy()
invalid['failed_rule'] = rule_name
invalid_records.append(invalid)
valid_df = valid_df[mask]
# Save invalid records for review
if invalid_records:
pd.concat(invalid_records).to_sql(
'data_quality_quarantine',
conn,
if_exists='append'
)
return valid_dfundefinedBest Practices
最佳实践
- Idempotency: Ensure pipelines can be re-run safely
- Monitoring: Implement comprehensive logging and alerting
- Data Quality: Validate data at every stage
- Partitioning: Use appropriate partitioning strategies for performance
- Documentation: Document data lineage and transformations
- Version Control: Track schema changes and pipeline versions
- Testing: Test pipelines with sample data before production
- Security: Use IAM roles, encryption, and secure credential management
- 幂等性:确保管道可安全重运行
- 监控:实现全面的日志记录与告警
- 数据质量:在每个阶段验证数据
- 分区策略:采用合适的分区策略提升性能
- 文档记录:记录数据血缘与转换逻辑
- 版本控制:跟踪 schema 变更与管道版本
- 测试:上线前用样本数据测试管道
- 安全性:使用IAM角色、加密与安全凭证管理
Interview Preparation
面试准备
Common data engineering interview topics covered:
- SQL optimization and query tuning
- Distributed systems concepts
- Data modeling (star schema, snowflake, Data Vault)
- ETL vs ELT trade-offs
- CAP theorem and consistency models
- Data quality frameworks
- Cloud platform services (S3, Redshift, BigQuery, Databricks)
- Orchestration tools (Airflow, Prefect, Dagster)
- Streaming architectures (Kafka, Kinesis, Pub/Sub)
涵盖的常见数据工程面试主题:
- SQL优化与查询调优
- 分布式系统概念
- 数据建模(星型模型、雪花模型、Data Vault)
- ETL与ELT的权衡
- CAP定理与一致性模型
- 数据质量框架
- 云平台服务(S3、Redshift、BigQuery、Databricks)
- 编排工具(Airflow、Prefect、Dagster)
- 流处理架构(Kafka、Kinesis、Pub/Sub)