python-development

Python Development in Apache Beam

Project Structure

Key Directories

  • sdks/python/
    - Python SDK root
    • apache_beam/
      - Main Beam package
      • transforms/
        - Core transforms (ParDo, GroupByKey, etc.)
      • io/
        - I/O connectors
      • ml/
        - Beam ML code (RunInference, etc.)
      • runners/
        - Runner implementations and wrappers
      • runners/worker/
        - SDK worker harness
    • container/
      - Docker container configuration
    • test-suites/
      - Test configurations
    • scripts/
      - Utility scripts

Configuration Files

  • setup.py
    - Package configuration
  • pyproject.toml
    - Build configuration
  • tox.ini
    - Test automation
  • pytest.ini
    - Pytest configuration
  • .pylintrc
    - Linting rules
  • .isort.cfg
    - Import sorting
  • mypy.ini
    - Type checking

Environment Setup

Using pyenv (Recommended)

Install Python

pyenv install 3.X # Use supported version from gradle.properties

Create virtual environment

pyenv virtualenv 3.X beam-dev
pyenv activate beam-dev

Install in Editable Mode

cd sdks/python
pip install -e .[gcp,test]

Enable Pre-commit Hooks

pip install pre-commit
pre-commit install

To disable

pre-commit uninstall

Running Tests

Unit Tests (filename: *_test.py)

Run all tests in a file

pytest -v apache_beam/io/textio_test.py

Run tests in a class

pytest -v apache_beam/io/textio_test.py::TextSourceTest

Run a specific test

pytest -v apache_beam/io/textio_test.py::TextSourceTest::test_progress
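
The three invocations above differ only in how much of the pytest node ID is given: file, file plus class, or file plus class plus test method. A minimal sketch of how such IDs are composed follows; the helper name `pytest_node_id` is hypothetical and is not part of Beam or pytest.

```python
def pytest_node_id(test_file, test_class=None, test_name=None):
    """Join the given parts with '::', the separator pytest uses in node IDs."""
    # Skip parts that were not supplied, so the same helper covers
    # "whole file", "one class", and "one test" selections.
    parts = [part for part in (test_file, test_class, test_name) if part]
    return "::".join(parts)


# Selections matching the commands shown above.
whole_file = pytest_node_id("apache_beam/io/textio_test.py")
one_class = pytest_node_id("apache_beam/io/textio_test.py", "TextSourceTest")
one_test = pytest_node_id(
    "apache_beam/io/textio_test.py", "TextSourceTest", "test_progress")

for node_id in (whole_file, one_class, one_test):
    print("pytest -v " + node_id)
```

Each printed line is a command of the same shape as the ones listed in this section.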

Integration Tests (filename: *_it_test.py)

On Direct Runner

python -m pytest -o log_cli=True -o log_level=Info \
  apache_beam/ml/inference/pytorch_inference_it_test.py::PyTorchInference \
  --test-pipeline-options='--runner=TestDirectRunner'
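
The integration-test command packs every pipeline option into a single `--test-pipeline-options` argument. When the command is generated from a script, building it as an argument list avoids shell-quoting mistakes. The sketch below assumes nothing beyond the flags shown above; the helper name `integration_test_command` is hypothetical.

```python
import shlex


def integration_test_command(node_id, pipeline_options):
    """Return the pytest invocation as an argv list.

    pipeline_options is a dict such as {"runner": "TestDirectRunner"}; its
    entries are joined into the single --test-pipeline-options argument.
    """
    joined = " ".join(
        "--{}={}".format(name, value)
        for name, value in pipeline_options.items())
    return [
        "python", "-m", "pytest",
        "-o", "log_cli=True",
        "-o", "log_level=Info",
        node_id,
        "--test-pipeline-options=" + joined,
    ]


cmd = integration_test_command(
    "apache_beam/ml/inference/pytorch_inference_it_test.py::PyTorchInference",
    {"runner": "TestDirectRunner"},
)
# shlex.join adds quotes only where the shell needs them.
print(shlex.join(cmd))
```

The list form can be passed directly to `subprocess.run(cmd)`; the printed form is for copying into a terminal.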

On Dataflow Runner

First build SDK tarball

pip install build && python -m build --sdist

Run integration test

python -m pytest -o log_cli=True -o log_level=Info \
  apache_beam/ml/inference/pytorch_inference_it_test.py::PyTorchInference \
  --test-pipeline-options='--runner=TestDataflowRunner --project=<project> --temp_location=gs://<bucket>/tmp --sdk_location=dist/apache-beam-2.XX.0.dev0.tar.gz --region=us-central1'

Building Python SDK

Build Source Distribution

cd sdks/python
pip install build && python -m build --sdist

Output: sdks/python/dist/apache-beam-X.XX.0.dev0.tar.gz
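
Because the version in the tarball name changes between builds, a script that needs the path (for example to fill in `--sdk_location`) can look it up instead of hard-coding it. A minimal sketch, assuming the newest file in the `dist` directory is the one wanted; the helper name `latest_sdist` is hypothetical, and the glob also accepts an underscore in the name as an assumption, since some build tool versions may normalize `apache-beam` to `apache_beam`.

```python
from pathlib import Path


def latest_sdist(dist_dir):
    """Return the most recently modified Beam source tarball in dist_dir."""
    # "apache*beam" matches both "apache-beam" and "apache_beam" (assumption:
    # the separator depends on the build tooling version).
    tarballs = sorted(
        Path(dist_dir).glob("apache*beam-*.tar.gz"),
        key=lambda path: path.stat().st_mtime,
    )
    if not tarballs:
        raise FileNotFoundError(
            "no Beam source tarball found in {}".format(dist_dir))
    return tarballs[-1]
```

Called as `latest_sdist("sdks/python/dist")` from the repository root after a build, it returns a `Path` to the tarball, or raises `FileNotFoundError` if nothing has been built yet.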

Build Wheel (faster installation)

./gradlew :sdks:python:bdistPy311linux  # For Python 3.11 on Linux

Build and Push SDK Container Image

./gradlew :sdks:python:container:py311:docker \
  -Pdocker-repository-root=gcr.io/your-project/your-name \
  -Pdocker-tag=custom \
  -Ppush-containers

Container image will be pushed to: gcr.io/your-project/your-name/beam_python3.11_sdk:custom
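
The image name above appears to combine the repository root, the Python version, and the tag. The sketch below reproduces that pattern so the resulting name can be reused in a launch script. The pattern is inferred from the single example given here, not from the Gradle build logic, and the helper name `sdk_container_image` is hypothetical.

```python
def sdk_container_image(repository_root, python_version, tag):
    """Compose an image name of the form <root>/beam_python<ver>_sdk:<tag>.

    The naming pattern is an assumption based on the example in this guide.
    """
    return "{}/beam_python{}_sdk:{}".format(
        repository_root.rstrip("/"), python_version, tag)


image = sdk_container_image("gcr.io/your-project/your-name", "3.11", "custom")
# The pipeline option that consumes the image name.
sdk_container_flag = "--sdk_container_image=" + image
print(sdk_container_flag)
```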

To use this container image, supply it via `--sdk_container_image`.

Running Pipelines with Modified Code

Install modified SDK

pip install /path/to/apache-beam.tar.gz[gcp]

Run pipeline

python my_pipeline.py \
--runner=DataflowRunner \
--sdk_location=/path/to/apache-beam.tar.gz \
--project=my_project \
--region=us-central1 \
--temp_location=gs://my-bucket/temp
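
A script launched this way has to pass the runner flags through to Beam. A common pattern, sketched here with only the standard library so that it runs without Beam installed, is to let `argparse` keep the script's own flags and collect everything else for the pipeline options. The `--input` flag and the `split_args` helper are hypothetical; in a real script the leftover list would typically be handed to `PipelineOptions`.

```python
import argparse


def split_args(argv):
    """Separate the script's own flags from flags meant for the runner."""
    # allow_abbrev=False keeps argparse from treating a runner flag as an
    # abbreviation of one of the script's flags.
    parser = argparse.ArgumentParser(allow_abbrev=False)
    parser.add_argument("--input", default="input.txt")  # hypothetical flag
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


known_args, pipeline_args = split_args([
    "--input=gs://my-bucket/input.txt",
    "--runner=DataflowRunner",
    "--sdk_location=/path/to/apache-beam.tar.gz",
    "--project=my_project",
    "--region=us-central1",
    "--temp_location=gs://my-bucket/temp",
])
print(known_args.input)
print(pipeline_args)  # everything argparse did not recognize, in order
```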

Common Issues

NameError when running DoFn

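Global imports, functions, and variables defined in the main pipeline module are not serialized by default, so a DoFn that references them can fail with NameError on a remote worker. To pickle the main session along with the pipeline, pass: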
--save_main_session
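
A commonly used alternative to pickling the main session is to import a module inside the function that needs it, so the import happens wherever the function runs. The sketch below illustrates only that idea. The class name `ExtractWords` is hypothetical, and the `beam.DoFn` base class is deliberately left out so the example runs without Beam installed; a real DoFn would subclass it.

```python
class ExtractWords:  # in a real pipeline: class ExtractWords(beam.DoFn)
    """Splits a line of text into words."""

    def process(self, element):
        # Local import: resolved when process() runs, so it does not rely on
        # the main module's global imports being available on the worker.
        import re

        for word in re.findall(r"[A-Za-z']+", element):
            yield word


words = list(ExtractWords().process("to be or not to be"))
print(words)
```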

Specifying Additional Dependencies

Use --requirements_file=requirements.txt or custom containers.

Test Markers

  • @pytest.mark.it_postcommit
    - Include in PostCommit test suite

Gradle Commands for Python

Run WordCount

./gradlew :sdks:python:wordCount

Check environment

./gradlew :checkSetup

Code Quality Tools

Linting

pylint apache_beam/

Type checking

mypy apache_beam/

Formatting (via yapf)

yapf -i apache_beam/file.py

Import sorting

isort apache_beam/file.py