Big Data & Distributed Computing
Production-grade big data processing with Apache Spark, distributed systems patterns, and petabyte-scale data engineering.
Quick Start
```python
# PySpark 3.5+ modern DataFrame API
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Initialize Spark with optimal settings
spark = (SparkSession.builder
    .appName("ProductionETL")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate())

# Efficient data loading with schema enforcement
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", LongType(), False),
    StructField("event_type", StringType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("properties", StringType(), True)
])

df = (spark.read
    .schema(schema)
    .parquet("s3://bucket/events/")
    .filter(F.col("timestamp") >= F.current_date() - 30))

# Complex aggregation with window functions
window_spec = Window.partitionBy("user_id").orderBy("timestamp")

result = (df
    .withColumn("event_rank", F.row_number().over(window_spec))
    .withColumn("session_id", F.sum(
        F.when(
            F.col("timestamp") - F.lag("timestamp").over(window_spec) > F.expr("INTERVAL 30 MINUTES"),
            1
        ).otherwise(0)
    ).over(window_spec))
    .groupBy("user_id", "session_id")
    .agg(
        F.count("*").alias("event_count"),
        F.min("timestamp").alias("session_start"),
        F.max("timestamp").alias("session_end")
    ))

result.write.mode("overwrite").parquet("s3://bucket/sessions/")
```
Core Concepts
1. Spark Architecture Deep Dive
```
┌─────────────────────────────────────────────────────────┐
│                      Driver Program                      │
│   ┌─────────────────────────────────────────────────┐   │
│   │           SparkContext/SparkSession             │   │
│   │   - Creates execution plan (DAG)                │   │
│   │   - Coordinates with Cluster Manager            │   │
│   │   - Schedules tasks                             │   │
│   └─────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────┐
│          Cluster Manager (YARN/K8s/Standalone)           │
└─────────────────────────────────────────────────────────┘
                            │
           ┌────────────────┼────────────────┐
           ▼                ▼                ▼
     ┌──────────┐     ┌──────────┐     ┌──────────┐
     │ Executor │     │ Executor │     │ Executor │
     │ ┌──────┐ │     │ ┌──────┐ │     │ ┌──────┐ │
     │ │Task 1│ │     │ │Task 2│ │     │ │Task 3│ │
     │ │Task 4│ │     │ │Task 5│ │     │ │Task 6│ │
     │ └──────┘ │     │ └──────┘ │     │ └──────┘ │
     │  Cache   │     │  Cache   │     │  Cache   │
     └──────────┘     └──────────┘     └──────────┘
```
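The driver only builds the logical plan; no task runs on the executors until an action is called. A minimal, hedged sketch of that lazy-evaluation boundary (the event_type filter value is illustrative):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("LazyEvalDemo").getOrCreate()

# Transformations only extend the logical plan on the driver -- no job runs yet.
events = spark.read.parquet("s3://bucket/events/")
active = events.filter(F.col("event_type") == "click")
daily = active.groupBy(F.to_date("timestamp").alias("day")).count()

# Inspect the plan the driver has built so far.
daily.explain(mode="formatted")

# Only an action (count, collect, write, ...) makes the driver submit
# stages to the cluster manager and run tasks on the executors.
daily.count()
```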
2. Partition Optimization
```python
from pyspark.sql import functions as F

# Check current partitioning
print(f"Partitions: {df.rdd.getNumPartitions()}")

# Rule of thumb: 128MB per partition, 2-4 partitions per core
# For 100GB data on 10 executors with 4 cores each:
#   100GB / 128MB ≈ 800 partitions, or 40 cores * 4 = 160 partitions
#   Use: 200-400 partitions

# Repartition by key (for joins)
df_repartitioned = df.repartition(200, "user_id")

# Coalesce (reduce partitions without shuffle)
df_coalesced = df.coalesce(100)

# Optimal write partitioning
(df.withColumn("year", F.year("date"))
    .withColumn("month", F.month("date"))
    .repartition("year", "month")
    .write
    .partitionBy("year", "month")
    .mode("overwrite")
    .parquet("s3://bucket/output/"))

# Bucketing for repeated joins
(df.write
    .bucketBy(256, "user_id")
    .sortBy("user_id")
    .saveAsTable("bucketed_events"))
```
3. Join Optimization Strategies
```python
from pyspark.sql import functions as F

# Broadcast join (small table < 10MB default, configurable to 100MB)
small_df = spark.read.parquet("s3://bucket/dim_product/")  # 5MB
large_df = spark.read.parquet("s3://bucket/fact_sales/")   # 500GB

# Explicit broadcast hint
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "product_id")

# Increase broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)  # 100MB

# Sort-Merge Join (for large tables)
# Both tables sorted and partitioned by join key
users = spark.read.parquet("users/").repartition(200, "user_id").sortWithinPartitions("user_id")
orders = spark.read.parquet("orders/").repartition(200, "user_id").sortWithinPartitions("user_id")
result = users.join(orders, "user_id")

# Skewed join handling (salting technique)
# If user_id has skew (some users have millions of rows)
salt_range = 10

salted_users = (users
    .withColumn("salt", F.explode(F.array([F.lit(i) for i in range(salt_range)])))
    .withColumn("salted_key", F.concat("user_id", F.lit("_"), "salt")))

salted_orders = (orders
    .withColumn("salt", (F.rand() * salt_range).cast("int"))
    .withColumn("salted_key", F.concat("user_id", F.lit("_"), "salt")))

result = salted_users.join(salted_orders, "salted_key").drop("salt", "salted_key")
```
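Manual salting still works, but adaptive query execution on Spark 3.x can split skewed partitions at runtime by itself. A minimal, hedged configuration sketch; the factor and threshold values are illustrative rather than tuned recommendations:

```python
# AQE skew-join handling (on by default since Spark 3.2, shown explicitly here)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# A partition counts as skewed when it exceeds both the factor (relative to the
# median partition size) and the absolute byte threshold.
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

# With AQE handling the skew, the plain join often suffices without salting.
result = users.join(orders, "user_id")
```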
4. Caching & Persistence
```python
from pyspark import StorageLevel
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast

# Caching strategies
df.cache()                                # MEMORY_AND_DISK by default in Spark 3.x
df.persist(StorageLevel.MEMORY_ONLY)      # Fastest, may recompute if evicted
df.persist(StorageLevel.MEMORY_AND_DISK)  # Stored serialized in PySpark (no separate MEMORY_AND_DISK_SER level in the Python API)
df.persist(StorageLevel.DISK_ONLY)        # For very large intermediate datasets

# When to cache:
# - Reused DataFrames (used in multiple actions)
# - After expensive transformations (joins, aggregations)
# - Before iterative algorithms

# Cache usage pattern
expensive_df = (spark.read.parquet("s3://bucket/large/")
    .filter(F.col("status") == "active")
    .join(broadcast(dim_df), "dim_key")
    .groupBy("category")
    .agg(F.sum("amount").alias("total")))

expensive_df.cache()
expensive_df.count()  # Materialize cache

# Use cached DataFrame multiple times
top_categories = expensive_df.orderBy(F.desc("total")).limit(10)
summary = expensive_df.agg(F.avg("total"), F.max("total"))

# Release cache when done
expensive_df.unpersist()
```
5. Structured Streaming
```python
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

# Read from Kafka
kafka_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")
    .option("maxOffsetsPerTrigger", 100000)
    .load())

# Parse JSON payload
event_schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", LongType()),
    StructField("event_type", StringType()),
    StructField("timestamp", TimestampType())
])

parsed_df = (kafka_df
    .select(F.from_json(F.col("value").cast("string"), event_schema).alias("data"))
    .select("data.*")
    .withWatermark("timestamp", "10 minutes"))

# Windowed aggregation
windowed_counts = (parsed_df
    .groupBy(
        F.window("timestamp", "5 minutes", "1 minute"),
        "event_type"
    )
    .count())

# Write to Delta Lake with checkpointing
query = (windowed_counts.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3://bucket/checkpoints/events/")
    .trigger(processingTime="1 minute")
    .start("s3://bucket/streaming_output/"))

# Monitor stream
query.awaitTermination()
```
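When the sink needs upserts rather than plain appends, foreachBatch exposes each micro-batch as an ordinary DataFrame, so batch-only APIs such as the Delta MERGE shown under Production Patterns can be reused. A hedged sketch building on parsed_df above; the target path and merge key are illustrative:

```python
from delta.tables import DeltaTable

def upsert_batch(batch_df, batch_id):
    # Each micro-batch is a regular DataFrame, so Delta MERGE is available here.
    target = DeltaTable.forPath(spark, "s3://bucket/events_latest/")  # illustrative path
    (target.alias("t")
        .merge(batch_df.alias("s"), "t.event_id = s.event_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

query = (parsed_df.writeStream
    .foreachBatch(upsert_batch)
    .option("checkpointLocation", "s3://bucket/checkpoints/events_latest/")
    .trigger(processingTime="1 minute")
    .start())
```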
Tools & Technologies
| Tool | Purpose | Version (2025) |
|---|---|---|
| Apache Spark | Distributed processing | 3.5+ |
| Delta Lake | ACID transactions | 3.0+ |
| Apache Iceberg | Table format | 1.4+ |
| Apache Flink | Stream processing | 1.18+ |
| Databricks | Managed Spark platform | Latest |
| AWS EMR | Managed Hadoop/Spark | 7.0+ |
| Trino | Interactive queries | 400+ |
| dbt | Transform layer | 1.7+ |
Learning Path

Phase 1: Foundations (Weeks 1-3)
Week 1: Distributed computing concepts, MapReduce
Week 2: Spark architecture, RDDs, DataFrames
Week 3: Spark SQL, basic transformations

Phase 2: Intermediate (Weeks 4-6)
Week 4: Joins, aggregations, window functions
Week 5: Partitioning, bucketing, caching
Week 6: Performance tuning, Spark UI analysis

Phase 3: Advanced (Weeks 7-10)
Week 7: Structured Streaming
Week 8: Delta Lake / Iceberg table formats
Week 9: Cluster sizing, cost optimization
Week 10: Advanced optimizations (AQE, skew handling)

Phase 4: Production (Weeks 11-14)
Week 11: Deployment on EMR/Databricks
Week 12: Monitoring, alerting, debugging
Week 13: CI/CD for Spark jobs
Week 14: Multi-cluster architectures

Production Patterns
Delta Lake UPSERT (Merge)
```python
from delta.tables import DeltaTable

# Incremental UPSERT pattern
delta_table = DeltaTable.forPath(spark, "s3://bucket/users/")
updates_df = spark.read.parquet("s3://bucket/updates/")

delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.user_id = source.user_id"
).whenMatchedUpdate(set={
    "email": "source.email",
    "updated_at": "source.updated_at"
}).whenNotMatchedInsertAll().execute()

# Optimize after merge
delta_table.optimize().executeCompaction()
delta_table.vacuum(retentionHours=168)  # 7 days
```
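Beyond compaction, Delta 2.0+ can also co-locate related rows within files to improve data skipping on selective filters; a one-line, hedged example where the column choice is illustrative:

```python
# Z-order by a column that appears in most query predicates (e.g. the merge key)
delta_table.optimize().executeZOrderBy("user_id")
```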
Cost-Effective Cluster Configuration
```python
# spark-submit configuration for 1TB processing job
"""
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 50 \
  --executor-cores 4 \
  --executor-memory 16g \
  --driver-memory 8g \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.coalescePartitions.enabled=true \
  --conf spark.sql.shuffle.partitions=400 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=10 \
  --conf spark.dynamicAllocation.maxExecutors=100 \
  --conf spark.speculation=true \
  job.py
"""
```
Sizing guidelines (see the estimator sketch below):
- Executor memory: 16-32GB (avoid GC overhead)
- Executor cores: 4-5 (parallelism per executor)
- Total cores: 2-4x data size in GB
- Partitions: 2-4x total cores
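These rules of thumb can be turned into quick starting-point numbers. A small, hedged helper, not part of any Spark API; it simply codifies the guidelines above and the 128MB-per-partition rule from the partitioning section:

```python
def partition_estimates(data_gb: float, total_cores: int,
                        target_partition_mb: int = 128) -> dict:
    """Starting points only; confirm against the Spark UI and iterate."""
    return {
        "by_size": int(data_gb * 1024 / target_partition_mb),  # ~128MB per partition
        "by_cores_low": total_cores * 2,                       # 2 partitions per core
        "by_cores_high": total_cores * 4,                      # 4 partitions per core
    }

# Example from the partitioning section: 100GB on 10 executors x 4 cores each.
print(partition_estimates(100, 40))
# {'by_size': 800, 'by_cores_low': 80, 'by_cores_high': 160}
# The guide settles on 200-400 partitions, between the size- and core-based estimates.
```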
Troubleshooting Guide
Common Failure Modes
| Issue | Symptoms | Root Cause | Fix |
|---|---|---|---|
| OOM Error | "Container killed by YARN" | Too much data per partition | Increase partitions, reduce broadcast |
| Shuffle Spill | Slow stage, disk I/O | Insufficient memory | Increase partitions or memory |
| Skewed Tasks | One task much slower | Data skew on key | Use salting, AQE skew handling |
| GC Overhead | "GC overhead limit exceeded" | Too many small objects | Use Kryo serialization, reduce UDFs |
| Driver OOM | Driver crash | collect(), large broadcast | Avoid collect, stream results |
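For the Driver OOM row, the usual remedy is to keep results distributed instead of pulling them to the driver. A brief, hedged sketch; big_df and process are placeholder names:

```python
# Risky: collect() materializes every row in driver memory.
# rows = big_df.collect()

# Prefer writing results out and keeping only small summaries on the driver.
big_df.write.mode("overwrite").parquet("s3://bucket/results/")  # illustrative path

# If rows must reach the driver, stream them one partition at a time...
for row in big_df.toLocalIterator():
    process(row)  # hypothetical per-row handler

# ...or bound the volume explicitly.
sample = big_df.limit(1000).collect()
```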
Debug Checklist
```python
from pyspark.sql import functions as F

# 1. Check Spark UI (port 4040/18080)
#    - Stages: Look for skewed tasks (max >> median)
#    - Storage: Check cached data size
#    - Environment: Verify configuration

# 2. Analyze execution plan
df.explain(mode="extended")

# 3. Check partition distribution
df.groupBy(F.spark_partition_id()).count().show()

# 4. Profile data skew
df.groupBy("key_column").count().orderBy(F.desc("count")).show(20)

# 5. Monitor job metrics
spark.sparkContext.setLogLevel("WARN")

# 6. Enable detailed metrics
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.eventLog.enabled", "true")
```
Reading Spark UI
```
Stage Analysis:
├── Duration: Total time for stage
├── Tasks: Number of parallel tasks
│   ├── Median: Typical task duration
│   ├── Max: Slowest task (check for skew)
│   └── Failed: Retry count
├── Input: Data read
├── Shuffle Read: Data from other stages
├── Shuffle Write: Data for downstream stages
└── Spill: Disk spill (indicates memory pressure)

Key Metrics:
├── GC Time > 10%: Memory issue
├── Shuffle Write > Input: Exploding join
├── Max/Median > 2x: Data skew
└── Spill > 0: Increase partitions or memory
```
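The live UI for the current application can be located straight from the session; a tiny, hedged example (the history server on port 18080 depends on how your cluster is deployed):

```python
# Prints something like http://driver-host:4040 while the application is running.
print(spark.sparkContext.uiWebUrl)
```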
Unit Test Template
python
import pytest
from pyspark.sql import SparkSession
from chispa.dataframe_comparer import assert_df_equality
import pyspark.sql.functions as F
@pytest.fixture(scope="session")
def spark():
"""Session-scoped Spark for tests."""
return (SparkSession.builder
.master("local[2]")
.appName("UnitTests")
.config("spark.sql.shuffle.partitions", 2)
.getOrCreate())
@pytest.fixture
def sample_data(spark):
return spark.createDataFrame([
(1, "user1", 100.0),
(2, "user2", 200.0),
(3, "user1", 150.0),
], ["id", "user_id", "amount"])
class TestAggregations:
def test_user_totals(self, spark, sample_data):
# Arrange
expected = spark.createDataFrame([
("user1", 250.0),
("user2", 200.0),
], ["user_id", "total"])
# Act
result = sample_data.groupBy("user_id").agg(
F.sum("amount").alias("total")
)
# Assert
assert_df_equality(result, expected, ignore_row_order=True)
def test_handles_empty_dataframe(self, spark):
# Arrange
empty_df = spark.createDataFrame([], "id INT, amount DOUBLE")
# Act
result = empty_df.agg(F.sum("amount").alias("total")).collect()
# Assert
assert result[0]["total"] is None
def test_window_functions(self, spark, sample_data):
# Arrange
from pyspark.sql.window import Window
window = Window.partitionBy("user_id").orderBy("id")
# Act
result = sample_data.withColumn(
"running_total",
F.sum("amount").over(window)
).filter(F.col("user_id") == "user1")
# Assert
totals = [row["running_total"] for row in result.collect()]
assert totals == [100.0, 250.0]python
import pytest
from pyspark.sql import SparkSession
from chispa.dataframe_comparer import assert_df_equality
import pyspark.sql.functions as F
@pytest.fixture(scope="session")
def spark():
"""Session-scoped Spark for tests."""
return (SparkSession.builder
.master("local[2]")
.appName("UnitTests")
.config("spark.sql.shuffle.partitions", 2)
.getOrCreate())
@pytest.fixture
def sample_data(spark):
return spark.createDataFrame([
(1, "user1", 100.0),
(2, "user2", 200.0),
(3, "user1", 150.0),
], ["id", "user_id", "amount"])
class TestAggregations:
def test_user_totals(self, spark, sample_data):
# Arrange
expected = spark.createDataFrame([
("user1", 250.0),
("user2", 200.0),
], ["user_id", "total"])
# Act
result = sample_data.groupBy("user_id").agg(
F.sum("amount").alias("total")
)
# Assert
assert_df_equality(result, expected, ignore_row_order=True)
def test_handles_empty_dataframe(self, spark):
# Arrange
empty_df = spark.createDataFrame([], "id INT, amount DOUBLE")
# Act
result = empty_df.agg(F.sum("amount").alias("total")).collect()
# Assert
assert result[0]["total"] is None
def test_window_functions(self, spark, sample_data):
# Arrange
from pyspark.sql.window import Window
window = Window.partitionBy("user_id").orderBy("id")
# Act
result = sample_data.withColumn(
"running_total",
F.sum("amount").over(window)
).filter(F.col("user_id") == "user1")
# Assert
totals = [row["running_total"] for row in result.collect()]
assert totals == [100.0, 250.0]Best Practices
最佳实践
Performance
性能优化
```python
# ✅ DO: Use DataFrame API over RDD
df.filter(F.col("status") == "active")  # Catalyst optimized

# ❌ DON'T: Use RDD transformations
rdd.filter(lambda x: x["status"] == "active")  # No optimization

# ✅ DO: Use built-in functions
df.withColumn("upper_name", F.upper("name"))

# ❌ DON'T: Use Python UDFs (slow serialization)
@udf
def upper_name(name):
    return name.upper()

# ✅ DO: Broadcast small lookups
df.join(broadcast(small_df), "key")

# ✅ DO: Persist wisely
intermediate.cache()
intermediate.count()  # Force materialization
# ... use intermediate multiple times ...
intermediate.unpersist()
```
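When custom logic genuinely has no built-in equivalent, a vectorized pandas UDF typically incurs far less serialization overhead than a row-at-a-time Python UDF; a hedged sketch, with the df and column names illustrative:

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf

# Operates on whole pandas Series per batch instead of one Python object per row.
@pandas_udf("string")
def upper_name_vectorized(name: pd.Series) -> pd.Series:
    return name.str.upper()

# Still prefer F.upper("name") where a built-in exists; reserve UDFs for logic Spark lacks.
df.withColumn("upper_name", upper_name_vectorized("name"))
```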
Code Organization

```python
# ✅ DO: Chain transformations fluently
result = (df
    .filter(condition)
    .withColumn("new_col", F.expr("..."))
    .groupBy("key")
    .agg(F.sum("value")))

# ✅ DO: Use descriptive column aliases
.agg(
    F.count("*").alias("event_count"),
    F.avg("amount").alias("avg_amount")
)

# ✅ DO: Parameterize for reusability
def add_date_features(df, date_col):
    return (df
        .withColumn("year", F.year(date_col))
        .withColumn("month", F.month(date_col))
        .withColumn("day_of_week", F.dayofweek(date_col)))
```
Resources

Official Documentation

Performance Tuning
Books
- "Learning Spark, 2nd Edition" by Damji et al.
- "Spark: The Definitive Guide" by Chambers & Zaharia
- "High Performance Spark" by Karau & Warren
Next Skills

After mastering Big Data:
- data-warehousing → Design dimensional models
- mlops → Deploy ML at scale
- streaming → Real-time with Flink/Kafka
- cloud-platforms → AWS EMR, Databricks

Skill Certification Checklist:
- Can optimize Spark jobs using EXPLAIN and Spark UI
- Can implement efficient joins with broadcast and bucketing
- Can handle data skew with salting techniques
- Can build streaming pipelines with Structured Streaming
- Can use Delta Lake for ACID operations