python-development
Python Development in Apache Beam
Project Structure
Key Directories
- `sdks/python/` - Python SDK root
  - `apache_beam/` - Main Beam package
    - `transforms/` - Core transforms (ParDo, GroupByKey, etc.)
    - `io/` - I/O connectors
    - `ml/` - Beam ML code (RunInference, etc.)
    - `runners/` - Runner implementations and wrappers
    - `runners/worker/` - SDK worker harness
  - `container/` - Docker container configuration
  - `test-suites/` - Test configurations
  - `scripts/` - Utility scripts
Configuration Files
- `setup.py` - Package configuration
- `pyproject.toml` - Build configuration
- `tox.ini` - Test automation
- `pytest.ini` - Pytest configuration
- `.pylintrc` - Linting rules
- `.isort.cfg` - Import sorting
- `mypy.ini` - Type checking
Environment Setup
Using pyenv (Recommended)
```bash
# Install Python
pyenv install 3.X  # Use supported version from gradle.properties

# Create virtual environment
pyenv virtualenv 3.X beam-dev
pyenv activate beam-dev
```
Install in Editable Mode
```bash
cd sdks/python
pip install -e .[gcp,test]
```
Enable Pre-commit Hooks
```bash
pip install pre-commit
pre-commit install

# To disable
pre-commit uninstall
```
Running Tests
Unit Tests (filename: `*_test.py`)
```bash
# Run all tests in a file
pytest -v apache_beam/io/textio_test.py

# Run tests in a class
pytest -v apache_beam/io/textio_test.py::TextSourceTest

# Run a specific test
pytest -v apache_beam/io/textio_test.py::TextSourceTest::test_progress
```
Integration Tests (filename: `*_it_test.py`)
On Direct Runner
```bash
python -m pytest -o log_cli=True -o log_level=Info \
  apache_beam/ml/inference/pytorch_inference_it_test.py::PyTorchInference \
  --test-pipeline-options='--runner=TestDirectRunner'
```
On Dataflow Runner
```bash
# First build SDK tarball
pip install build && python -m build --sdist

# Run integration test
python -m pytest -o log_cli=True -o log_level=Info \
  apache_beam/ml/inference/pytorch_inference_it_test.py::PyTorchInference \
  --test-pipeline-options='--runner=TestDataflowRunner --project=<project> --temp_location=gs://<bucket>/tmp --sdk_location=dist/apache-beam-2.XX.0.dev0.tar.gz --region=us-central1'
```
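The two filename conventions above decide which kind of test a file holds. As a minimal sketch (the helper name `classify_test_file` is hypothetical and not part of Beam), the rule can be written as a suffix check. The integration suffix is tested first because every `*_it_test.py` file also matches `*_test.py`.

```python
from pathlib import PurePath


def classify_test_file(path: str) -> str:
    """Classify a file by the naming conventions described above.

    Hypothetical helper for illustration; Beam itself relies on pytest
    collection rules and markers rather than a function like this.
    """
    name = PurePath(path).name
    # Check the more specific suffix first: "_it_test.py" also ends in "_test.py".
    if name.endswith("_it_test.py"):
        return "integration"
    if name.endswith("_test.py"):
        return "unit"
    return "not a test"


if __name__ == "__main__":
    for candidate in (
        "apache_beam/io/textio_test.py",
        "apache_beam/ml/inference/pytorch_inference_it_test.py",
        "apache_beam/io/textio.py",
    ):
        print(candidate, "->", classify_test_file(candidate))
```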
Building Python SDK
Build Source Distribution
```bash
cd sdks/python
pip install build && python -m build --sdist
# Output: sdks/python/dist/apache-beam-X.XX.0.dev0.tar.gz
```
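Because the version in the tarball name changes between builds, it can help to look the file up rather than type it into `--sdk_location` by hand. The sketch below is one way to do that, not a Beam utility: `find_latest_sdist` is a hypothetical helper, and accepting both the `apache-beam-` and `apache_beam-` prefixes is an assumption that covers build tool versions that normalize the project name.

```python
from pathlib import Path
from typing import Optional


def find_latest_sdist(dist_dir: str = "dist") -> Optional[Path]:
    """Return the most recently modified Beam sdist in dist_dir, or None.

    Hypothetical helper: the accepted filename prefixes are an assumption.
    """
    directory = Path(dist_dir)
    if not directory.is_dir():
        return None
    candidates = [
        p for p in directory.glob("*.tar.gz")
        if p.name.startswith(("apache-beam-", "apache_beam-"))
    ]
    # max() with default=None handles the "nothing built yet" case.
    return max(candidates, key=lambda p: p.stat().st_mtime, default=None)


if __name__ == "__main__":
    sdist = find_latest_sdist("dist")
    if sdist is None:
        print("No sdist found; run `python -m build --sdist` first.")
    else:
        print(f"--sdk_location={sdist}")
```

Run from `sdks/python`, the printed flag can be pasted into the pipeline options.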
Build Wheel (faster installation)
```bash
./gradlew :sdks:python:bdistPy311linux  # For Python 3.11 on Linux
```
Build and Push SDK Container Image
```bash
./gradlew :sdks:python:container:py311:docker \
  -Pdocker-repository-root=gcr.io/your-project/your-name \
  -Pdocker-tag=custom \
  -Ppush-containers
```
The container image will be pushed to `gcr.io/your-project/your-name/beam_python3.11_sdk:custom`.
To use this container image, supply it via `--sdk_container_image`.
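The image name above combines the repository root, the Python version, and the tag. The sketch below reproduces that pattern. Both helper names are hypothetical, and generalizing from the single Python 3.11 example to other versions is an assumption, so check the Gradle output before relying on it.

```python
def sdk_container_task(python_version: str) -> str:
    """Gradle task for the SDK container, e.g. "3.11" -> ...:py311:docker.

    Assumes other versions follow the same pattern as the 3.11 example.
    """
    return f":sdks:python:container:py{python_version.replace('.', '')}:docker"


def sdk_container_image(repository_root: str, python_version: str, tag: str) -> str:
    """Image reference expected after the push, following the example above."""
    root = repository_root.rstrip("/")
    return f"{root}/beam_python{python_version}_sdk:{tag}"


if __name__ == "__main__":
    image = sdk_container_image("gcr.io/your-project/your-name", "3.11", "custom")
    print(sdk_container_task("3.11"))
    print(f"--sdk_container_image={image}")
```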
Running Pipelines with Modified Code
```bash
# Install modified SDK
pip install /path/to/apache-beam.tar.gz[gcp]

# Run pipeline
python my_pipeline.py \
  --runner=DataflowRunner \
  --sdk_location=/path/to/apache-beam.tar.gz \
  --project=my_project \
  --region=us-central1 \
  --temp_location=gs://my-bucket/temp
```
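For context, here is a rough sketch of what a `my_pipeline.py` that accepts these flags might look like. It follows the common pattern of letting `argparse` take the script's own arguments and forwarding everything else (`--runner`, `--sdk_location`, and so on) to `PipelineOptions`. The `--output` argument, the `split_args` helper, and the toy transforms are hypothetical. Beam is imported inside `run()` only so the argument handling can be tried without the SDK installed.

```python
import argparse
from typing import List, Optional, Tuple


def split_args(argv: Optional[List[str]] = None) -> Tuple[argparse.Namespace, List[str]]:
    """Separate this script's own arguments from Beam pipeline options."""
    parser = argparse.ArgumentParser()
    # Hypothetical script argument; on Dataflow this would be a gs:// path.
    parser.add_argument("--output", default="output", help="Output file prefix.")
    # Unrecognized flags (--runner, --region, ...) are returned untouched.
    return parser.parse_known_args(argv)


def run(argv: Optional[List[str]] = None) -> None:
    # Imported lazily so split_args() can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    known_args, pipeline_args = split_args(argv)
    options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=options) as pipeline:
        _ = (
            pipeline
            | "Create" >> beam.Create(["hello", "beam"])
            | "Upper" >> beam.Map(str.upper)
            | "Write" >> beam.io.WriteToText(known_args.output)
        )
```

When saving this as a script, call `run()` under an `if __name__ == "__main__":` guard so the command above starts the pipeline.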
Common Issues
NameError when running DoFn
Global imports, functions, and variables in the main pipeline module are not serialized by default. Use:
```bash
--save_main_session
```
Specifying Additional Dependencies
Use `--requirements_file=requirements.txt` or custom containers.
Test Markers
- `@pytest.mark.it_postcommit` - Include in PostCommit test suite
Gradle Commands for Python
```bash
# Run WordCount
./gradlew :sdks:python:wordCount

# Check environment
./gradlew :checkSetup
```
Code Quality Tools
```bash
# Linting
pylint apache_beam/

# Type checking
mypy apache_beam/

# Formatting (via yapf)
yapf -i apache_beam/file.py

# Import sorting
isort apache_beam/file.py
```