# Data Autocleaning Skill

Automated data profiling, quality assessment, and transformation for data sourced from BigQuery or Google Cloud Storage (GCS).

## When to Use

> [!IMPORTANT]
> You MUST use this skill for ANY task where the source is BigQuery or GCS — including seemingly simple operations like "move data" or "copy table".
>
> - Apply to all operations on new and existing sources: copying, moving, appending, ingesting, or extracting data.
> - Apply to the source node specifically, not to subsequent pipeline steps.
> - Never skip Dataplex profiling (Steps 1 and 3). Always use Dataplex — not ad-hoc BigQuery profiling.

## Task Execution Workflow

### Step 1: Preliminary Checks (Before Implementation Planning)

Perform these checks before generating the `implementation_plan.md`.

1. **Check Eligibility** — You MUST confirm the source is a BigQuery table or GCS source.
2. **Gather Data Profile via Dataplex:**
   - GCS sources: You MUST create an external table first, before running the Dataplex scan.
   - Wait for results: You MUST NOT proceed until the Dataplex profile is available, unless user scan approval was denied.
   - Use the profile as input for cleansing and schema mapping decisions. The transformations MUST NOT be finalized before profile information is available (unless the scan was denied).
   - Commands:
     1. Obtain user approval: Present the `scripts/dataplex_scanner.py` scan command to the user and obtain explicit approval before executing it. Use the following template to present the command:
        - Command: `python3 scripts/dataplex_scanner.py ...` (fetch full arguments from step 6 below)
        - Summary: The script automates Dataplex data profiling. It checks table sizes, applies dynamic sampling for large tables (>1M rows) to reduce costs, skips empty tables, executes concurrent scans for multiple tables, and polls for results automatically.
        - Value Add: Enables deep data analysis (null rates, distinct values, distributions), allowing data-driven cleansing decisions. It helps identify hidden anomalies (garbage values, format variance) to guide accurate transformations, and verifies that the cleaning logic resolves them without introducing regressions.
        - Scope: The approval obtained here covers all executions of this scanner script for this task (including verification steps).
     2. Run the `scripts/dataplex_scanner.py` script located in the same directory as this `SKILL.md` file. The script handles concurrent scan creation, dynamic sampling for large tables, and polling for results. Use `--help` to learn more.
     3. The script will save the full results as JSON files in the specified output directory.
     4. [!IMPORTANT] The location MUST be a specific Google Cloud region like `us-central1`; multi-regions like `us` are not supported by Dataplex scans.
     5. If there are multiple tables to scan, provide them all in the `--tables` argument to run them concurrently.
     6. Use the following command template:

        ```bash
        python3 scripts/dataplex_scanner.py \
          --tables <project.dataset.table> <project.catalog.namespace.table> \
          --location <location> \
          --output-dir <output_dir>
        ```

        Note: The script accepts table IDs in the format `project.dataset.table` for BigQuery tables and `project.catalog.namespace.table` for BigLake Iceberg tables.
3. **Fetch Schema & Samples** — Use `bq` commands to fetch schema and sample data for both source and destination tables (see the SQL sketch after this list).
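
Where a query is more convenient than the CLI, the same schema and sample information can be pulled in SQL; a minimal sketch, assuming a BigQuery source (the project, dataset, and table names are placeholders):

```sql
-- Hypothetical names; substitute the real project, dataset, and table.
SELECT column_name, data_type, is_nullable
FROM `project.dataset.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = 'source_table';

-- A small sample of rows to eyeball formats and garbage values.
SELECT *
FROM `project.dataset.source_table`
LIMIT 20;
```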

### Step 1.5: Implementation Plan Requirements

1. Your `implementation_plan.md` **MUST** include a Profiling Evidence section. Note: If scan execution was denied by the user, document the denial reason here instead of Job IDs.

   ```markdown
   ## Profiling Evidence

   - Dataplex Data Profile Job ID: <JOB_ID>
   - Profile Result Summary: <Brief summary of key findings, e.g., % nulls, distinct values>
   ```

2. Your `implementation_plan.md` **MUST** include a step to generate cleansing SQL transformations based on the profile output and the instructions in Step 2: Generate Transformations.
3. Your `implementation_plan.md` **MUST** also reference Step 3 (Quality Review) under its **Verification Plan** section.

> [!CAUTION]
>
> **Do not proceed to implementation** until both sections are completed. You
> MUST ensure that the verification phase only validates that your
> transformations successfully addressed the anomalies found in Step 1.

### Step 2: Generate Transformations

#### Schema Alignment

- Match the destination table schema (types and names) if provided (see the sketch after this list).
- Do not perform column splits, merges, or other schema operations when no destination table is specified.
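
A minimal sketch of schema alignment; the destination schema, column names, and date format here are assumptions for illustration:

```sql
-- Hypothetical destination schema: customer_id INT64, signup_date DATE.
SELECT
  SAFE_CAST(cust_id AS INT64) AS customer_id,        -- rename and cast to match
  SAFE.PARSE_DATE('%Y%m%d', signup) AS signup_date   -- align type with the destination
FROM `project.dataset.source_table`;
```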

#### Data Cleaning Rules

- **Garbage Values:** Drop or convert to `NULL` only for malformed data (e.g., unparseable dates, zero-length strings for non-nullable integers).
- **Unit Normalization:** Standardize measurable units (e.g., `'C'` vs. `'F'`) to the most common unit. If units are too varied (e.g., `mg`, `liter`), leave as-is.
- **Type Conversion:** Use `COALESCE` with `SAFE.PARSE_*` functions to handle multiple date/time/datetime/timestamp formats. Fetch diverse samples when source data shows high variance. (A sketch combining these rules follows this list.)
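
A minimal sketch combining these rules; the columns (`raw_date`, `raw_quantity`, `temperature`, `unit`) and the formats are hypothetical, not profiled values:

```sql
-- Illustrative only: column names and formats are assumptions.
SELECT
  -- Type conversion: try each observed date format; NULL if none parse.
  COALESCE(
    SAFE.PARSE_DATE('%Y-%m-%d', raw_date),
    SAFE.PARSE_DATE('%m/%d/%Y', raw_date),
    SAFE.PARSE_DATE('%d %b %Y', raw_date)
  ) AS order_date,
  -- Garbage values: zero-length strings become NULL before the integer cast.
  SAFE_CAST(NULLIF(TRIM(raw_quantity), '') AS INT64) AS quantity,
  -- Unit normalization: convert Fahrenheit to the (assumed) dominant Celsius.
  CASE unit
    WHEN 'F' THEN ROUND((temperature - 32) * 5 / 9, 1)
    ELSE temperature
  END AS temperature_c
FROM `project.dataset.source_table`;
```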

#### JSON Data Handling

- **Parsing:** Use `SAFE.PARSE_JSON` to cast JSON strings to the `JSON` type. Never use the deprecated `JSON_EXTRACT_*` functions.
- **Extraction:** Flatten or extract fields only if a destination schema requires it.
- **Accessors:** Use `JSON_VALUE`, `JSON_QUERY`, `JSON_QUERY_ARRAY`, and `JSON_VALUE_ARRAY` without the `SAFE.` prefix (they are safe by default).
- **Schema mapping:** When a destination schema is provided, extract JSON fields to match target column names and types.
- **NULL handling:** If `SAFE.PARSE_JSON` returns `NULL`, keep the original string and note the invalid JSON in the cleaning summary. (A sketch follows this list.)
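
A minimal sketch of these accessors, assuming a hypothetical `payload` STRING column; the JSON paths are illustrative:

```sql
-- Hypothetical column and paths; adjust to the destination schema.
SELECT
  SAFE.PARSE_JSON(payload) AS payload_json,                         -- NULL when payload is invalid JSON
  JSON_VALUE(payload, '$.user.id') AS user_id,                      -- scalar extracted as STRING
  SAFE_CAST(JSON_VALUE(payload, '$.amount') AS NUMERIC) AS amount,  -- cast to the target type
  JSON_QUERY_ARRAY(payload, '$.tags') AS tags,                      -- array elements as JSON-formatted STRINGs
  -- NULL handling: keep the original string when parsing fails.
  IF(SAFE.PARSE_JSON(payload) IS NULL, payload, NULL) AS invalid_payload
FROM `project.dataset.source_table`;
```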

#### Array Data Handling

- **Unnesting:** Unnest array fields only if a destination schema explicitly requires it.
- **Type Casting:** Attempt to cast elements to the most appropriate common type.
- **CRITICAL:** Filter out `NULL` elements after `SAFE_CAST` (e.g., using an `ARRAY(SELECT ...)` subquery), as BigQuery arrays cannot contain `NULL`s.
- **Normalization:** Only trim whitespace on string fields. Make sure to PRESERVE CASE. DO NOT perform case conversions (e.g., `LOWER()`, `UPPER()`) unless explicitly required.
- **Filtering:** Filter out `NULL` values using `ARRAY(SELECT e FROM UNNEST(array_column) AS e WHERE e IS NOT NULL)`.
- **Deduplication:** Use `ARRAY(SELECT DISTINCT x FROM UNNEST(array_column) AS x)` for case-sensitive deduplication.
- **Transformations:** Use an `ARRAY(SELECT ... FROM UNNEST(...))` subquery, or `UNNEST`/`ARRAY_AGG`, for element-wise changes (e.g., date parsing).
- **Restructuring:** Use `UNNEST` to expand to rows, or `ARRAY_AGG` to group rows into an array, as required by the destination schema. (A sketch of these array patterns follows this list.)
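
A minimal sketch of these array patterns, with hypothetical `ARRAY<STRING>` columns `raw_tags` and `raw_dates`:

```sql
-- Hypothetical columns; the date format is illustrative.
SELECT
  -- Trim (preserving case), drop NULLs, and deduplicate in one subquery.
  ARRAY(
    SELECT DISTINCT TRIM(tag)
    FROM UNNEST(raw_tags) AS tag
    WHERE tag IS NOT NULL
  ) AS tags,
  -- Element-wise date parsing; unparseable elements are filtered out so the
  -- resulting array never contains NULLs.
  ARRAY(
    SELECT SAFE.PARSE_DATE('%Y-%m-%d', d)
    FROM UNNEST(raw_dates) AS d
    WHERE SAFE.PARSE_DATE('%Y-%m-%d', d) IS NOT NULL
  ) AS event_dates
FROM `project.dataset.source_table`;
```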

#### STRUCT/Record Data Handling

- **Extraction:** Extract fields to top-level columns only if the destination schema requires it.
- **Type Casting:** Cast each field using `SAFE_CAST` based on the destination schema or inferred profile.
- **Normalization:** Only trim whitespace on string fields. Make sure to PRESERVE CASE. DO NOT perform case conversions (e.g., `LOWER()`, `UPPER()`) unless explicitly required.
- **Field Mapping:** Map directly if structures align; use dot notation (e.g., `struct.field`) to extract; or use the `STRUCT()` constructor to group columns.
- **Schema Alignment:** Populate missing fields with `NULL` and drop fields not present in the destination schema. (A sketch follows this list.)
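
A minimal sketch, assuming hypothetical `address` and `contact` STRUCT columns and a destination `contact_info` STRUCT with `email` and `phone` fields:

```sql
-- Hypothetical STRUCT columns; the destination layout is assumed.
SELECT
  address.city AS city,                               -- dot-notation extraction
  SAFE_CAST(address.postal_code AS STRING) AS postal_code,
  STRUCT(
    TRIM(contact.email) AS email,                     -- trim only; case preserved
    CAST(NULL AS STRING) AS phone                     -- missing in source: populated with NULL
  ) AS contact_info
FROM `project.dataset.source_table`;
```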

### Step 3: Quality Review & Profiling

> [!IMPORTANT]
> You MUST verify transformations strictly using the protocol below before completing the task. Never skip this step. Use Dataplex profiling only (unless the scan was denied by the user) — not ad-hoc SQL queries.

Quality review protocol:

1. Extract the `SELECT` query containing all generated transformations (autocleaning, schema mapping, JSON extractions).
2. Create a temporary sample output table (max 1M rows, 1-hour TTL) by running the transformation query (see the sketch after this list).
3. Fix any runtime errors and re-run until the query succeeds.
4. Profile the temporary sample output table using Dataplex:
   - Verify approval: If scan execution was approved in Step 1, proceed. If approval was DENIED in Step 1, DO NOT run the scanner script and DO NOT ask the user for approval again. Proceed with manual verification using `bq` sample queries to ensure transformations were successful.
   - Run the `scripts/dataplex_scanner.py` script on the temporary table.
   - The script will automatically wait for the profile job to finish and save the results as JSON.
5. Compare profiles (skip if scans were denied) — check the new profile against the Step 1 profile for every transformed column:

   | Anomaly Type | Threshold |
   | --- | --- |
   | **NULL increase** | >1% increase compared to source (unless expected) |
   | **Value range shift** | Unexpected ranges or formats |

6. Iterate on anomalies — for each anomaly:
   1. Identify: Query samples where the source value is `NOT NULL` but the transformed value `IS NULL`.
   2. Fix: Update the transformation logic.
   3. Repeat: Re-run Step 3 until the anomaly is resolved.
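
A minimal sketch of the temporary sample table and the anomaly identification query; the table names, the `id` join key, and the single transformation shown are hypothetical stand-ins for the generated query:

```sql
-- Protocol step 2: materialize a capped sample with a 1-hour TTL.
CREATE OR REPLACE TABLE `project.dataset.tmp_autoclean_sample`
OPTIONS (expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR))
AS
SELECT
  id,
  SAFE.PARSE_DATE('%Y-%m-%d', raw_date) AS order_date  -- stand-in for the generated transformations
FROM `project.dataset.source_table`
LIMIT 1000000;

-- Protocol step 6.1: rows where cleaning introduced NULLs.
SELECT s.id, s.raw_date
FROM `project.dataset.source_table` AS s
JOIN `project.dataset.tmp_autoclean_sample` AS t USING (id)
WHERE s.raw_date IS NOT NULL
  AND t.order_date IS NULL
LIMIT 100;
```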

### Step 3.5: Quality Review Evidence Requirements

Your `walkthrough.md` MUST include a Quality Review Profiling Evidence section. Note: If scan execution was denied by the user, document the denial reason here instead of Job IDs.

```markdown
## Quality Review Profiling Evidence

- Post-Transformation Dataplex Profile Job ID: <JOB_ID>
- Profile Comparison Summary: <Detailed comparison between initial and final profiles per column>
```

> [!CAUTION]
>
> **Do not** conclude the task or ask for user review until this section is
> filled and the profile comparison is documented.

### Step 4: Documentation

Your `walkthrough.md` must contain a table for each transformation in the following format:

```markdown
| Field | Description |
| --- | --- |
| **Destination schema considered** | The target column/type being matched |
| **Issue Detected** | What data quality problem was found |
| **Transformation Applied** | The SQL logic used to fix it |
| **Benefit** | Why this transformation improves the data |
```

Include a summary of all quality review steps and profiling evidence.

## Definition of Done

- All source data quality issues are identified and addressed via SQL transformations.
- Verification MUST be completed using the quality review protocol and documented with evidence.
- The verification step MUST only test the changes, and the verification SQL should execute successfully.
- Transformations align with the target schema, if provided.
- A cleaning summary is provided with a clear justification for each transformation.
- If the skill is skipped, the reason is clearly stated (e.g., ineligible source).