big-data

Big Data & Distributed Computing

Production-grade big data processing with Apache Spark, distributed systems patterns, and petabyte-scale data engineering.

Quick Start

python
# PySpark 3.5+ modern DataFrame API
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Initialize Spark with optimal settings
spark = (SparkSession.builder
    .appName("ProductionETL")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate())

# Efficient data loading with schema enforcement
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", LongType(), False),
    StructField("event_type", StringType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("properties", StringType(), True)
])

df = (spark.read
    .schema(schema)
    .parquet("s3://bucket/events/")
    .filter(F.col("timestamp") >= F.current_date() - 30))

# Complex aggregation with window functions
window_spec = Window.partitionBy("user_id").orderBy("timestamp")

result = (df
    .withColumn("event_rank", F.row_number().over(window_spec))
    .withColumn("session_id", F.sum(
        F.when(
            F.col("timestamp") - F.lag("timestamp").over(window_spec) > F.expr("INTERVAL 30 MINUTES"),
            1
        ).otherwise(0)
    ).over(window_spec))
    .groupBy("user_id", "session_id")
    .agg(
        F.count("*").alias("event_count"),
        F.min("timestamp").alias("session_start"),
        F.max("timestamp").alias("session_end")
    ))

result.write.mode("overwrite").parquet("s3://bucket/sessions/")

Core Concepts

1. Spark Architecture Deep Dive

┌─────────────────────────────────────────────────────────┐
│                    Driver Program                        │
│  ┌─────────────────────────────────────────────────┐   │
│  │              SparkContext/SparkSession           │   │
│  │  - Creates execution plan (DAG)                  │   │
│  │  - Coordinates with Cluster Manager              │   │
│  │  - Schedules tasks                               │   │
│  └─────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│              Cluster Manager (YARN/K8s/Standalone)      │
└─────────────────────────────────────────────────────────┘
          ┌────────────────┼────────────────┐
          ▼                ▼                ▼
    ┌──────────┐    ┌──────────┐    ┌──────────┐
    │ Executor │    │ Executor │    │ Executor │
    │ ┌──────┐ │    │ ┌──────┐ │    │ ┌──────┐ │
    │ │Task 1│ │    │ │Task 2│ │    │ │Task 3│ │
    │ │Task 4│ │    │ │Task 5│ │    │ │Task 6│ │
    │ └──────┘ │    │ └──────┘ │    │ └──────┘ │
    │  Cache   │    │  Cache   │    │  Cache   │
    └──────────┘    └──────────┘    └──────────┘
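The driver/executor split above is also visible from a running session. A quick sanity check of what the driver is attached to (the values printed are environment-dependent):

python
# Driver-side view of the cluster this session is attached to
sc = spark.sparkContext
print(f"Master:              {sc.master}")              # e.g. yarn, k8s://..., local[*]
print(f"Default parallelism: {sc.defaultParallelism}")  # roughly the total executor cores
print(f"Spark UI:            {sc.uiWebUrl}")            # driver UI, port 4040 by default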

2. Partition Optimization

python
from pyspark.sql import functions as F

# Check current partitioning
print(f"Partitions: {df.rdd.getNumPartitions()}")

# Rule of thumb: 128MB per partition, 2-4 partitions per core.
# For 100GB data on 10 executors with 4 cores each:
#   100GB / 128MB ≈ 800 partitions, or 40 cores * 4 = 160 partitions
#   Use: 200-400 partitions

# Repartition by key (for joins)
df_repartitioned = df.repartition(200, "user_id")

# Coalesce (reduce partitions without shuffle)
df_coalesced = df.coalesce(100)

# Optimal write partitioning: derive the partition columns, then write
(df
    .withColumn("year", F.year("date"))
    .withColumn("month", F.month("date"))
    .repartition("year", "month")
    .write
    .partitionBy("year", "month")
    .mode("overwrite")
    .parquet("s3://bucket/output/"))

# Bucketing for repeated joins
(df.write
    .bucketBy(256, "user_id")
    .sortBy("user_id")
    .saveAsTable("bucketed_events"))

3. Join Optimization Strategies

python
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast

# Broadcast join (small table < 10MB default, configurable to 100MB)
small_df = spark.read.parquet("s3://bucket/dim_product/")   # 5MB
large_df = spark.read.parquet("s3://bucket/fact_sales/")    # 500GB

# Explicit broadcast hint
result = large_df.join(broadcast(small_df), "product_id")

# Increase broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)  # 100MB

# Sort-Merge Join (for large tables)
# Both tables sorted and partitioned by join key
users = spark.read.parquet("users/").repartition(200, "user_id").sortWithinPartitions("user_id")
orders = spark.read.parquet("orders/").repartition(200, "user_id").sortWithinPartitions("user_id")
result = users.join(orders, "user_id")

# Skewed join handling (salting technique)
# If user_id has skew (some users have millions of rows)
salt_range = 10

salted_users = (users
    .withColumn("salt", F.explode(F.array([F.lit(i) for i in range(salt_range)])))
    .withColumn("salted_key", F.concat("user_id", F.lit("_"), "salt")))

salted_orders = (orders
    .withColumn("salt", (F.rand() * salt_range).cast("int"))
    .withColumn("salted_key", F.concat("user_id", F.lit("_"), "salt")))

result = salted_users.join(salted_orders, "salted_key").drop("salt", "salted_key")

4. Caching & Persistence

python
from pyspark import StorageLevel

# Caching strategies
df.cache()                                    # MEMORY_AND_DISK by default in Spark 3.x
df.persist(StorageLevel.MEMORY_ONLY)          # Fastest, may recompute if evicted
df.persist(StorageLevel.MEMORY_AND_DISK_SER)  # Compressed, slower but less memory
df.persist(StorageLevel.DISK_ONLY)            # For very large intermediate datasets

# When to cache:
# - Reused DataFrames (used in multiple actions)
# - After expensive transformations (joins, aggregations)
# - Before iterative algorithms

# Cache usage pattern
expensive_df = (spark.read.parquet("s3://bucket/large/")
    .filter(F.col("status") == "active")
    .join(broadcast(dim_df), "dim_key")
    .groupBy("category")
    .agg(F.sum("amount").alias("total")))

expensive_df.cache()
expensive_df.count()  # Materialize cache

# Use cached DataFrame multiple times
top_categories = expensive_df.orderBy(F.desc("total")).limit(10)
summary = expensive_df.agg(F.avg("total"), F.max("total"))

# Release cache when done
expensive_df.unpersist()

5. Structured Streaming

python
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Read from Kafka
kafka_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")
    .option("maxOffsetsPerTrigger", 100000)
    .load())

# Parse JSON payload
event_schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", LongType()),
    StructField("event_type", StringType()),
    StructField("timestamp", TimestampType())
])

parsed_df = (kafka_df
    .select(F.from_json(F.col("value").cast("string"), event_schema).alias("data"))
    .select("data.*")
    .withWatermark("timestamp", "10 minutes"))

# Windowed aggregation
windowed_counts = (parsed_df
    .groupBy(
        F.window("timestamp", "5 minutes", "1 minute"),
        "event_type"
    )
    .count())

# Write to Delta Lake with checkpointing
query = (windowed_counts.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3://bucket/checkpoints/events/")
    .trigger(processingTime="1 minute")
    .start("s3://bucket/streaming_output/"))

# Monitor stream
query.awaitTermination()
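awaitTermination() simply blocks; if you instead want to poll the stream's health, the query handle exposes progress metrics. A minimal monitoring sketch (the 60-second interval and printed fields are one reasonable choice, not a prescription):

python
import time

while query.isActive:
    progress = query.lastProgress  # dict for the most recent micro-batch, None before the first one
    if progress:
        print(progress["batchId"], progress["numInputRows"], progress["durationMs"])
    time.sleep(60)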

Tools & Technologies

Tool             Purpose                   Version (2025)
Apache Spark     Distributed processing    3.5+
Delta Lake       ACID transactions         3.0+
Apache Iceberg   Table format              1.4+
Apache Flink     Stream processing         1.18+
Databricks       Managed Spark platform    Latest
AWS EMR          Managed Hadoop/Spark      7.0+
Trino            Interactive queries       400+
dbt              Transform layer           1.7+

Learning Path

Phase 1: Foundations (Weeks 1-3)

Week 1: Distributed computing concepts, MapReduce
Week 2: Spark architecture, RDDs, DataFrames
Week 3: Spark SQL, basic transformations

Phase 2: Intermediate (Weeks 4-6)

Week 4: Joins, aggregations, window functions
Week 5: Partitioning, bucketing, caching
Week 6: Performance tuning, Spark UI analysis

Phase 3: Advanced (Weeks 7-10)

Week 7: Structured Streaming
Week 8: Delta Lake / Iceberg table formats
Week 9: Cluster sizing, cost optimization
Week 10: Advanced optimizations (AQE, skew handling)

Phase 4: Production (Weeks 11-14)

Week 11: Deployment on EMR/Databricks
Week 12: Monitoring, alerting, debugging
Week 13: CI/CD for Spark jobs
Week 14: Multi-cluster architectures

Production Patterns

Delta Lake UPSERT (Merge)

python
from delta.tables import DeltaTable

# Incremental UPSERT pattern
delta_table = DeltaTable.forPath(spark, "s3://bucket/users/")
updates_df = spark.read.parquet("s3://bucket/updates/")

(delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.user_id = source.user_id"
).whenMatchedUpdate(set={
    "email": "source.email",
    "updated_at": "source.updated_at"
}).whenNotMatchedInsertAll().execute())

# Optimize after merge
delta_table.optimize().executeCompaction()
delta_table.vacuum(retentionHours=168)  # 7 days
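If downstream queries filter heavily on one column, the same optimize builder can also Z-order the compacted files; a one-line sketch (the column choice is illustrative):

python
# Co-locate rows with similar user_id values in the same files (Delta Lake 2.0+)
delta_table.optimize().executeZOrderBy("user_id")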

Cost-Effective Cluster Configuration

python
# spark-submit configuration for a 1TB processing job
"""
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 50 \
  --executor-cores 4 \
  --executor-memory 16g \
  --driver-memory 8g \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.coalescePartitions.enabled=true \
  --conf spark.sql.shuffle.partitions=400 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=10 \
  --conf spark.dynamicAllocation.maxExecutors=100 \
  --conf spark.speculation=true \
  job.py
"""

Sizing guidelines (a rough helper based on these rules is sketched below):

- Executor memory: 16-32GB (avoid GC overhead)
- Executor cores: 4-5 (parallelism per executor)
- Total cores: 2-4x data size in GB
- Partitions: 2-4x total cores
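A small helper that restates the partitioning rules of thumb from this guide as numbers; the function name and the 128MB default are illustrative, not part of any Spark API:

python
def partition_estimates(data_size_gb: float, total_cores: int,
                        target_partition_mb: int = 128) -> dict:
    """Return the two rule-of-thumb partition counts used in this guide."""
    return {
        "by_size": int(data_size_gb * 1024 / target_partition_mb),  # ~128MB per partition
        "by_cores": (2 * total_cores, 4 * total_cores),             # 2-4 partitions per core
    }

# Example from the partitioning section: 100GB on 10 executors with 4 cores each
print(partition_estimates(100, 40))  # {'by_size': 800, 'by_cores': (80, 160)}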

Troubleshooting Guide

Common Failure Modes

Issue           Symptoms                        Root Cause                    Fix
OOM Error       "Container killed by YARN"      Too much data per partition   Increase partitions, reduce broadcast
Shuffle Spill   Slow stage, disk I/O            Insufficient memory           Increase spark.memory.fraction
Skewed Tasks    One task much slower            Data skew on key              Use salting, AQE skew handling
GC Overhead     "GC overhead limit exceeded"    Too many small objects        Use Kryo serialization, reduce UDFs
Driver OOM      Driver crash                    collect(), large broadcast    Avoid collect, stream results
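For the skew-related rows, Spark 3.x adaptive query execution can often absorb moderate join skew before manual salting is needed; a configuration sketch (the two threshold values shown are Spark's defaults, kept explicit here for visibility):

python
# AQE skew-join handling (Spark 3.x)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# A shuffle partition counts as skewed when it is both skewedPartitionFactor times
# larger than the median partition and above the byte threshold
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")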

Debug Checklist

python
# 1. Check Spark UI (port 4040 live, 18080 history server)
#    - Stages: Look for skewed tasks (max >> median)
#    - Storage: Check cached data size
#    - Environment: Verify configuration

# 2. Analyze execution plan
df.explain(mode="extended")

# 3. Check partition distribution
df.groupBy(F.spark_partition_id()).count().show()

# 4. Profile data skew
df.groupBy("key_column").count().orderBy(F.desc("count")).show(20)

# 5. Monitor job metrics
spark.sparkContext.setLogLevel("WARN")

# 6. Enable detailed metrics (Arrow can be toggled at runtime; event logging is a
#    static config, so pass --conf spark.eventLog.enabled=true at submit time)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

Reading Spark UI

Stage Analysis:
├── Duration: Total time for stage
├── Tasks: Number of parallel tasks
│   ├── Median: Typical task duration
│   ├── Max: Slowest task (check for skew)
│   └── Failed: Retry count
├── Input: Data read
├── Shuffle Read: Data from other stages
├── Shuffle Write: Data for downstream stages
└── Spill: Disk spill (indicates memory pressure)

Key Metrics:
├── GC Time > 10%: Memory issue
├── Shuffle Write > Input: Exploding join
├── Max/Median > 2x: Data skew
└── Spill > 0: Increase partitions or memory
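The same stage-level numbers can be pulled from the driver without opening the UI; a short sketch using PySpark's status tracker (only active stages are inspected here):

python
tracker = spark.sparkContext.statusTracker()

for stage_id in tracker.getActiveStageIds():
    info = tracker.getStageInfo(stage_id)
    if info:
        print(f"Stage {info.stageId} ({info.name}): "
              f"{info.numCompletedTasks}/{info.numTasks} done, "
              f"{info.numActiveTasks} running, {info.numFailedTasks} failed")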

Unit Test Template

python
import pytest
from pyspark.sql import SparkSession
from chispa.dataframe_comparer import assert_df_equality
import pyspark.sql.functions as F

@pytest.fixture(scope="session")
def spark():
    """Session-scoped Spark for tests."""
    return (SparkSession.builder
        .master("local[2]")
        .appName("UnitTests")
        .config("spark.sql.shuffle.partitions", 2)
        .getOrCreate())

@pytest.fixture
def sample_data(spark):
    return spark.createDataFrame([
        (1, "user1", 100.0),
        (2, "user2", 200.0),
        (3, "user1", 150.0),
    ], ["id", "user_id", "amount"])

class TestAggregations:

    def test_user_totals(self, spark, sample_data):
        # Arrange
        expected = spark.createDataFrame([
            ("user1", 250.0),
            ("user2", 200.0),
        ], ["user_id", "total"])

        # Act
        result = sample_data.groupBy("user_id").agg(
            F.sum("amount").alias("total")
        )

        # Assert
        assert_df_equality(result, expected, ignore_row_order=True)

    def test_handles_empty_dataframe(self, spark):
        # Arrange
        empty_df = spark.createDataFrame([], "id INT, amount DOUBLE")

        # Act
        result = empty_df.agg(F.sum("amount").alias("total")).collect()

        # Assert
        assert result[0]["total"] is None

    def test_window_functions(self, spark, sample_data):
        # Arrange
        from pyspark.sql.window import Window
        window = Window.partitionBy("user_id").orderBy("id")

        # Act
        result = sample_data.withColumn(
            "running_total",
            F.sum("amount").over(window)
        ).filter(F.col("user_id") == "user1")

        # Assert
        totals = [row["running_total"] for row in result.collect()]
        assert totals == [100.0, 250.0]

Best Practices

Performance

python
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast, udf

# ✅ DO: Use DataFrame API over RDD
df.filter(F.col("status") == "active")  # Catalyst optimized

# ❌ DON'T: Use RDD transformations
rdd.filter(lambda x: x["status"] == "active")  # No optimization

# ✅ DO: Use built-in functions
df.withColumn("upper_name", F.upper("name"))

# ❌ DON'T: Use Python UDFs (slow serialization)
@udf
def upper_name(name):
    return name.upper()

# ✅ DO: Broadcast small lookups
df.join(broadcast(small_df), "key")

# ✅ DO: Persist wisely
intermediate.cache()
intermediate.count()  # Force materialization
# ... use intermediate multiple times ...
intermediate.unpersist()
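When no built-in function covers the logic, a vectorized pandas UDF is usually a better escape hatch than the row-at-a-time Python UDF shown above; a minimal sketch (the transformation itself is arbitrary):

python
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

@F.pandas_udf(StringType())
def clean_name(names: pd.Series) -> pd.Series:
    # Runs on whole Arrow batches instead of one Python call per row
    return names.str.strip().str.title()

df.withColumn("clean_name", clean_name("name"))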

Code Organization

python
from pyspark.sql import functions as F

# ✅ DO: Chain transformations fluently
result = (df
    .filter(condition)
    .withColumn("new_col", F.expr("..."))
    .groupBy("key")
    .agg(F.sum("value")))

# ✅ DO: Use descriptive column aliases
df.groupBy("key").agg(
    F.count("*").alias("event_count"),
    F.avg("amount").alias("avg_amount")
)

# ✅ DO: Parameterize for reusability
def add_date_features(df, date_col):
    return (df
        .withColumn("year", F.year(date_col))
        .withColumn("month", F.month(date_col))
        .withColumn("day_of_week", F.dayofweek(date_col)))

Resources

Official Documentation

Performance Tuning

Books

  • "Learning Spark 2nd Edition" by Damji et al.
  • "Spark: The Definitive Guide" by Chambers & Zaharia
  • "High Performance Spark" by Karau & Warren

Next Skills

After mastering Big Data:
  • data-warehousing
    - Design dimensional models
  • mlops
    - Deploy ML at scale
  • streaming
    - Real-time with Flink/Kafka
  • cloud-platforms
    - AWS EMR, Databricks

Skill Certification Checklist:
  • Can optimize Spark jobs using EXPLAIN and Spark UI
  • Can implement efficient joins with broadcast and bucketing
  • Can handle data skew with salting techniques
  • Can build streaming pipelines with Structured Streaming
  • Can use Delta Lake for ACID operations