beam-concepts


Apache Beam Core Concepts


The Beam Model


Evolved from Google's MapReduce, FlumeJava, and MillWheel projects. Originally called the "Dataflow Model."

Key Abstractions


Pipeline


A Pipeline encapsulates the entire data processing task, including reading, transforming, and writing data.
```java
// Java
Pipeline p = Pipeline.create(options);
p.apply(...)
 .apply(...)
 .apply(...);
p.run().waitUntilFinish();
```

Python

```python
with beam.Pipeline(options=options) as p:
    (p
     | 'Read' >> beam.io.ReadFromText('input.txt')
     | 'Transform' >> beam.Map(process)
     | 'Write' >> beam.io.WriteToText('output'))
```

PCollection


A distributed dataset that can be bounded (batch) or unbounded (streaming).

Properties


  • Immutable - Once created, cannot be modified
  • Distributed - Elements processed in parallel
  • May be bounded or unbounded
  • Timestamped - Each element has an event timestamp
  • Windowed - Elements assigned to windows

PTransform


A data processing operation that transforms PCollections.
```java
// Java
PCollection<String> output = input.apply(MyTransform.create());
```

Python

```python
output = input | 'Name' >> beam.ParDo(MyDoFn())
```

Core Transforms


ParDo


General-purpose parallel processing.
```java
// Java
input.apply(ParDo.of(new DoFn<String, Integer>() {
    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<Integer> out) {
        out.output(element.length());
    }
}));
```

Python

```python
class LengthFn(beam.DoFn):
    def process(self, element):
        yield len(element)

input | beam.ParDo(LengthFn())
```

Or simpler:


```python
input | beam.Map(len)
```

GroupByKey


Groups elements by key.
```java
PCollection<KV<String, Integer>> input = ...;
PCollection<KV<String, Iterable<Integer>>> grouped = input.apply(GroupByKey.create());
```

CoGroupByKey


Joins multiple PCollections by key.

Combine


Combines elements (sum, mean, etc.).
```java
// Global combine
input.apply(Combine.globally(Sum.ofIntegers()));

// Per-key combine
input.apply(Combine.perKey(Sum.ofIntegers()));
```

Flatten


Merges multiple PCollections.
```java
PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);
PCollection<String> merged = collections.apply(Flatten.pCollections());
```

Partition


Splits a PCollection into multiple PCollections.

Windowing


Types


  • Fixed Windows - Regular, non-overlapping intervals
  • Sliding Windows - Overlapping intervals
  • Session Windows - Gaps of inactivity define boundaries
  • Global Window - All elements in one window (default)
```java
input.apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))));
```

```python
input | beam.WindowInto(beam.window.FixedWindows(300))
```
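As a mental model (a stdlib-only sketch, not the Beam API): a fixed window of size `s` seconds contains an element with event timestamp `t` when the window starts at `t - (t % s)`.

```python
def fixed_window_start(timestamp, size):
    """Start of the fixed, non-overlapping window containing `timestamp`."""
    return timestamp - (timestamp % size)

# With 5-minute (300 s) windows, t=299 and t=300 land in different windows.
print(fixed_window_start(299, 300))  # 0
print(fixed_window_start(300, 300))  # 300
```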

Triggers


Control when results are emitted.
```java
input.apply(Window.<T>into(FixedWindows.of(Duration.standardMinutes(5)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(1))))
    .withAllowedLateness(Duration.standardHours(1))
    .accumulatingFiredPanes());
```

Side Inputs


Additional inputs to ParDo.
```java
PCollectionView<Map<String, String>> sideInput =
    lookupTable.apply(View.asMap());

mainInput.apply(ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        Map<String, String> lookup = c.sideInput(sideInput);
        // Use lookup...
    }
}).withSideInputs(sideInput));
```

Pipeline Options


Configure pipeline execution.
```java
public interface MyOptions extends PipelineOptions {
    @Description("Input file")
    @Validation.Required
    String getInput();
    void setInput(String value);
}

MyOptions options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
```

Schema


Strongly-typed access to structured data.
```java
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class User {
    public abstract String getName();
    public abstract int getAge();
}

PCollection<User> users = ...;
PCollection<Row> rows = users.apply(Convert.toRows());
```

Error Handling


Dead Letter Queue Pattern


```java
TupleTag<String> successTag = new TupleTag<>() {};
TupleTag<String> failureTag = new TupleTag<>() {};

PCollectionTuple results = input.apply(ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            c.output(process(c.element()));
        } catch (Exception e) {
            c.output(failureTag, c.element());
        }
    }
}).withOutputTags(successTag, TupleTagList.of(failureTag)));

results.get(successTag).apply(WriteToSuccess());
results.get(failureTag).apply(WriteToDeadLetter());
```

Cross-Language Pipelines


Use transforms from other SDKs.

Use Java Kafka connector from Python


```python
from apache_beam.io.kafka import ReadFromKafka

result = pipeline | ReadFromKafka(
    consumer_config={'bootstrap.servers': 'localhost:9092'},
    topics=['my-topic'])
```

Best Practices


  1. Prefer built-in transforms over custom DoFns
  2. Use schemas for type-safe operations
  3. Minimize side inputs for performance
  4. Handle late data explicitly
  5. Test with DirectRunner before deploying
  6. Use TestPipeline for unit tests