Medallion Architecture Skill

Overview

The medallion architecture (also called multi-hop architecture) is a design pattern for organizing data in a lakehouse using three progressive layers:
  • Bronze (Raw): Ingested data in its original format
  • Silver (Refined): Cleansed and conformed data
  • Gold (Curated): Business-level aggregates and features

When to Use This Skill

Use this skill when you need to:
  • Design a new data pipeline with proper layering
  • Migrate from traditional ETL to lakehouse architecture
  • Implement incremental processing patterns
  • Build a scalable data platform
  • Ensure data quality at each layer

Architecture Principles

1. Bronze Layer (Raw)

Purpose: Store raw data exactly as received from source systems
Characteristics:
  • Immutable historical record
  • Schema-on-read approach
  • Metadata enrichment (_ingested_at, _source_file)
  • Minimal transformations
  • Full audit trail
Use Cases:
  • Data recovery
  • Reprocessing requirements
  • Audit compliance
  • Debugging data issues

2. Silver Layer (Refined)

Purpose: Cleansed, validated, and standardized data
Characteristics:
  • Schema enforcement
  • Data quality checks
  • Deduplication
  • Standardization
  • Type conversions
  • Business rules applied
Use Cases:
  • Downstream analytics
  • Feature engineering
  • Data science modeling
  • Operational reporting

3. Gold Layer (Curated)

Purpose: Business-level aggregates optimized for consumption
Characteristics:
  • Highly aggregated
  • Optimized for queries
  • Business KPIs
  • Feature tables
  • Production-ready datasets
Use Cases:
  • Dashboards and BI
  • ML model serving
  • Real-time applications
  • Executive reporting

Implementation Patterns

Pattern 1: Batch Processing

Bronze Layer:
```python
from pyspark.sql.functions import current_timestamp, input_file_name

def ingest_to_bronze(source_path: str, target_table: str):
    """Ingest raw data to the Bronze layer, preserving it as received."""
    df = (spark.read
        .format("parquet")  # Auto Loader ("cloudFiles") is streaming-only; for batch, read the format directly
        .load(source_path)
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", input_file_name())
    )

    (df.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .saveAsTable(target_table)
    )
```
Silver Layer:
```python
from pyspark.sql.functions import col, lower, trim, to_date, when

def process_to_silver(bronze_table: str, silver_table: str):
    """Transform Bronze to Silver with quality checks."""
    bronze_df = spark.read.table(bronze_table)

    silver_df = (bronze_df
        .dropDuplicates(["id"])
        .filter(col("id").isNotNull())
        .withColumn("email", lower(trim(col("email"))))
        .withColumn("created_date", to_date(col("created_at")))
        .withColumn("quality_score",
            when(col("email").rlike(r"^[\w\.-]+@[\w\.-]+\.\w+$"), 1.0)
            .otherwise(0.5)
        )
    )

    (silver_df.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(silver_table)
    )
```
Gold Layer:
```python
# Note: importing count/sum/avg shadows Python built-ins of the same name.
from pyspark.sql.functions import count, sum, avg, current_timestamp

def aggregate_to_gold(silver_table: str, gold_table: str):
    """Aggregate Silver to Gold business metrics."""
    silver_df = spark.read.table(silver_table)

    gold_df = (silver_df
        .groupBy("customer_segment", "region")
        .agg(
            count("*").alias("customer_count"),
            sum("lifetime_value").alias("total_ltv"),
            avg("quality_score").alias("avg_quality")
        )
        .withColumn("updated_at", current_timestamp())
    )

    (gold_df.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(gold_table)
    )
```

Pattern 2: Incremental Processing

Bronze (Streaming):
```python
from pyspark.sql.functions import current_timestamp

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load(source_path)
    .withColumn("_ingested_at", current_timestamp())
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .toTable(bronze_table)
)
```
Silver (Incremental Merge):
```python
from delta.tables import DeltaTable
from pyspark.sql.functions import col

def incremental_silver_merge(bronze_table: str, silver_table: str, watermark: str):
    """Incrementally merge new Bronze data into Silver."""

    # Get new records since last watermark
    new_records = (spark.read.table(bronze_table)
        .filter(col("_ingested_at") > watermark)
    )

    # Transform
    transformed = transform_to_silver(new_records)

    # Merge into Silver
    silver = DeltaTable.forName(spark, silver_table)

    (silver.alias("target")
        .merge(
            transformed.alias("source"),
            "target.id = source.id"
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
```
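The merge above takes a `watermark` argument but leaves its bookkeeping open. One minimal way to persist it between runs (an assumed approach for illustration, not part of the template) is a small JSON state file:

```python
import json
from pathlib import Path

def load_watermark(state_file: str, default: str = "1970-01-01T00:00:00") -> str:
    """Return the last stored watermark, or a default on the first run."""
    path = Path(state_file)
    if not path.exists():
        return default
    return json.loads(path.read_text())["watermark"]

def save_watermark(state_file: str, watermark: str) -> None:
    """Persist the new high-water mark after a successful merge."""
    Path(state_file).write_text(json.dumps({"watermark": watermark}))
```

In practice the new watermark would be the `max(_ingested_at)` of the batch just merged, and a production pipeline might keep it in a control table rather than a local file.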

Data Quality Patterns

Quality Checks at Each Layer

Bronze:
  • File completeness check
  • Row count validation
  • Schema drift detection
Silver:
  • Null value checks
  • Data type validation
  • Business rule validation
  • Referential integrity
  • Duplicate detection
Gold:
  • Aggregate accuracy
  • KPI threshold checks
  • Trend anomaly detection
  • Completeness validation
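Schema drift detection on Bronze can be as simple as diffing the observed column set against an expected contract. A hedged sketch (the function name and shape are illustrative, not from the template):

```python
def detect_schema_drift(expected: set, observed: set) -> dict:
    """Report columns added beyond, or missing from, the expected schema."""
    return {
        "added": sorted(observed - expected),
        "missing": sorted(expected - observed),
    }
```

With Spark, `observed` would be `set(spark.read.table(bronze_table).columns)`.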

Quality Check Implementation

```python
from typing import Dict
from pyspark.sql.functions import col, current_date

def validate_silver_quality(table_name: str) -> Dict[str, bool]:
    """Run quality checks on Silver table."""
    df = spark.read.table(table_name)

    checks = {
        "no_null_ids": df.filter(col("id").isNull()).count() == 0,
        "valid_emails": df.filter(
            ~col("email").rlike(r"^[\w\.-]+@[\w\.-]+\.\w+$")
        ).count() == 0,
        "no_duplicates": df.count() == df.select("id").distinct().count(),
        "within_date_range": df.filter(
            (col("created_date") < "2020-01-01") |
            (col("created_date") > current_date())
        ).count() == 0
    }

    return checks
```
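A quality gate built on checks like these simply refuses to promote data when any check fails. A minimal sketch, assuming the results arrive as a name-to-boolean mapping like the one `validate_silver_quality` returns:

```python
def quality_gate(checks: dict) -> None:
    """Raise if any check failed, blocking promotion to the next layer."""
    failed = sorted(name for name, ok in checks.items() if not ok)
    if failed:
        raise ValueError(f"Quality gate failed: {failed}")
```

Calling `quality_gate(validate_silver_quality("silver.customers"))` before the Gold aggregation step would stop bad data from progressing, per the "Quality Gates" best practice below.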

Optimization Strategies

Bronze Layer Optimization

```sql
-- Partition by ingestion date
CREATE TABLE bronze.raw_events
USING delta
PARTITIONED BY (ingestion_date)
AS SELECT *, current_date() as ingestion_date FROM source;

-- Enable auto-optimize
ALTER TABLE bronze.raw_events
SET TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',
    'delta.autoOptimize.autoCompact' = 'true'
);
```

Silver Layer Optimization

```sql
-- Z-ORDER for common filters
OPTIMIZE silver.customers
ZORDER BY (customer_segment, region, created_date);

-- Enable Change Data Feed
ALTER TABLE silver.customers
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
```

Gold Layer Optimization

```sql
-- Liquid clustering for query performance
CREATE TABLE gold.customer_metrics
USING delta
CLUSTER BY (customer_segment, date)
AS SELECT * FROM aggregated_metrics;

-- Optimize and vacuum
OPTIMIZE gold.customer_metrics;
VACUUM gold.customer_metrics RETAIN 168 HOURS;
```

Complete Example

See /templates/bronze-silver-gold/ for a complete implementation including:
  • Project structure
  • Bronze ingestion scripts
  • Silver transformation logic
  • Gold aggregation queries
  • Data quality tests
  • Deployment configuration

Best Practices

  1. Idempotency: Ensure pipelines can be re-run safely
  2. Incrementality: Process only new/changed data
  3. Quality Gates: Block bad data from progressing
  4. Schema Evolution: Handle schema changes gracefully
  5. Monitoring: Track pipeline health and data quality
  6. Documentation: Document data lineage and transformations
  7. Testing: Unit test transformations, integration test pipelines
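Idempotency (practice 1) is exactly what the Pattern 2 merge provides: re-running the same batch leaves the table unchanged. The property can be modeled in a few lines of plain Python (a toy illustration, not pipeline code):

```python
def upsert(table: dict, batch: list) -> dict:
    """Merge records keyed on 'id'; applying the same batch twice is a no-op."""
    for record in batch:
        table[record["id"]] = record
    return table
```

Append-only writes lack this property, which is why re-runs against an append-mode Bronze table need dedup or watermark guards downstream.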

Common Pitfalls to Avoid

Don't:
  • Mix transformation logic across layers
  • Skip Bronze layer to "save storage"
  • Over-aggregate too early
  • Ignore data quality in Silver
  • Hard-code business logic in Bronze
Do:
  • Keep Bronze immutable
  • Enforce quality in Silver
  • Optimize Gold for consumption
  • Use incremental processing
  • Implement proper monitoring

Related Skills

  • delta-live-tables
    : Declarative pipeline orchestration
  • data-quality
    : Great Expectations integration
  • testing-patterns
    : Pipeline testing strategies
  • cicd-workflows
    : Deployment automation

References
