python-bigquery-sdk

Python BigQuery SDK

Provides workflows and best practices for the `google-cloud-bigquery` Python client library (v3.x), covering client setup, querying, schema definition, data loading, and result consumption.

Installation

```bash
pip install google-cloud-bigquery
```

Optional extras for Arrow/pandas integration

```bash
pip install "google-cloud-bigquery[pandas,pyarrow]"
```

Client Initialisation

Instantiate `Client` once per process and reuse it; the client is thread-safe.

```python
from google.cloud import bigquery

# Picks up credentials from GOOGLE_APPLICATION_CREDENTIALS or ADC
client = bigquery.Client(project="my-project")
```

Explicit project + credentials

```python
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file("key.json")
client = bigquery.Client(project="my-project", credentials=credentials)
```

**Key rules:**
- Never create a `Client` inside a per-request or per-row function.
- Always set `project` explicitly in production; avoid relying on environment inference.
- Close the client with `client.close()` or use it as a context manager when appropriate.
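The reuse and cleanup rules can be sketched with `contextlib.closing`, which wraps any object with a `close()` method in context-manager semantics (a minimal sketch; the project ID is illustrative):

```python
import contextlib

from google.cloud import bigquery

# One long-lived client per process, closed once on shutdown;
# contextlib.closing calls client.close() when the block exits.
with contextlib.closing(bigquery.Client(project="my-project")) as client:
    rows = client.query_and_wait("SELECT 1 AS x")
    for row in rows:
        print(row.x)
```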

Running Queries

Simple query (blocking)

```python
query = "SELECT name, age FROM `my-project.my_dataset.users` WHERE age > 18"
rows = client.query_and_wait(query)  # Returns RowIterator directly (v3+)
for row in rows:
    print(row["name"], row.age)
```

`query_and_wait` is preferred over the legacy `client.query(...).result()` pattern for short interactive queries.

Parameterised queries (mandatory for untrusted input)

Always use query parameters — never string-format user input into SQL.
```python
from google.cloud.bigquery import ScalarQueryParameter, QueryJobConfig

config = QueryJobConfig(
    query_parameters=[
        ScalarQueryParameter("min_age", "INT64", 18),
        ScalarQueryParameter("country", "STRING", "US"),
    ]
)
sql = """
    SELECT name FROM `project.dataset.users`
    WHERE age > @min_age AND country = @country
"""
rows = client.query_and_wait(sql, job_config=config)
```

Parameter types map to BigQuery SQL types: `"STRING"`, `"INT64"`, `"FLOAT64"`, `"BOOL"`, `"TIMESTAMP"`, `"DATE"`, `"BYTES"`.

Use `ArrayQueryParameter` for list inputs:

```python
from google.cloud.bigquery import ArrayQueryParameter

config = QueryJobConfig(
    query_parameters=[
        ArrayQueryParameter("ids", "INT64", [1, 2, 3]),
    ]
)
```
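`StructQueryParameter` handles STRUCT inputs in the same way, built from named sub-parameters (a sketch; the `addr` parameter name and field values are illustrative):

```python
from google.cloud.bigquery import (
    QueryJobConfig,
    ScalarQueryParameter,
    StructQueryParameter,
)

# A STRUCT parameter is composed of named scalar (or nested) parameters
config = QueryJobConfig(
    query_parameters=[
        StructQueryParameter(
            "addr",
            ScalarQueryParameter("city", "STRING", "London"),
            ScalarQueryParameter("postcode", "STRING", "N1 9GU"),
        ),
    ]
)
sql = "SELECT @addr.city AS city"
rows = client.query_and_wait(sql, job_config=config)
```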

Asynchronous / long-running queries

```python
job = client.query(sql, job_config=config)   # Returns QueryJob immediately
# ... do other work ...
rows = job.result(timeout=300)               # Block until complete or timeout
print(f"Bytes processed: {job.total_bytes_processed}")
```

Check `job.state` (`"RUNNING"`, `"DONE"`) and `job.error_result` before consuming results.

Dry-run (cost estimation)

```python
dry_config = QueryJobConfig(dry_run=True, use_query_cache=False)
job = client.query(sql, job_config=dry_config)
print(f"Estimated bytes: {job.total_bytes_processed}")
```
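A dry run reports bytes, not money; converting is simple arithmetic. The sketch below assumes the on-demand rate of $6.25 per TiB scanned, which you should verify against current BigQuery pricing:

```python
def estimated_query_cost_usd(bytes_processed: int, usd_per_tib: float = 6.25) -> float:
    """Rough on-demand cost for a query scanning `bytes_processed` bytes."""
    return bytes_processed / (1 << 40) * usd_per_tib  # 1 TiB = 2**40 bytes

# A query scanning 512 GiB (0.5 TiB) at the assumed rate
print(round(estimated_query_cost_usd(512 * (1 << 30)), 2))  # prints 3.12
```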

Consuming Results

Iterate rows

```python
for row in rows:
    value = row["column_name"]   # by name
    value = row[0]               # by position
    value = row.column_name      # attribute access
```

Convert to pandas DataFrame

```python
df = rows.to_dataframe()                  # requires pandas + pyarrow
df = rows.to_dataframe(dtypes={"age": "Int64"})
```

Convert to Arrow Table

```python
table = rows.to_arrow()
```

`to_arrow()` is faster than `to_dataframe()` for large result sets; convert afterwards if needed.

Page size control

```python
rows = client.query_and_wait(sql, max_results=1000)   # cap result rows
```
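For very large result sets you can also bound the size of each fetch rather than the total. A sketch, assuming your client version supports the `page_size` argument to `query_and_wait` (present in recent v3 releases):

```python
rows = client.query_and_wait(sql, page_size=500)  # fetch 500 rows per API call
for page in rows.pages:   # each page corresponds to one API response
    batch = list(page)    # materialise the current page
    print(f"processing {len(batch)} rows")
```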

Schema Definition

Define schemas explicitly — never rely on autodetect in production.
```python
schema = [
    bigquery.SchemaField("user_id", "INT64", mode="REQUIRED"),
    bigquery.SchemaField("email", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("created_at", "TIMESTAMP", mode="NULLABLE"),
    bigquery.SchemaField(
        "address",
        "RECORD",
        mode="NULLABLE",
        fields=[
            bigquery.SchemaField("city", "STRING"),
            bigquery.SchemaField("postcode", "STRING"),
        ],
    ),
]
```
| Mode | Meaning |
| --- | --- |
| `REQUIRED` | NOT NULL; value must be present |
| `NULLABLE` | Default; value may be NULL |
| `REPEATED` | Array of the given type |

Standard SQL types: `STRING`, `BYTES`, `INTEGER`/`INT64`, `FLOAT`/`FLOAT64`, `NUMERIC`, `BIGNUMERIC`, `BOOLEAN`/`BOOL`, `TIMESTAMP`, `DATE`, `TIME`, `DATETIME`, `GEOGRAPHY`, `JSON`, `RECORD`/`STRUCT`.
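Schemas can also live in version-controlled JSON files and be parsed into `SchemaField` objects with `Client.schema_from_json` (a sketch; `schema.json` is an assumed path):

```python
# schema.json holds entries like
# [{"name": "user_id", "type": "INT64", "mode": "REQUIRED"}, ...]
schema = client.schema_from_json("schema.json")
```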

Creating Tables

```python
# Client.dataset() is deprecated; construct the reference explicitly instead
table_ref = bigquery.TableReference.from_string("my-project.my_dataset.my_table")
table = bigquery.Table(table_ref, schema=schema)
```

Time partitioning

```python
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="created_at",
    expiration_ms=7 * 24 * 60 * 60 * 1000,  # 7 days
)
```

Clustering

```python
table.clustering_fields = ["country", "user_id"]
table = client.create_table(table, exists_ok=True)
```

Loading Data

From a local file

```python
job_config = bigquery.LoadJobConfig(
    schema=schema,
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
with open("data.ndjson", "rb") as f:
    job = client.load_table_from_file(f, table_ref, job_config=job_config)
job.result()  # Wait for completion
```

From Google Cloud Storage

```python
uri = "gs://my-bucket/data/*.parquet"
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)
job = client.load_table_from_uri(uri, table_ref, job_config=job_config)
job.result()
```

From a pandas DataFrame

```python
job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
job.result()
```

Write dispositions

| Value | Behaviour |
| --- | --- |
| `WRITE_TRUNCATE` | Replace all existing rows |
| `WRITE_APPEND` | Add rows to existing data |
| `WRITE_EMPTY` | Fail if table already contains data |

Inserting Rows (Streaming)

Use the Storage Write API via `google-cloud-bigquery-storage` for high-throughput streaming. For simple cases:

```python
errors = client.insert_rows_json(table_ref, [
    {"user_id": 1, "email": "a@example.com"},
    {"user_id": 2, "email": "b@example.com"},
])
if errors:
    raise RuntimeError(f"Streaming insert errors: {errors}")
```

Caution: `insert_rows_json` does not guarantee exactly-once delivery and has per-row cost. Prefer batch load jobs for bulk ingestion.
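If a caller may retry, `insert_rows_json` also accepts per-row insert IDs for best-effort de-duplication of recently streamed rows (a sketch; the ID format is an arbitrary client-side choice):

```python
rows_to_insert = [
    {"user_id": 1, "email": "a@example.com"},
    {"user_id": 2, "email": "b@example.com"},
]
# One insert ID per row; BigQuery de-duplicates recent rows sharing an ID
errors = client.insert_rows_json(
    table_ref,
    rows_to_insert,
    row_ids=[f"user-{r['user_id']}" for r in rows_to_insert],
)
if errors:
    raise RuntimeError(f"Streaming insert errors: {errors}")
```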

Error Handling

```python
from google.cloud.exceptions import GoogleCloudError
from google.api_core.exceptions import BadRequest, NotFound

try:
    rows = client.query_and_wait(sql)
except BadRequest as exc:
    # SQL syntax / schema errors
    print(f"Query error: {exc.message}")
except NotFound as exc:
    print(f"Table or dataset not found: {exc}")
except GoogleCloudError as exc:
    print(f"API error: {exc}")
```

Always inspect `job.errors` after async jobs:

```python
job = client.query(sql)
job.result()
if job.errors:
    for err in job.errors:
        print(err["message"], err.get("reason"))
```

Quick Reference: Key Classes

| Class | Module | Purpose |
| --- | --- | --- |
| `Client` | `google.cloud.bigquery.client` | Entry point for all API calls |
| `QueryJobConfig` | `google.cloud.bigquery.job` | Query execution options |
| `LoadJobConfig` | `google.cloud.bigquery.job` | Load job options |
| `SchemaField` | `google.cloud.bigquery.schema` | Column definition |
| `Table` | `google.cloud.bigquery.table` | Table resource |
| `Dataset` | `google.cloud.bigquery.dataset` | Dataset resource |
| `TimePartitioning` | `google.cloud.bigquery.table` | Partitioning config |
| `ScalarQueryParameter` | `google.cloud.bigquery.query` | Single-value param |
| `ArrayQueryParameter` | `google.cloud.bigquery.query` | Array param |
| `StructQueryParameter` | `google.cloud.bigquery.query` | Struct param |
| `WriteDisposition` | `google.cloud.bigquery.enums` | Overwrite vs append |
| `SourceFormat` | `google.cloud.bigquery.enums` | Input file format |

Additional Resources

Reference Files

For deeper coverage consult:

- `references/advanced-patterns.md` — Schema evolution, partitioned table queries, export jobs, retry configuration, DB-API usage, and performance patterns

Official Docs
