# Apache Beam Runners

## Overview
Runners execute Beam pipelines on distributed processing backends. Each runner translates the portable Beam model to its native execution engine.
## Available Runners
| Runner | Location | Description |
|---|---|---|
| Direct | `runners/direct-java` | Local execution for testing |
| Prism | `runners/prism` | Portable local runner |
| Dataflow | `runners/google-cloud-dataflow-java` | Google Cloud Dataflow |
| Flink | `runners/flink` | Apache Flink |
| Spark | `runners/spark` | Apache Spark |
| Samza | `runners/samza` | Apache Samza |
| Jet | `runners/jet` | Hazelcast Jet |
| Twister2 | `runners/twister2` | Twister2 |
## Direct Runner

For local development and testing.
### Java

```java
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class);
Pipeline p = Pipeline.create(options);
```
### Python

```python
options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DirectRunner'
p = beam.Pipeline(options=options)
```

### Command Line
```bash
--runner=DirectRunner
```

## Dataflow Runner
### Prerequisites

- GCP project with Dataflow API enabled
- Service account with Dataflow Admin role
- GCS bucket for staging
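Dataflow rejects non-GCS temp and staging paths, so a quick pre-submission sanity check can save a failed job. A stdlib-only sketch; the helper name is hypothetical.

```python
def looks_like_gcs_path(path: str) -> bool:
    # Dataflow temp/staging locations must be gs:// URIs naming a bucket.
    return path.startswith("gs://") and len(path.rstrip("/")) > len("gs://")

assert looks_like_gcs_path("gs://my-bucket/temp")
assert not looks_like_gcs_path("/tmp/local")
```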
### Java Usage

```java
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("my-project");
options.setRegion("us-central1");
options.setTempLocation("gs://my-bucket/temp");
```

### Python Usage
```python
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/temp'
])
```

### Runner v2
```bash
--experiments=use_runner_v2
```

### Custom SDK Container
```bash
--sdkContainerImage=gcr.io/project/beam_java11_sdk:custom
```

## Flink Runner

### Embedded Mode
```java
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("[local]");
```

### Cluster Mode
```java
options.setFlinkMaster("host:port");
```

### Portable Mode (Python)
```python
options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=host:port',
    '--environment_type=LOOPBACK'  # or DOCKER, EXTERNAL
])
```

## Spark Runner
### Java

```java
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[*]"); // or spark://host:port
```

### Python (Portable)
```python
options = PipelineOptions([
    '--runner=SparkRunner',
    '--spark_master_url=local[*]'
])
```

## Testing with Runners
### ValidatesRunner Tests

Tests that validate runner correctness:

```bash
# Direct Runner
./gradlew :runners:direct-java:validatesRunner

# Flink Runner
./gradlew :runners:flink:1.18:validatesRunner

# Spark Runner
./gradlew :runners:spark:3:validatesRunner

# Dataflow Runner
./gradlew :runners:google-cloud-dataflow-java:validatesRunner
```

### TestPipeline with Runners
```java
@Rule public TestPipeline pipeline = TestPipeline.create();
```

Set the runner via a system property:

```bash
-DbeamTestPipelineOptions='["--runner=TestDataflowRunner"]'
```
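The property value is a JSON array of `--name=value` flag strings. A stdlib-only sketch of how such a value decodes; the parsing helper is illustrative, not Beam's actual implementation.

```python
import json

def parse_beam_test_options(raw: str) -> dict:
    # -DbeamTestPipelineOptions carries a JSON array of "--name=value" flags.
    flags = json.loads(raw)
    return dict(flag.lstrip("-").split("=", 1) for flag in flags)

opts = parse_beam_test_options('["--runner=TestDataflowRunner", "--project=my-project"]')
assert opts["runner"] == "TestDataflowRunner"
```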
## Portable Runners
### Concept

- SDK-independent execution via the Fn API
- The SDK harness runs in a container and communicates over gRPC

### Environment Types
- `DOCKER` - SDK runs in a Docker container
- `LOOPBACK` - SDK runs in the same process (testing)
- `EXTERNAL` - SDK runs at a specified address
- `PROCESS` - SDK runs in a subprocess
### Job Server

Start the Flink job server:

```bash
./gradlew :runners:flink:1.18:job-server:runShadow
```

Start the Spark job server:

```bash
./gradlew :runners:spark:3:job-server:runShadow
```

## Runner-Specific Options
### Dataflow

| Option | Description |
|---|---|
| `project` | GCP project |
| `region` | GCP region |
| `tempLocation` | GCS temp location |
| `stagingLocation` | GCS staging location |
| `numWorkers` | Initial workers |
| `maxNumWorkers` | Max workers |
| `workerMachineType` | VM type |
### Flink

| Option | Description |
|---|---|
| `flinkMaster` | Flink master address |
| `parallelism` | Default parallelism |
| `checkpointingInterval` | Checkpoint interval |
### Spark

| Option | Description |
|---|---|
| `sparkMaster` | Spark master URL |
| | Additional Spark config |
## Building Runner Artifacts

### Dataflow Worker Jar

```bash
./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar
```

### Flink Job Server

```bash
./gradlew :runners:flink:1.18:job-server:shadowJar
```

### Spark Job Server

```bash
./gradlew :runners:spark:3:job-server:shadowJar
```

## Debugging
### Direct Runner

- Enable logging: `-Dorg.slf4j.simpleLogger.defaultLogLevel=debug`
- Use `--targetParallelism=1` for deterministic execution
### Dataflow

- Check the Dataflow UI: console.cloud.google.com/dataflow
- Use `--experiments=upload_graph` for graph debugging
- Worker logs are in Cloud Logging
### Portable Runners

- Enable debug logging on the job server
- Check SDK harness logs in worker containers