---
name: python-bigquery-sdk
description: This skill should be used when the user asks to "query BigQuery with Python", "use the google-cloud-bigquery SDK", "load data into BigQuery", "define a BigQuery schema", or needs guidance on best practices for the Python BigQuery client library.
---

# Python BigQuery SDK

Install this skill with: `npx skill4agent add the-perfect-developer/the-perfect-opencode python-bigquery-sdk`

## Installation

Install the `google-cloud-bigquery` package:

```bash
pip install google-cloud-bigquery

# Optional extras for Arrow/pandas integration
pip install "google-cloud-bigquery[pandas,pyarrow]"
```

## Creating a Client

```python
from google.cloud import bigquery

# Picks up credentials from GOOGLE_APPLICATION_CREDENTIALS or ADC
client = bigquery.Client(project="my-project")

# Explicit project + credentials
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file("key.json")
client = bigquery.Client(project="my-project", credentials=credentials)
```

Reuse a single `Client` instance; the `project` argument can be omitted when it is inferrable from the credentials. Call `client.close()` when finished to release resources.

## Running Queries

```python
query = "SELECT name, age FROM `my-project.my_dataset.users` WHERE age > 18"

rows = client.query_and_wait(query)  # Returns RowIterator directly (v3+)
for row in rows:
    print(row["name"], row.age)
```

Prefer `query_and_wait` over the older `client.query(...).result()` pattern for simple synchronous queries.

### Parameterized queries

```python
from google.cloud.bigquery import ScalarQueryParameter, QueryJobConfig

config = QueryJobConfig(
    query_parameters=[
        ScalarQueryParameter("min_age", "INT64", 18),
        ScalarQueryParameter("country", "STRING", "US"),
    ]
)
sql = """
    SELECT name FROM `project.dataset.users`
    WHERE age > @min_age AND country = @country
"""
rows = client.query_and_wait(sql, job_config=config)
```

Supported scalar parameter types include `"STRING"`, `"INT64"`, `"FLOAT64"`, `"BOOL"`, `"TIMESTAMP"`, `"DATE"`, and `"BYTES"`. Use `ArrayQueryParameter` for array values:

```python
from google.cloud.bigquery import ArrayQueryParameter

config = QueryJobConfig(
    query_parameters=[
        ArrayQueryParameter("ids", "INT64", [1, 2, 3]),
    ]
)
```

### Asynchronous jobs

```python
job = client.query(sql, job_config=config)  # Returns QueryJob immediately
# ... do other work ...
rows = job.result(timeout=300)  # Block until complete or timeout
print(f"Bytes processed: {job.total_bytes_processed}")
```

Inspect `job.state` (e.g. `"RUNNING"`, `"DONE"`) to check progress, and `job.error_result` for failure details.

### Dry runs

Use a dry run to estimate query cost without executing it:

```python
dry_config = QueryJobConfig(dry_run=True, use_query_cache=False)
job = client.query(sql, job_config=dry_config)
print(f"Estimated bytes: {job.total_bytes_processed}")
```

## Working with Results

Rows support access by name, position, or attribute:

```python
for row in rows:
    value = row["column_name"]  # by name
    value = row[0]              # by position
    value = row.column_name     # attribute access
```

Convert results to a pandas DataFrame:

```python
df = rows.to_dataframe()  # requires pandas + pyarrow
df = rows.to_dataframe(dtypes={"age": "Int64"})
```

Or to an Arrow table:

```python
table = rows.to_arrow()
```

Prefer `to_arrow()` over `to_dataframe()` when you only need Arrow data, since it skips the pandas conversion step. To limit result size:

```python
rows = client.query_and_wait(sql, max_results=1000)  # cap result rows
```

## Defining Schemas

```python
schema = [
    bigquery.SchemaField("user_id", "INT64", mode="REQUIRED"),
    bigquery.SchemaField("email", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("created_at", "TIMESTAMP", mode="NULLABLE"),
    bigquery.SchemaField(
        "address",
        "RECORD",
        mode="NULLABLE",
        fields=[
            bigquery.SchemaField("city", "STRING"),
            bigquery.SchemaField("postcode", "STRING"),
        ],
    ),
]
```

| Mode | Meaning |
|---|---|
| `REQUIRED` | NOT NULL; value must be present |
| `NULLABLE` | Default; value may be NULL |
| `REPEATED` | Array of the given type |

Common field types: `STRING`, `BYTES`, `INTEGER`/`INT64`, `FLOAT`/`FLOAT64`, `NUMERIC`, `BIGNUMERIC`, `BOOLEAN`/`BOOL`, `TIMESTAMP`, `DATE`, `TIME`, `DATETIME`, `GEOGRAPHY`, `JSON`, `RECORD`/`STRUCT`.

## Creating Tables

```python
dataset_ref = client.dataset("my_dataset")
table_ref = dataset_ref.table("my_table")
table = bigquery.Table(table_ref, schema=schema)

# Time partitioning
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="created_at",
    expiration_ms=7 * 24 * 60 * 60 * 1000,  # 7 days
)

# Clustering
table.clustering_fields = ["country", "user_id"]

table = client.create_table(table, exists_ok=True)
```

## Loading Data

### From a local file

```python
job_config = bigquery.LoadJobConfig(
    schema=schema,
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
with open("data.ndjson", "rb") as f:
    job = client.load_table_from_file(f, table_ref, job_config=job_config)
job.result()  # Wait for completion
```

### From Cloud Storage

```python
uri = "gs://my-bucket/data/*.parquet"
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)
job = client.load_table_from_uri(uri, table_ref, job_config=job_config)
job.result()
```

### From a pandas DataFrame

```python
job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
job.result()
```

| Value | Behaviour |
|---|---|
| `WRITE_TRUNCATE` | Replace all existing rows |
| `WRITE_APPEND` | Add rows to existing data |
| `WRITE_EMPTY` | Fail if table already contains data |

For faster DataFrame loads and reads, install the optional `google-cloud-bigquery-storage` package.

### Streaming inserts

```python
errors = client.insert_rows_json(table_ref, [
    {"user_id": 1, "email": "a@example.com"},
    {"user_id": 2, "email": "b@example.com"},
])
if errors:
    raise RuntimeError(f"Streaming insert errors: {errors}")
```

Always check the return value of `insert_rows_json`: it returns a list of per-row errors rather than raising an exception.

## Error Handling

```python
from google.cloud.exceptions import GoogleCloudError
from google.api_core.exceptions import BadRequest, NotFound

try:
    rows = client.query_and_wait(sql)
except BadRequest as exc:
    # SQL syntax / schema errors
    print(f"Query error: {exc.message}")
except NotFound as exc:
    print(f"Table or dataset not found: {exc}")
except GoogleCloudError as exc:
    print(f"API error: {exc}")
```

Inspect `job.errors` for job-level failure details:

```python
job = client.query(sql)
job.result()
if job.errors:
    for err in job.errors:
        print(err["message"], err.get("reason"))
```

## Key Classes

| Class | Module | Purpose |
|---|---|---|
| `Client` | `google.cloud.bigquery` | Entry point for all API calls |
| `QueryJobConfig` | `google.cloud.bigquery` | Query execution options |
| `LoadJobConfig` | `google.cloud.bigquery` | Load job options |
| `SchemaField` | `google.cloud.bigquery` | Column definition |
| `Table` | `google.cloud.bigquery` | Table resource |
| `Dataset` | `google.cloud.bigquery` | Dataset resource |
| `TimePartitioning` | `google.cloud.bigquery` | Partitioning config |
| `ScalarQueryParameter` | `google.cloud.bigquery` | Single-value param |
| `ArrayQueryParameter` | `google.cloud.bigquery` | Array param |
| `StructQueryParameter` | `google.cloud.bigquery` | Struct param |
| `WriteDisposition` | `google.cloud.bigquery` | Overwrite vs append |
| `SourceFormat` | `google.cloud.bigquery` | Input file format |

For advanced patterns, see `references/advanced-patterns.md`.