# Data Autocleaning Skill
Automated data profiling, quality assessment, and transformation for data
sourced from BigQuery or Google Cloud Storage (GCS).
## When to Use
> [!IMPORTANT]
> You MUST use this skill for ANY task where the source is BigQuery or GCS — including seemingly simple operations like "move data" or "copy table".

- Apply to all operations on new and existing sources: copying, moving, appending, ingesting, or extracting data.
- Apply to the source node specifically, not to subsequent pipeline steps.
- Never skip Dataplex profiling (Steps 1 and 3). Always use Dataplex — not ad-hoc BigQuery profiling.
## Task Execution Workflow
### Step 1: Preliminary Checks (Before Implementation Planning)
Perform these checks before generating the `implementation_plan.md`.

1. **Check Eligibility** — You MUST confirm the source is a BigQuery table or GCS source.
2. **Gather Data Profile via Dataplex**:
   - **GCS sources**: For GCS sources, you MUST create an external table first before running the Dataplex scan.
   - **Wait for results**: You MUST NOT proceed until the Dataplex profile is available, unless user scan approval was denied.
   - Use the profile as input for cleansing and schema mapping decisions. The transformations MUST NOT be finalized before profile information is available (unless the scan was denied).
   - **Commands**:
     1. **Obtain user approval**: Present the `scripts/dataplex_scanner.py` scan command to the user and obtain explicit approval before executing it. Use the following template to present the command:
        - Command: `python3 scripts/dataplex_scanner.py ...` (fetch full arguments from step 6 below)
        - Summary: The script automates Dataplex data profiling. It checks table sizes, applies dynamic sampling for large tables (>1M rows) to reduce costs, skips empty tables, executes concurrent scans for multiple tables, and polls for results automatically.
        - Value Add: Enables deep data analysis (null rates, distinct values, distributions), allowing data-driven cleansing decisions. It helps identify hidden anomalies (garbage values, format variance) to guide accurate transformations, and verifies that the cleaning logic resolves them without introducing regressions.
        - Scope: The approval obtained here covers all executions of this scanner script for this task (including verification steps).
     2. Run the `scripts/dataplex_scanner.py` script located in the same directory as this `SKILL.md` file. This script handles concurrent scan creation, dynamic sampling for large tables, and polling for results. Use `--help` to learn more.
     3. The script will save the full results as JSON files in the specified output directory.
     4. [!IMPORTANT] The location MUST be a specific Google Cloud region like `us-central1`; multi-regions like `us` are not supported in Dataplex scans.
     5. If there are multiple tables to scan, provide them all in the `--tables` argument to run them concurrently.
     6. Use the following command template:

        ```bash
        python3 scripts/dataplex_scanner.py \
          --tables <project.dataset.table> <project.catalog.namespace.table> \
          --location <location> \
          --output-dir <output_dir>
        ```

        Note: The script accepts table IDs in the format `project.dataset.table` for BigQuery tables and `project.catalog.namespace.table` for BigLake Iceberg tables.
3. **Fetch Schema & Samples** — Use `bq` commands to fetch schema and sample data for both source and destination tables.
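The schema and sample fetch in step 3 can be sketched with the `bq` CLI; the table ID below is a placeholder:

```bash
# Fetch the table schema as JSON, then preview a few rows.
bq show --schema --format=prettyjson project:dataset.source_table
bq head -n 20 project:dataset.source_table
```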
### Step 1.5: Implementation Plan Requirements
Your `implementation_plan.md` **MUST** include a **Profiling Evidence** section. Note: If scan execution was denied by the user, document the denial reason here instead of Job IDs.

```markdown
## Profiling Evidence

- Dataplex Data Profile Job ID: <JOB_ID>
- Profile Result Summary: <Brief summary of key findings, e.g., % nulls, distinct values>
```
1. Your `implementation_plan.md` **MUST** include a step to generate cleansing
SQL transformations based on the profile output and instructions in Step 2:
Generate Transformations.
2. Your `implementation_plan.md` **MUST** also reference Step 3 (Quality
Review) under its **Verification Plan** section.
> [!CAUTION]
>
> **Do not proceed to implementation** until both sections are completed. You
> MUST ensure that the verification phase only validates that your
> transformations successfully addressed the anomalies found in Step 1.- Dataplex数据剖析任务ID: <JOB_ID>
- 剖析结果摘要: <关键发现的简要总结,如空值占比、不同值数量>
1. 你的`implementation_plan.md`**必须**包含一个步骤,基于剖析输出和步骤2(生成转换逻辑)中的说明生成清洗SQL转换语句。
2. 你的`implementation_plan.md`还需在**验证规划**部分中引用步骤3(质量审核)。
> [!CAUTION]
>
> **完成上述两部分前,不要进入实施阶段**。必须确保验证阶段仅验证你的转换逻辑是否成功解决了步骤1中发现的异常。Step 2: Generate Transformations
#### Schema Alignment
- Match the destination table schema (types and names) if provided.
- Do not perform column splits, merges, or other schema operations when no destination table is specified.
#### Data Cleaning Rules
- **Garbage Values**: Drop or convert to `NULL` only for malformed data (e.g., unparseable dates, zero-length strings for non-nullable integers).
- **Unit Normalization**: Standardize measurable units (e.g., `'F'` → `'C'`) to the most common unit. If units are too varied (e.g., `mg`, `liter`), leave as-is.
- **Type Conversion**: Use `COALESCE` with `SAFE.PARSE_*` functions for multiple date/time/datetime/timestamp formats. Fetch diverse samples when source data shows high variance.
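The type conversion rule can be sketched as follows; the column name `raw_date` and the format list are illustrative assumptions, not part of this skill:

```sql
-- Try each observed date format in turn; SAFE.PARSE_DATE returns NULL
-- instead of erroring, so COALESCE picks the first successful parse.
SELECT
  COALESCE(
    SAFE.PARSE_DATE('%Y-%m-%d', raw_date),
    SAFE.PARSE_DATE('%m/%d/%Y', raw_date),
    SAFE.PARSE_DATE('%d-%b-%Y', raw_date)
  ) AS order_date
FROM `project.dataset.table`;
```

Rows that match none of the formats surface as `NULL`, which Step 3 then flags against the >1% threshold.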
#### JSON Data Handling
- **Parsing**: Use `SAFE.PARSE_JSON` to cast JSON strings to `JSON` type. Never use deprecated `JSON_EXTRACT_*`.
- **Extraction**: Flatten or extract fields only if a destination schema requires it.
- **Accessors**: Use `JSON_VALUE`, `JSON_QUERY`, `JSON_QUERY_ARRAY`, `JSON_VALUE_ARRAY` without the `SAFE.` prefix (they are safe by default).
- **Schema mapping**: When a destination schema is provided, extract JSON fields to match target column names and types.
- **NULL handling**: If `SAFE.PARSE_JSON` returns `NULL`, keep the original string and note the invalid JSON in the cleaning summary.
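A minimal sketch of these JSON rules; the column `payload` and the JSONPath fields are hypothetical:

```sql
-- Parse the raw string once, and extract scalars with JSON_VALUE
-- (safe by default), casting to the destination types.
SELECT
  SAFE.PARSE_JSON(payload) AS payload_json,
  JSON_VALUE(payload, '$.user.id') AS user_id,
  SAFE_CAST(JSON_VALUE(payload, '$.amount') AS NUMERIC) AS amount
FROM `project.dataset.table`;
```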
#### Array Data Handling
- **Unnesting**: Unnest array fields only if a destination schema explicitly requires it.
- **Type Casting**: Attempt to cast elements to the most appropriate common type.
- **CRITICAL**: Filter out `NULL` elements after `SAFE_CAST` (e.g., using `ARRAY_FILTER`), as BigQuery arrays cannot contain `NULL`s.
- **Normalization**: Only trim whitespace on string fields. Make sure to PRESERVE CASE. DO NOT perform case conversions (e.g., `LOWER()`, `UPPER()`) unless explicitly required.
- **Filtering**: Filter out `NULL` values using `ARRAY_FILTER(array_column, e -> e IS NOT NULL)`.
- **Deduplication**: Use `ARRAY(SELECT DISTINCT x FROM UNNEST(array_column))` for case-sensitive deduplication.
- **Transformations**: Use `ARRAY_TRANSFORM` or `UNNEST`/`ARRAY_AGG` for element-wise changes (e.g., date parsing).
- **Restructuring**: Use `UNNEST` to expand to rows, or `ARRAY_AGG` to group rows into an array, as required by the destination schema.
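The casting and NULL-filtering rules combine as in this sketch; the array column `tags` is a hypothetical example:

```sql
-- Cast each element, then drop the NULLs produced by failed casts,
-- since BigQuery arrays cannot contain NULL elements.
SELECT
  ARRAY_FILTER(
    ARRAY_TRANSFORM(tags, t -> SAFE_CAST(t AS INT64)),
    e -> e IS NOT NULL
  ) AS tag_ids
FROM `project.dataset.table`;
```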
#### STRUCT/Record Data Handling
- **Extraction**: Extract fields to top-level columns only if the destination schema requires it.
- **Type Casting**: Cast each field using `SAFE_CAST` based on the destination schema or inferred profile.
- **Normalization**: Only trim whitespace on string fields. Make sure to PRESERVE CASE. DO NOT perform case conversions (e.g., `LOWER()`, `UPPER()`) unless explicitly required.
- **Field Mapping**: Map directly if structures align; use dot notation (e.g., `struct.field`) to extract; or use the `STRUCT()` constructor to group columns.
- **Schema Alignment**: Populate missing fields with `NULL` and drop fields not present in the destination schema.
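A sketch of struct remapping under these rules; all field names (`address.zip`, `postal_code`, `region`) are hypothetical:

```sql
-- Rebuild a struct to match a destination schema, padding a field
-- that is absent in the source with a typed NULL.
SELECT
  STRUCT(
    SAFE_CAST(address.zip AS STRING) AS postal_code,
    TRIM(address.city) AS city,       -- trim whitespace, preserve case
    CAST(NULL AS STRING) AS region    -- missing in the source schema
  ) AS address
FROM `project.dataset.table`;
```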
### Step 3: Quality Review & Profiling
> [!IMPORTANT]
> You MUST verify transformations strictly using the protocol below before completing the task. Never skip this step. Use Dataplex profiling only (unless the scan was denied by the user) — not ad-hoc SQL queries.
Quality review protocol:
1. Extract the `SELECT` query containing all generated transformations (autocleaning, schema mapping, JSON extractions).
2. Create a temporary sample output table (max 1M rows, 1-hour TTL) by running the transformation query.
3. Fix any runtime errors and re-run until the query succeeds.
4. Profile the temporary sample output table using Dataplex:
   - Verify approval: If scan execution was approved in Step 1, proceed. If approval was DENIED in Step 1, DO NOT run the scanner script and DO NOT ask the user for approval again. Proceed with manual verification using `bq` sample queries to ensure transformations were successful.
   - Run the `scripts/dataplex_scanner.py` script on the temporary table.
   - The script will automatically wait for the profile job to finish and save the results as JSON.
5. Compare profiles (skip if scans were denied) — check the new profile against the Step 1 profile for every transformed column:

   | Anomaly Type | Threshold |
   | --- | --- |
   | **NULL increase** | >1% increase compared to source (unless expected) |
   | **Value range shift** | Unexpected ranges or formats |

6. Iterate on anomalies — for each anomaly:
   - Identify: Query samples where the source is `NOT NULL` but the transformed value `IS NULL`.
   - Fix: Update the transformation logic.
   - Repeat: Re-run Step 3 until the anomaly is resolved.
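The sampling and anomaly-identification steps of the protocol can be sketched as below; the table names, join key `row_id`, and columns are hypothetical:

```sql
-- Materialize a sampled output of the transformation with a 1-hour TTL.
CREATE TABLE `project.dataset.temp_sample`
OPTIONS (
  expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
) AS
SELECT * FROM `project.dataset.source`  -- the transformation SELECT goes here
LIMIT 1000000;

-- Surface rows where cleaning introduced NULLs: the source had a value
-- but the transformed output does not.
SELECT s.raw_date, t.order_date
FROM `project.dataset.source` AS s
JOIN `project.dataset.temp_sample` AS t USING (row_id)
WHERE s.raw_date IS NOT NULL
  AND t.order_date IS NULL
LIMIT 100;
```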
### Step 3.5: Quality Review Evidence Requirements
Your `walkthrough.md` MUST include a **Quality Review Profiling Evidence** section. Note: If scan execution was denied by the user, document the denial reason here instead of Job IDs.

```markdown
## Quality Review Profiling Evidence

- Post-Transformation Dataplex Profile Job ID: <JOB_ID>
- Profile Comparison Summary: <Detailed comparison between initial and final profiles per column>
```
> [!CAUTION]
>
> **Do not** conclude the task or ask for user review until this section is
> filled and the profile comparison is documented.

### Step 4: Documentation
Your `walkthrough.md` must contain a table for each transformation in the following format:

```markdown
| Field | Description |
| --- | --- |
| **Destination schema considered** | The target column/type being matched |
| **Issue Detected** | What data quality problem was found |
| **Transformation Applied** | The SQL logic used to fix it |
| **Benefit** | Why this transformation improves the data |
```

Include a summary of all quality review steps and profiling evidence.
## Definition of Done
- All source data quality issues are identified and addressed via SQL transformations.
- Verification MUST be completed using the Quality review protocol and documented with evidence.
- The verification step MUST only test the changes and should succeed when the SQL is executed.
- Transformations align with the target schema if provided.
- Cleaning summary is provided with clear justification for each transformation.
- If skipped, the reason is clearly stated (e.g., ineligible source).