data_transform
Data Transformation
Transform raw data into analytical assets using modern transformation patterns, frameworks, and orchestration tools.
Purpose
Select and implement data transformation patterns across the modern data stack. Transform raw data into clean, tested, and documented analytical datasets using SQL (dbt), Python DataFrames (pandas, polars, PySpark), and pipeline orchestration (Airflow, Dagster, Prefect).
When to Use
Invoke this skill when:
- Choosing between ETL and ELT transformation patterns
- Building dbt models (staging, intermediate, marts)
- Implementing incremental data loads and merge strategies
- Migrating pandas code to polars for performance improvements
- Orchestrating data pipelines with dependencies and retries
- Adding data quality tests and validation
- Processing large datasets with PySpark
- Creating production-ready transformation workflows
Quick Start: Common Patterns
dbt Incremental Model
sql
{{
config(
materialized='incremental',
unique_key='order_id'
)
}}
select order_id, customer_id, order_created_at, sum(revenue) as total_revenue
from {{ ref('int_order_items_joined') }}
group by 1, 2, 3
{% if is_incremental() %}
where order_created_at > (select max(order_created_at) from {{ this }})
{% endif %}

polars High-Performance Transformation
python
import polars as pl
result = (
pl.scan_csv('large_dataset.csv')
.filter(pl.col('year') == 2024)
.with_columns([(pl.col('quantity') * pl.col('price')).alias('revenue')])
.group_by('region')
.agg(pl.col('revenue').sum())
.collect() # Execute lazy query
)

Airflow Data Pipeline
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
with DAG(
    dag_id='daily_sales_pipeline',
    schedule_interval='0 2 * * *',
    default_args={'retries': 2, 'retry_delay': timedelta(minutes=5)},
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    extract = PythonOperator(task_id='extract', python_callable=extract_data)
    transform = PythonOperator(task_id='transform', python_callable=transform_data)
    extract >> transform

Decision Frameworks
ETL vs ELT Selection
Use ELT (Extract, Load, Transform) when:
- Using modern cloud data warehouse (Snowflake, BigQuery, Databricks)
- Transformation logic changes frequently
- Team includes SQL analysts
- Data volume 10GB-1TB+ (leverage warehouse parallelism)
Tools: dbt, Dataform, Snowflake tasks, BigQuery scheduled queries
Use ETL (Extract, Transform, Load) when:
- Regulatory compliance requires pre-load data redaction (PII/PHI)
- Target system lacks compute power
- Real-time streaming with immediate transformation
- Legacy systems without cloud warehouse
Tools: AWS Glue, Azure Data Factory, custom Python scripts
Use Hybrid when combining sensitive data cleansing (ETL) with analytics transformations (ELT).
Default recommendation: ELT with dbt unless specific compliance or performance constraints require ETL.
For detailed patterns, see references/etl-vs-elt-patterns.md.

DataFrame Library Selection
Choose pandas when:
- Data size < 500MB
- Prototyping or exploratory analysis
- Need compatibility with pandas-only libraries
Choose polars when:
- Data size 500MB-100GB
- Performance critical (10-100x faster than pandas)
- Production pipelines with memory constraints
- Want lazy evaluation with query optimization
Choose PySpark when:
- Data size > 100GB
- Need distributed processing across cluster
- Existing Spark infrastructure (EMR, Databricks)
Migration path: pandas → polars (easier, similar API) or pandas → PySpark (requires cluster)
For comparisons and migration guides, see references/dataframe-comparison.md.

Orchestration Tool Selection
Choose Airflow when:
- Enterprise production (proven at scale)
- Need 5,000+ integrations
- Managed services available (AWS MWAA, GCP Cloud Composer)
Choose Dagster when:
- Heavy dbt usage (native dbt_assets integration)
- Data lineage and asset-based workflows prioritized
- ML pipelines requiring testability
Choose Prefect when:
- Dynamic workflows (runtime task generation)
- Cloud-native architecture preferred
- Pythonic API with decorators
Safe default: Airflow (battle-tested) unless specific needs for Dagster/Prefect.
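To make the Prefect point above ("Pythonic API with decorators") concrete, here is a minimal Prefect 2 sketch; the task names, return values, and retry settings are illustrative, not taken from this skill's examples:
python
# Hypothetical Prefect 2 flow: decorator-based tasks with retries, no DAG file needed.
from prefect import flow, task

@task(retries=2, retry_delay_seconds=60)
def extract() -> list[dict]:
    # Stand-in for pulling raw records from a source system
    return [{"order_id": 1, "revenue": 42.0}]

@task
def transform(records: list[dict]) -> float:
    # Stand-in for real transformation logic
    return sum(r["revenue"] for r in records)

@flow(name="daily_sales_pipeline")
def daily_sales_pipeline() -> float:
    return transform(extract())

if __name__ == "__main__":
    daily_sales_pipeline()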
For detailed patterns, see references/orchestration-patterns.md.

SQL Transformations with dbt
Model Layer Structure
- Staging Layer (models/staging/)
  - 1:1 with source tables
  - Minimal transformations (renaming, type casting, basic filtering)
  - Materialized as views or ephemeral
- Intermediate Layer (models/intermediate/)
  - Business logic and complex joins
  - Not exposed to end users
  - Often ephemeral (CTEs only)
- Marts Layer (models/marts/)
  - Final models for reporting
  - Fact tables (events, transactions)
  - Dimension tables (customers, products)
  - Materialized as tables or incremental
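To illustrate the staging conventions above, a minimal staging model sketch; the source name and columns are hypothetical, not taken from this skill's bundled examples:
sql
-- models/staging/stg_orders.sql (hypothetical source and columns)
with source as (
    select * from {{ source('ecommerce', 'orders') }}
),
renamed as (
    select
        id as order_id,
        user_id as customer_id,
        created_at as order_created_at,
        cast(amount as numeric) as order_amount
    from source
)
select * from renamed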
dbt Materialization Types
View: Query re-run each time model referenced. Use for fast queries, staging layer.
Table: Full refresh on each run. Use for frequently queried models, expensive computations.
Incremental: Only processes new/changed records. Use for large fact tables, event logs.
Ephemeral: CTE only, not persisted. Use for intermediate calculations.
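Materialization is declared per model in a config block (or project-wide in dbt_project.yml); a minimal sketch with an illustrative mart model:
sql
-- Hypothetical mart model pinned to a table materialization
{{ config(materialized='table') }}
select customer_id, count(*) as order_count
from {{ ref('stg_orders') }}
group by 1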
dbt Testing
yaml
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: total_revenue
        tests:
          - dbt_utils.accepted_range:
              min_value: 0

For comprehensive dbt patterns, see:
- references/dbt-best-practices.md
- references/incremental-strategies.md
Python DataFrame Transformations
pandas Transformation
python
import pandas as pd
df = pd.read_csv('sales.csv')
result = (
df
.query('year == 2024')
.assign(revenue=lambda x: x['quantity'] * x['price'])
.groupby('region')
.agg({'revenue': ['sum', 'mean']})
)

polars Transformation (10-100x Faster)
python
import polars as pl
result = (
pl.scan_csv('sales.csv') # Lazy evaluation
.filter(pl.col('year') == 2024)
.with_columns([(pl.col('quantity') * pl.col('price')).alias('revenue')])
.group_by('region')
.agg([
pl.col('revenue').sum().alias('revenue_sum'),
pl.col('revenue').mean().alias('revenue_mean')
])
.collect() # Execute lazy query
)

Key differences:
- polars uses scan_csv() (lazy) vs pandas read_csv() (eager)
- polars uses with_columns() vs pandas assign()
- polars uses pl.col() expressions vs pandas string column references
- polars requires collect() to execute lazy queries
PySpark for Distributed Processing
python
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("Transform").getOrCreate()
df = spark.read.csv('sales.csv', header=True, inferSchema=True)
result = (
df
.filter(F.col('year') == 2024)
.withColumn('revenue', F.col('quantity') * F.col('price'))
.groupBy('region')
.agg(F.sum('revenue').alias('total_revenue'))
)

For migration guides, see references/dataframe-comparison.md.

Pipeline Orchestration
Airflow DAG Structure
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}
with DAG(
    dag_id='data_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    task1 = PythonOperator(task_id='extract', python_callable=extract_fn)
    task2 = PythonOperator(task_id='transform', python_callable=transform_fn)
    task1 >> task2  # Define dependency

Task Dependency Patterns
Linear: A >> B >> C (sequential)
Fan-out: A >> [B, C, D] (parallel after A)
Fan-in: [A, B, C] >> D (D waits for all)
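A small sketch combining fan-out and fan-in; the EmptyOperator placeholders and task ids are illustrative and assume this runs inside a DAG context:
python
# Assumes this sits inside a `with DAG(...) as dag:` block (Airflow 2.3+)
from airflow.operators.empty import EmptyOperator

start = EmptyOperator(task_id='start')
branch_a = EmptyOperator(task_id='branch_a')
branch_b = EmptyOperator(task_id='branch_b')
join = EmptyOperator(task_id='join')

start >> [branch_a, branch_b]   # fan-out: branches run in parallel
[branch_a, branch_b] >> join    # fan-in: join waits for both branches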
references/orchestration-patterns.mdData Quality and Testing
数据质量与测试
dbt Tests
Generic tests (reusable): unique, not_null, accepted_values, relationships
Singular tests (custom SQL):
sql
-- tests/assert_positive_revenue.sql
select * from {{ ref('fct_orders') }}
where total_revenue < 0

Great Expectations
python
import great_expectations as gx
context = gx.get_context()
suite = context.add_expectation_suite("orders_suite")
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="total_revenue", min_value=0
)
)

For comprehensive testing patterns, see references/data-quality-testing.md.

Advanced SQL Patterns
Window functions for analytics:
sql
select
order_date,
daily_revenue,
avg(daily_revenue) over (
partition by region
order by order_date
rows between 6 preceding and current row
) as revenue_7d_ma,
sum(daily_revenue) over (
partition by region
order by order_date
) as cumulative_revenue
from daily_sales

For advanced window functions, see references/window-functions-guide.md.

Production Best Practices
Idempotency
Ensure transformations produce the same result when run multiple times:
- Use merge statements in incremental models
- Implement deduplication logic
- Use unique_key in dbt incremental models
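A minimal sketch combining these three points in one dbt incremental config; the merge strategy is adapter-dependent (Snowflake, BigQuery, Databricks) and the model and key names are illustrative:
sql
{{
  config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge'
  )
}}
-- Re-running the model merges on order_id instead of inserting duplicates
select order_id, customer_id, order_created_at, revenue
from {{ ref('stg_orders') }}
{% if is_incremental() %}
where order_created_at > (select max(order_created_at) from {{ this }})
{% endif %}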
Incremental Loading
sql
{% if is_incremental() %}
where created_at > (select max(created_at) from {{ this }})
{% endif %}

Error Handling
python
try:
    result = perform_transformation()
    validate_result(result)
except ValidationError as e:
    log_error(e)
    raise

Monitoring
- Set up Airflow email/Slack alerts on task failure
- Monitor dbt test failures
- Track data freshness (SLAs)
- Log row counts and data quality metrics
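One way to wire up the failure alerts mentioned above is an Airflow on_failure_callback; a hedged sketch where notify_slack is a placeholder for your own alerting client:
python
# Hypothetical failure callback; replace the print with a real Slack/webhook call.
from datetime import timedelta

def notify_slack(context):
    failed_task = context['task_instance'].task_id
    print(f"ALERT: task {failed_task} failed in {context['dag'].dag_id}")

default_args = {
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': notify_slack,  # Airflow calls this with the task context on failure
}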
Tool Recommendations
SQL Transformations: dbt Core (industry standard, multi-warehouse, rich ecosystem)
bash
pip install dbt-core dbt-snowflake

Python DataFrames: polars (10-100x faster than pandas, multi-threaded, lazy evaluation)
bash
pip install polars

Orchestration: Apache Airflow (battle-tested at scale, 5,000+ integrations)
bash
pip install apache-airflow

Examples
Working examples in:
- examples/python/pandas-basics.py - pandas transformations
- examples/python/polars-migration.py - pandas to polars migration
- examples/python/pyspark-transformations.py - PySpark operations
- examples/python/airflow-data-pipeline.py - Complete Airflow DAG
- examples/sql/dbt-staging-model.sql - dbt staging layer
- examples/sql/dbt-intermediate-model.sql - dbt intermediate layer
- examples/sql/dbt-incremental-model.sql - Incremental patterns
- examples/sql/window-functions.sql - Advanced SQL
Scripts
- scripts/generate_dbt_models.py - Generate dbt model boilerplate
- scripts/benchmark_dataframes.py - Compare pandas vs polars performance
Related Skills
For data ingestion patterns, see ingesting-data.
For data visualization, see visualizing-data.
For database design, see the databases-* skills.
For real-time streaming, see streaming-data.
For data platform architecture, see ai-data-engineering.
For monitoring pipelines, see observability.

Merged Content from etl-pipelines
name: data_transform
description: Design ETL/ELT pipelines with proper orchestration, error handling, and monitoring. Use when building data pipelines, designing data workflows, or implementing data transformations.
ETL Designer
Design robust ETL/ELT pipelines for data processing.
Quick Start
Use Airflow for orchestration, implement idempotent operations, add error handling, monitor pipeline health.
Instructions
Airflow DAG Structure
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['alerts@company.com']
}
with DAG(
    'etl_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_from_source
    )
    transform = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data
    )
    load = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_warehouse
    )
    extract >> transform >> load

Incremental Processing
python
import pandas as pd

def extract_incremental(last_run_date):
    query = f"""
        SELECT * FROM source_table
        WHERE updated_at > '{last_run_date}'
    """
    return pd.read_sql(query, conn)

Error Handling
python
def safe_transform(data):
    try:
        transformed = transform_data(data)
        return transformed
    except Exception as e:
        logger.error(f"Transform failed: {e}")
        send_alert(f"Pipeline failed: {e}")
        raise

Best Practices
🔄 Workflow
Stage 1: Data Contract & Source Audit
- Data Contracts: Pin down the schema between the source and the target.
- Profiling: Analyze missing values, null ratios, and column types in the raw data (see the sketch after this list).
- Pattern Selection: Choose ETL (pandas/polars) or ELT (SQL/dbt) based on data volume.
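A quick profiling sketch for the audit step above, assuming pandas and an illustrative raw file name and columns:
python
# Hypothetical raw-data profile: dtype, null ratio, and cardinality per column.
import pandas as pd

df = pd.read_csv('raw_orders.csv')
profile = pd.DataFrame({
    'dtype': df.dtypes.astype(str),
    'null_ratio': df.isna().mean().round(3),
    'n_unique': df.nunique(),
})
print(profile)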
Stage 2: Transformation Engine Setup
- Infrastructure: Set up your dbt-core profile or configure a Cloud IDE.
- Modular Modeling: Split the data into Staging (renaming), Intermediate (logic), and Marts (final) layers.
- Polars Optimization: In Python-based transformations, use lazy mode (scan_csv / collect) to optimize memory and speed.
Stage 3: Testing & Orchestration
- Unit Tests: Write validation for critical transformation logic with dbt tests or Great Expectations.
- Idempotency: Make sure the pipeline can be re-run safely (idempotent) after a failure.
- Orchestration: Schedule the workflow on Airflow or Dagster and set up failure notifications.
Checkpoints

| Stage | Validation |
|---|---|
| 1 | Was there any data loss after the transformation? (checksum) |
| 2 | Is … used in the dbt models? |
| 3 | Is there a rollback or reprocessing strategy when the pipeline fails? |
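For checkpoint 1, a simple row-count and checksum reconciliation sketch, assuming the pre- and post-transformation data fit in pandas DataFrames and an illustrative amount column:
python
# Hypothetical reconciliation between source and transformed data.
import pandas as pd

def assert_no_data_loss(source: pd.DataFrame, target: pd.DataFrame, amount_col: str = 'revenue') -> None:
    # Row counts should match unless a deliberate filter or dedup explains the difference.
    assert len(target) == len(source), f"row count mismatch: {len(source)} -> {len(target)}"
    # Column totals act as a lightweight checksum, compared within floating-point tolerance.
    assert abs(source[amount_col].sum() - target[amount_col].sum()) < 1e-6, "checksum mismatch"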
Data Transformation v2.0 - With Workflow