# Medallion Architecture Skill

## Overview
The medallion architecture (also called multi-hop architecture) is a design pattern for organizing data in a lakehouse using three progressive layers:
- Bronze (Raw): Ingested data in its original format
- Silver (Refined): Cleansed and conformed data
- Gold (Curated): Business-level aggregates and features
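As a minimal plain-Python illustration (field names are hypothetical), a single record's journey through the three layers might look like this:

```python
from datetime import datetime, timezone

# Bronze: the raw record, stored exactly as received, plus ingestion metadata
bronze = {
    "id": "42",
    "email": "  Alice@Example.COM ",
    "_ingested_at": datetime.now(timezone.utc).isoformat(),
    "_source_file": "orders/2024-01-01.parquet",
}

# Silver: cleansed and conformed (types cast, strings normalized)
silver = {
    "id": int(bronze["id"]),
    "email": bronze["email"].strip().lower(),
}

# Gold: a business-level aggregate over many Silver records
silver_records = [silver, {"id": 43, "email": "bob@example.com"}]
gold = {"customer_count": len(silver_records)}

print(silver["email"])         # alice@example.com
print(gold["customer_count"])  # 2
```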
## When to Use This Skill
Use this skill when you need to:
- Design a new data pipeline with proper layering
- Migrate from traditional ETL to lakehouse architecture
- Implement incremental processing patterns
- Build a scalable data platform
- Ensure data quality at each layer
## Architecture Principles
### 1. Bronze Layer (Raw)
Purpose: Store raw data exactly as received from source systems
Characteristics:
- Immutable historical record
- Schema-on-read approach
- Metadata enrichment (_ingested_at, _source_file)
- Minimal transformations
- Full audit trail
Use Cases:
- Data recovery
- Reprocessing requirements
- Audit compliance
- Debugging data issues
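The metadata-enrichment idea can be sketched outside Spark as well; a minimal plain-Python version (the helper name is illustrative, not from the source) might be:

```python
from datetime import datetime, timezone

def enrich_with_metadata(record: dict, source_file: str) -> dict:
    """Return a copy of the raw record with Bronze audit columns added.

    The payload itself is left untouched (minimal transformation);
    only _ingested_at and _source_file are appended.
    """
    return {
        **record,
        "_ingested_at": datetime.now(timezone.utc).isoformat(),
        "_source_file": source_file,
    }

row = enrich_with_metadata({"id": 1, "amount": 9.99}, "sales/part-0001.parquet")
print(sorted(row))  # ['_ingested_at', '_source_file', 'amount', 'id']
```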
### 2. Silver Layer (Refined)
Purpose: Cleansed, validated, and standardized data
Characteristics:
- Schema enforcement
- Data quality checks
- Deduplication
- Standardization
- Type conversions
- Business rules applied
Use Cases:
- Downstream analytics
- Feature engineering
- Data science modeling
- Operational reporting
### 3. Gold Layer (Curated)
Purpose: Business-level aggregates optimized for consumption
Characteristics:
- Highly aggregated
- Optimized for queries
- Business KPIs
- Feature tables
- Production-ready datasets
Use Cases:
- Dashboards and BI
- ML model serving
- Real-time applications
- Executive reporting
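A plain-Python sketch of what a Gold aggregation does (field names taken from this document's example code; the data itself is made up) mirrors a `GROUP BY` over Silver rows:

```python
from collections import defaultdict

# Hypothetical Silver rows
silver_rows = [
    {"customer_segment": "smb", "region": "emea", "lifetime_value": 120.0},
    {"customer_segment": "smb", "region": "emea", "lifetime_value": 80.0},
    {"customer_segment": "ent", "region": "amer", "lifetime_value": 900.0},
]

# Gold: aggregate per (segment, region), as a GROUP BY would
totals = defaultdict(lambda: {"customer_count": 0, "total_ltv": 0.0})
for row in silver_rows:
    key = (row["customer_segment"], row["region"])
    totals[key]["customer_count"] += 1
    totals[key]["total_ltv"] += row["lifetime_value"]

print(totals[("smb", "emea")])  # {'customer_count': 2, 'total_ltv': 200.0}
```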
## Implementation Patterns
### Pattern 1: Batch Processing
Bronze Layer:

```python
from pyspark.sql.functions import current_timestamp, input_file_name


def ingest_to_bronze(source_path: str, target_table: str):
    """Ingest raw data to the Bronze layer with audit metadata."""
    df = (spark.read
          # Batch read; for Auto Loader ("cloudFiles") use readStream instead
          .format("parquet")
          .load(source_path)
          .withColumn("_ingested_at", current_timestamp())
          .withColumn("_source_file", input_file_name())
          )
    (df.write
     .format("delta")
     .mode("append")
     .option("mergeSchema", "true")
     .saveAsTable(target_table)
     )
```

Silver Layer:
```python
from pyspark.sql.functions import col, lower, to_date, trim, when


def process_to_silver(bronze_table: str, silver_table: str):
    """Transform Bronze to Silver with quality checks."""
    bronze_df = spark.read.table(bronze_table)
    silver_df = (bronze_df
                 .dropDuplicates(["id"])
                 .filter(col("id").isNotNull())
                 .withColumn("email", lower(trim(col("email"))))
                 .withColumn("created_date", to_date(col("created_at")))
                 .withColumn("quality_score",
                             when(col("email").rlike(r"^[\w\.-]+@[\w\.-]+\.\w+$"), 1.0)
                             .otherwise(0.5))
                 )
    (silver_df.write
     .format("delta")
     .mode("overwrite")
     .saveAsTable(silver_table)
     )
```

Gold Layer:
```python
# Alias sum to avoid shadowing Python's built-in sum()
from pyspark.sql.functions import avg, count, current_timestamp, sum as sum_


def aggregate_to_gold(silver_table: str, gold_table: str):
    """Aggregate Silver to Gold business metrics."""
    silver_df = spark.read.table(silver_table)
    gold_df = (silver_df
               .groupBy("customer_segment", "region")
               .agg(
                   count("*").alias("customer_count"),
                   sum_("lifetime_value").alias("total_ltv"),
                   avg("quality_score").alias("avg_quality"),
               )
               .withColumn("updated_at", current_timestamp())
               )
    (gold_df.write
     .format("delta")
     .mode("overwrite")
     .saveAsTable(gold_table)
     )
```
### Pattern 2: Incremental Processing
Bronze (Streaming):

```python
from pyspark.sql.functions import current_timestamp

(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .load(source_path)
 .withColumn("_ingested_at", current_timestamp())
 .writeStream
 .format("delta")
 .option("checkpointLocation", checkpoint_path)
 .trigger(availableNow=True)
 .toTable(bronze_table)
)
```

Silver (Incremental Merge):
```python
from delta.tables import DeltaTable
from pyspark.sql.functions import col


def incremental_silver_merge(bronze_table: str, silver_table: str, watermark: str):
    """Incrementally merge new Bronze data into Silver."""
    # Get new records since the last watermark
    new_records = (spark.read.table(bronze_table)
                   .filter(col("_ingested_at") > watermark)
                   )
    # Transform (transform_to_silver applies the Silver cleansing rules)
    transformed = transform_to_silver(new_records)
    # Merge into Silver
    silver = DeltaTable.forName(spark, silver_table)
    (silver.alias("target")
     .merge(
         transformed.alias("source"),
         "target.id = source.id"
     )
     .whenMatchedUpdateAll()
     .whenNotMatchedInsertAll()
     .execute()
     )
```
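The MERGE semantics used here (update when matched on the key, insert when not) can be sketched in plain Python; this illustrates the pattern only, not the Delta implementation:

```python
def merge_upsert(target: dict, source_rows: list) -> dict:
    """Upsert source rows into target, keyed by id.

    Matched ids are replaced with the source version; unmatched ids
    are inserted -- the same semantics as whenMatchedUpdateAll /
    whenNotMatchedInsertAll.
    """
    for row in source_rows:
        target[row["id"]] = row  # update or insert
    return target

silver = {1: {"id": 1, "email": "old@example.com"}}
new_batch = [
    {"id": 1, "email": "new@example.com"},    # matched -> updated
    {"id": 2, "email": "fresh@example.com"},  # not matched -> inserted
]
merge_upsert(silver, new_batch)
print(len(silver), silver[1]["email"])  # 2 new@example.com
```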
## Data Quality Patterns
### Quality Checks at Each Layer
Bronze:
- File completeness check
- Row count validation
- Schema drift detection
Silver:
- Null value checks
- Data type validation
- Business rule validation
- Referential integrity
- Duplicate detection
Gold:
- Aggregate accuracy
- KPI threshold checks
- Trend anomaly detection
- Completeness validation
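Schema drift detection at Bronze can be as simple as comparing the incoming column set against the expected one. A minimal sketch (the expected schema is hypothetical):

```python
def detect_schema_drift(expected: set, incoming: set) -> dict:
    """Report columns added to or missing from the incoming batch."""
    return {
        "added": sorted(incoming - expected),
        "missing": sorted(expected - incoming),
        "drifted": incoming != expected,
    }

expected_cols = {"id", "email", "created_at"}
batch_cols = {"id", "email", "created_at", "referral_code"}  # a new column appeared
report = detect_schema_drift(expected_cols, batch_cols)
print(report)  # {'added': ['referral_code'], 'missing': [], 'drifted': True}
```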
### Quality Check Implementation
```python
from typing import Dict

from pyspark.sql.functions import col, current_date


def validate_silver_quality(table_name: str) -> Dict[str, bool]:
    """Run quality checks on a Silver table."""
    df = spark.read.table(table_name)
    checks = {
        "no_null_ids": df.filter(col("id").isNull()).count() == 0,
        "valid_emails": df.filter(
            ~col("email").rlike(r"^[\w\.-]+@[\w\.-]+\.\w+$")
        ).count() == 0,
        "no_duplicates": df.count() == df.select("id").distinct().count(),
        "within_date_range": df.filter(
            (col("created_date") < "2020-01-01") |
            (col("created_date") > current_date())
        ).count() == 0,
    }
    return checks
```
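A quality gate can then block promotion when any check fails. One way the check results might be consumed (a sketch; the gate function is illustrative, not from the source):

```python
def enforce_quality_gate(checks: dict) -> None:
    """Raise if any quality check failed, listing the offenders.

    Intended to run between Silver validation and Gold aggregation,
    so bad data never progresses to the next layer.
    """
    failed = [name for name, passed in checks.items() if not passed]
    if failed:
        raise ValueError(f"Quality gate failed: {', '.join(failed)}")

# Example: one failing check blocks the pipeline
results = {"no_null_ids": True, "valid_emails": False}
try:
    enforce_quality_gate(results)
except ValueError as err:
    print(err)  # Quality gate failed: valid_emails
```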
## Optimization Strategies
### Bronze Layer Optimization
```sql
-- Partition by ingestion date
CREATE TABLE bronze.raw_events
USING delta
PARTITIONED BY (ingestion_date)
AS SELECT *, current_date() AS ingestion_date FROM source;

-- Enable auto-optimize
ALTER TABLE bronze.raw_events
SET TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true'
);
```
### Silver Layer Optimization
```sql
-- Z-ORDER for common filters
OPTIMIZE silver.customers
ZORDER BY (customer_segment, region, created_date);

-- Enable Change Data Feed
ALTER TABLE silver.customers
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');
```
### Gold Layer Optimization
```sql
-- Liquid clustering for query performance
CREATE TABLE gold.customer_metrics
USING delta
CLUSTER BY (customer_segment, date)
AS SELECT * FROM aggregated_metrics;

-- Optimize and vacuum (168 hours = 7 days of retained history)
OPTIMIZE gold.customer_metrics;
VACUUM gold.customer_metrics RETAIN 168 HOURS;
```
## Complete Example
See /templates/bronze-silver-gold/ for a complete implementation, including:
- Project structure
- Bronze ingestion scripts
- Silver transformation logic
- Gold aggregation queries
- Data quality tests
- Deployment configuration
## Best Practices
- Idempotency: Ensure pipelines can be re-run safely
- Incrementality: Process only new/changed data
- Quality Gates: Block bad data from progressing
- Schema Evolution: Handle schema changes gracefully
- Monitoring: Track pipeline health and data quality
- Documentation: Document data lineage and transformations
- Testing: Unit test transformations, integration test pipelines
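Idempotency, for instance, means re-running the same batch leaves the target unchanged. A plain-Python sketch of the property, using overwrite-by-partition rather than append (an illustrative strategy, not the only one):

```python
def write_partition(target: dict, partition_key: str, rows: list) -> dict:
    """Idempotent write: replace the whole partition rather than append.

    Re-running the same batch overwrites the same partition with the
    same rows, so duplicates cannot accumulate.
    """
    target[partition_key] = list(rows)
    return target

table = {}
batch = [{"id": 1}, {"id": 2}]
write_partition(table, "2024-01-01", batch)
write_partition(table, "2024-01-01", batch)  # safe re-run
print(len(table["2024-01-01"]))  # 2
```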
## Common Pitfalls to Avoid
❌ Don't:
- Mix transformation logic across layers
- Skip Bronze layer to "save storage"
- Over-aggregate too early
- Ignore data quality in Silver
- Hard-code business logic in Bronze
✅ Do:
- Keep Bronze immutable
- Enforce quality in Silver
- Optimize Gold for consumption
- Use incremental processing
- Implement proper monitoring
## Related Skills
- delta-live-tables: Declarative pipeline orchestration
- data-quality: Great Expectations integration
- testing-patterns: Pipeline testing strategies
- cicd-workflows: Deployment automation