Apache Beam Runners

Overview

Runners execute Beam pipelines on distributed processing backends. Each runner translates the portable Beam model to its native execution engine.

Available Runners

| Runner | Location | Description |
|---|---|---|
| Direct | runners/direct-java/ | Local execution for testing |
| Prism | runners/prism/ | Portable local runner |
| Dataflow | runners/google-cloud-dataflow-java/ | Google Cloud Dataflow |
| Flink | runners/flink/ | Apache Flink |
| Spark | runners/spark/ | Apache Spark |
| Samza | runners/samza/ | Apache Samza |
| Jet | runners/jet/ | Hazelcast Jet |
| Twister2 | runners/twister2/ | Twister2 |

Direct Runner

For local development and testing.

Java

```java
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class);
Pipeline p = Pipeline.create(options);
```

Python

```python
options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DirectRunner'
p = beam.Pipeline(options=options)
```

Command Line

```bash
--runner=DirectRunner
```

Dataflow Runner

Prerequisites

  • GCP project with Dataflow API enabled
  • Service account with Dataflow Admin role
  • GCS bucket for staging
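
The prerequisite checklist above can be sketched as a small preflight helper. This is an illustrative function, not part of the Beam API; the option names mirror the Python flags used later in this page:

```python
def check_dataflow_options(opts: dict) -> list:
    """Return a list of problems with a set of Dataflow options (hypothetical helper)."""
    required = ("project", "region", "temp_location")
    problems = [k for k in required if not opts.get(k)]
    # Staging and temp paths must live in a GCS bucket.
    for key in ("temp_location", "staging_location"):
        val = opts.get(key)
        if val and not val.startswith("gs://"):
            problems.append(f"{key} must be a gs:// path")
    return problems
```

Running such a check before submission surfaces missing-project or non-GCS-path mistakes locally instead of as a failed Dataflow job.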

Java Usage

```java
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("my-project");
options.setRegion("us-central1");
options.setTempLocation("gs://my-bucket/temp");
```

Python Usage

```python
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/temp'
])
```

Runner v2

```bash
--experiments=use_runner_v2
```

Custom SDK Container

```bash
--sdkContainerImage=gcr.io/project/beam_java11_sdk:custom
```

Flink Runner

Flink 运行器

Embedded Mode

嵌入模式

```java
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("[local]");
```

Cluster Mode

```java
options.setFlinkMaster("host:port");
```

Portable Mode (Python)

```python
options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=host:port',
    '--environment_type=LOOPBACK'  # or DOCKER, EXTERNAL
])
```
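
A hypothetical helper that assembles these portable-runner flags. The flag names match the options shown above, but the helper itself is a sketch, not a Beam API:

```python
def portable_flink_flags(master, environment_type="LOOPBACK"):
    """Build pipeline-option flags for the portable Flink runner (illustrative sketch)."""
    allowed = {"LOOPBACK", "DOCKER", "EXTERNAL", "PROCESS"}
    if environment_type not in allowed:
        raise ValueError(f"unknown environment_type: {environment_type}")
    return [
        "--runner=FlinkRunner",
        f"--flink_master={master}",
        f"--environment_type={environment_type}",
    ]
```

The resulting list can be passed straight to `PipelineOptions(...)` as in the example above.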

Spark Runner

Java

```java
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[*]");  // or spark://host:port
```

Python (Portable)

```python
options = PipelineOptions([
    '--runner=SparkRunner',
    '--spark_master_url=local[*]'
])
```

Testing with Runners

运行器测试

ValidatesRunner Tests

ValidatesRunner 测试

Tests that validate runner correctness:

Direct Runner

```bash
./gradlew :runners:direct-java:validatesRunner
```

Flink Runner

```bash
./gradlew :runners:flink:1.18:validatesRunner
```

Spark Runner

```bash
./gradlew :runners:spark:3:validatesRunner
```

Dataflow Runner

```bash
./gradlew :runners:google-cloud-dataflow-java:validatesRunner
```

TestPipeline with Runners

```java
@Rule public TestPipeline pipeline = TestPipeline.create();
```

Set the runner via a system property:

```bash
-DbeamTestPipelineOptions='["--runner=TestDataflowRunner"]'
```

Portable Runners

Concept

  • SDK-independent execution via Fn API
  • SDK runs in container, communicates via gRPC

Environment Types

  • DOCKER: SDK runs in a Docker container
  • LOOPBACK: SDK runs in the same process (for testing)
  • EXTERNAL: SDK at a specified address
  • PROCESS: SDK runs in a subprocess
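
The choices above can be summarized in code. `--environment_type` and `--environment_config` are the portable-runner flags; the hint strings and the helper function below are illustrative, not defaults or Beam API:

```python
# Companion setting each environment type typically needs (summary, not defaults).
ENVIRONMENT_CONFIG_HINTS = {
    "DOCKER": "SDK container image",
    "LOOPBACK": "none; SDK harness runs in the submitting process",
    "EXTERNAL": "address (host:port) of an already-running SDK harness",
    "PROCESS": "JSON describing the command used to launch the harness",
}

def environment_flags(env_type, config=None):
    """Render the environment choice as pipeline-option flags (sketch)."""
    assert env_type in ENVIRONMENT_CONFIG_HINTS, f"unknown type: {env_type}"
    flags = [f"--environment_type={env_type}"]
    if config is not None:
        flags.append(f"--environment_config={config}")
    return flags
```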

Job Server

Start the Flink job server:

```bash
./gradlew :runners:flink:1.18:job-server:runShadow
```

Start the Spark job server:

```bash
./gradlew :runners:spark:3:job-server:runShadow
```

Runner-Specific Options

Dataflow

| Option | Description |
|---|---|
| --project | GCP project |
| --region | GCP region |
| --tempLocation | GCS temp location |
| --stagingLocation | GCS staging location |
| --numWorkers | Initial number of workers |
| --maxNumWorkers | Maximum number of workers |
| --workerMachineType | Worker VM type |
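
These Java-style options are all passed as `--name=value` flags. A minimal sketch of rendering them from keyword arguments (the helper name is hypothetical):

```python
def dataflow_args(**options):
    """Render pipeline options as --name=value command-line flags (sketch)."""
    return [f"--{name}={value}" for name, value in options.items()]
```

For example, `dataflow_args(project="my-project", maxNumWorkers=10)` yields `["--project=my-project", "--maxNumWorkers=10"]`.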

Flink

| Option | Description |
|---|---|
| --flinkMaster | Flink master address |
| --parallelism | Default parallelism |
| --checkpointingInterval | Checkpoint interval |

Spark

| Option | Description |
|---|---|
| --sparkMaster | Spark master URL |
| --sparkConf | Additional Spark configuration |

Building Runner Artifacts

构建运行器制品

Dataflow Worker Jar

Dataflow Worker Jar

```bash
./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar
```

Flink Job Server

Flink Job Server

```bash
./gradlew :runners:flink:1.18:job-server:shadowJar
```

Spark Job Server

Spark Job Server

```bash
./gradlew :runners:spark:3:job-server:shadowJar
```

Debugging

调试

Direct Runner

Direct 运行器

  • Enable logging:
    -Dorg.slf4j.simpleLogger.defaultLogLevel=debug
  • Use
    --targetParallelism=1
    for deterministic execution
  • 启用日志:
    -Dorg.slf4j.simpleLogger.defaultLogLevel=debug
  • 使用
    --targetParallelism=1
    实现确定性执行

Dataflow

Dataflow

  • Check the Dataflow UI: console.cloud.google.com/dataflow
  • Use --experiments=upload_graph for graph debugging
  • Worker logs are available in Cloud Logging

Portable Runners

可移植运行器

  • Enable debug logging on the job server
  • Check SDK harness logs in the worker containers