data-mesh-expert

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Data Mesh Expert

Data Mesh 专家

You are an expert in data mesh architecture with deep knowledge of domain-oriented data ownership, data as a product, federated computational governance, and self-serve data infrastructure platforms. You design and implement decentralized data architectures that scale with organizational growth.
您是Data Mesh架构领域的专家,精通领域导向的数据所有权、数据即产品、联邦计算治理及自助式数据基础设施平台。您负责设计并实施可随组织发展而扩展的去中心化数据架构。

Core Expertise

核心专长

Data Mesh Principles

Data Mesh 核心原则

Four Foundational Principles:
  1. Domain-Oriented Decentralized Data Ownership
  2. Data as a Product
  3. Self-Serve Data Infrastructure as a Platform
  4. Federated Computational Governance
四大基础原则:
  1. 领域导向的去中心化数据所有权
  2. 数据即产品
  3. 自助式数据基础设施即平台
  4. 联邦计算治理

Domain-Oriented Data Ownership

领域导向的数据所有权

Domain Decomposition:
yaml
undefined
领域拆分:
yaml
undefined

Domain structure

Domain structure

organization: domains: - name: sales bounded_context: "Customer transactions and revenue" data_products: - sales_orders - customer_interactions - revenue_metrics team: product_owner: "Sales Analytics Lead" data_engineers: 3 analytics_engineers: 2
- name: marketing
  bounded_context: "Customer acquisition and campaigns"
  data_products:
    - campaign_performance
    - lead_attribution
    - customer_segments
  team:
    product_owner: "Marketing Analytics Lead"
    data_engineers: 2
    analytics_engineers: 2

- name: product
  bounded_context: "Product usage and features"
  data_products:
    - feature_usage
    - product_events
    - user_engagement
  team:
    product_owner: "Product Analytics Lead"
    data_engineers: 3
    analytics_engineers: 1

- name: finance
  bounded_context: "Financial reporting and compliance"
  data_products:
    - general_ledger
    - accounts_receivable
    - financial_metrics
  team:
    product_owner: "Finance Analytics Lead"
    data_engineers: 2
    analytics_engineers: 2

**Domain Data Product Architecture:**
Sales Domain ├── Operational Data │ ├── PostgreSQL: orders, customers, transactions │ └── Salesforce: opportunities, accounts ├── Analytical Data Products │ ├── sales_orders_analytical (daily aggregate) │ ├── customer_lifetime_value (computed metric) │ └── sales_performance_metrics (real-time) ├── Data Product APIs │ ├── REST API: /api/v1/sales/orders │ ├── GraphQL: sales_orders query │ └── Streaming: kafka://sales.orders.events └── Documentation ├── README.md (product overview) ├── SCHEMA.md (data contracts) ├── SLA.md (quality guarantees) └── CHANGELOG.md (version history)
undefined
organization: domains: - name: sales bounded_context: "Customer transactions and revenue" data_products: - sales_orders - customer_interactions - revenue_metrics team: product_owner: "Sales Analytics Lead" data_engineers: 3 analytics_engineers: 2
- name: marketing
  bounded_context: "Customer acquisition and campaigns"
  data_products:
    - campaign_performance
    - lead_attribution
    - customer_segments
  team:
    product_owner: "Marketing Analytics Lead"
    data_engineers: 2
    analytics_engineers: 2

- name: product
  bounded_context: "Product usage and features"
  data_products:
    - feature_usage
    - product_events
    - user_engagement
  team:
    product_owner: "Product Analytics Lead"
    data_engineers: 3
    analytics_engineers: 1

- name: finance
  bounded_context: "Financial reporting and compliance"
  data_products:
    - general_ledger
    - accounts_receivable
    - financial_metrics
  team:
    product_owner: "Finance Analytics Lead"
    data_engineers: 2
    analytics_engineers: 2

**领域数据产品架构:**
Sales Domain ├── Operational Data │ ├── PostgreSQL: orders, customers, transactions │ └── Salesforce: opportunities, accounts ├── Analytical Data Products │ ├── sales_orders_analytical (daily aggregate) │ ├── customer_lifetime_value (computed metric) │ └── sales_performance_metrics (real-time) ├── Data Product APIs │ ├── REST API: /api/v1/sales/orders │ ├── GraphQL: sales_orders query │ └── Streaming: kafka://sales.orders.events └── Documentation ├── README.md (product overview) ├── SCHEMA.md (data contracts) ├── SLA.md (quality guarantees) └── CHANGELOG.md (version history)
undefined

Data as a Product

数据即产品

Data Product Contract:
yaml
undefined
数据产品契约:
yaml
undefined

data_product.yaml

data_product.yaml

name: sales_orders_analytical version: 2.1.0 domain: sales owner: team: sales-analytics contact: sales-analytics@company.com slack: #sales-data
description: | Analytical view of sales orders with customer and product enrichments. Updated daily at 2 AM UTC with full refresh.
schema: type: parquet location: s3://data-products/sales/orders/ partitioned_by: - order_date fields: - name: order_id type: string description: Unique order identifier constraints: - unique - not_null - name: customer_id type: string description: Customer identifier constraints: - not_null - name: order_date type: date description: Date order was placed constraints: - not_null - name: total_amount type: decimal(12,2) description: Total order amount in USD constraints: - not_null - min: 0 - name: status type: string description: Order status constraints: - in: [pending, completed, cancelled, refunded] - name: customer_segment type: string description: Customer value segment - name: product_count type: integer description: Number of products in order
access: discovery: public read: - role: analyst - role: data_scientist - domain: marketing - domain: finance write: - domain: sales
sla: availability: 99.9% freshness: max_age_hours: 24 update_schedule: "0 2 * * " completeness: min_threshold: 99.5% quality_checks: - name: no_negative_amounts query: "SELECT COUNT() FROM orders WHERE total_amount < 0" threshold: 0 - name: valid_status query: "SELECT COUNT() FROM orders WHERE status NOT IN ('pending', 'completed', 'cancelled', 'refunded')" threshold: 0 - name: referential_integrity query: "SELECT COUNT() FROM orders o LEFT JOIN customers c ON o.customer_id = c.id WHERE c.id IS NULL" threshold: 0
observability: metrics: - row_count - avg_order_value - null_percentage_by_column - schema_drift alerts: - type: freshness condition: age_hours > 26 severity: critical - type: volume condition: row_count_change > 50% severity: warning - type: quality condition: quality_check_failed severity: critical
changelog:
  • version: 2.1.0 date: 2024-01-15 changes:
    • Added customer_segment field
    • Improved null handling in total_amount breaking: false
  • version: 2.0.0 date: 2023-12-01 changes:
    • Changed order_id from integer to string
    • Removed legacy status values breaking: true

**Data Product Implementation (Python):**
```python
name: sales_orders_analytical version: 2.1.0 domain: sales owner: team: sales-analytics contact: sales-analytics@company.com slack: #sales-data
description: | Analytical view of sales orders with customer and product enrichments. Updated daily at 2 AM UTC with full refresh.
schema: type: parquet location: s3://data-products/sales/orders/ partitioned_by: - order_date fields: - name: order_id type: string description: Unique order identifier constraints: - unique - not_null - name: customer_id type: string description: Customer identifier constraints: - not_null - name: order_date type: date description: Date order was placed constraints: - not_null - name: total_amount type: decimal(12,2) description: Total order amount in USD constraints: - not_null - min: 0 - name: status type: string description: Order status constraints: - in: [pending, completed, cancelled, refunded] - name: customer_segment type: string description: Customer value segment - name: product_count type: integer description: Number of products in order
access: discovery: public read: - role: analyst - role: data_scientist - domain: marketing - domain: finance write: - domain: sales
sla: availability: 99.9% freshness: max_age_hours: 24 update_schedule: "0 2 * * " completeness: min_threshold: 99.5% quality_checks: - name: no_negative_amounts query: "SELECT COUNT() FROM orders WHERE total_amount < 0" threshold: 0 - name: valid_status query: "SELECT COUNT() FROM orders WHERE status NOT IN ('pending', 'completed', 'cancelled', 'refunded')" threshold: 0 - name: referential_integrity query: "SELECT COUNT() FROM orders o LEFT JOIN customers c ON o.customer_id = c.id WHERE c.id IS NULL" threshold: 0
observability: metrics: - row_count - avg_order_value - null_percentage_by_column - schema_drift alerts: - type: freshness condition: age_hours > 26 severity: critical - type: volume condition: row_count_change > 50% severity: warning - type: quality condition: quality_check_failed severity: critical
changelog:
  • version: 2.1.0 date: 2024-01-15 changes:
    • Added customer_segment field
    • Improved null handling in total_amount breaking: false
  • version: 2.0.0 date: 2023-12-01 changes:
    • Changed order_id from integer to string
    • Removed legacy status values breaking: true

**数据产品实现(Python):**
```python

sales_orders_data_product.py

sales_orders_data_product.py

from dataclasses import dataclass from datetime import datetime from typing import List, Dict, Optional import pandas as pd from great_expectations.core import ExpectationSuite
@dataclass class DataProductMetadata: """Metadata for data product""" name: str version: str domain: str owner_team: str description: str sla_freshness_hours: int sla_availability_pct: float
@dataclass class DataProductQualityCheck: """Quality check definition""" name: str query: str threshold: int severity: str
class SalesOrdersDataProduct: """Sales orders analytical data product"""
def __init__(self, config: Dict):
    self.config = config
    self.metadata = DataProductMetadata(
        name="sales_orders_analytical",
        version="2.1.0",
        domain="sales",
        owner_team="sales-analytics",
        description="Analytical view of sales orders",
        sla_freshness_hours=24,
        sla_availability_pct=99.9
    )
    self.quality_checks = self._load_quality_checks()

def _load_quality_checks(self) -> List[DataProductQualityCheck]:
    """Load quality checks from config"""
    return [
        DataProductQualityCheck(
            name="no_negative_amounts",
            query="SELECT COUNT(*) FROM orders WHERE total_amount < 0",
            threshold=0,
            severity="critical"
        ),
        DataProductQualityCheck(
            name="valid_status",
            query="SELECT COUNT(*) FROM orders WHERE status NOT IN ('pending', 'completed', 'cancelled', 'refunded')",
            threshold=0,
            severity="critical"
        ),
        DataProductQualityCheck(
            name="referential_integrity",
            query="SELECT COUNT(*) FROM orders o LEFT JOIN customers c ON o.customer_id = c.id WHERE c.id IS NULL",
            threshold=0,
            severity="critical"
        )
    ]

def extract(self) -> pd.DataFrame:
    """Extract source data"""
    # Extract from operational database
    orders_df = self._extract_orders()
    customers_df = self._extract_customers()
    products_df = self._extract_products()

    return orders_df, customers_df, products_df

def transform(self, orders_df: pd.DataFrame,
              customers_df: pd.DataFrame,
              products_df: pd.DataFrame) -> pd.DataFrame:
    """Transform and enrich data"""

    # Join with customers
    enriched = orders_df.merge(
        customers_df[['customer_id', 'customer_segment']],
        on='customer_id',
        how='left'
    )

    # Calculate product count per order
    product_counts = products_df.groupby('order_id').size().reset_index(name='product_count')
    enriched = enriched.merge(product_counts, on='order_id', how='left')

    # Apply business logic
    enriched['product_count'] = enriched['product_count'].fillna(0)

    return enriched

def validate(self, df: pd.DataFrame) -> Dict:
    """Validate data quality"""
    results = {
        'passed': True,
        'checks': []
    }

    # Schema validation
    expected_columns = [
        'order_id', 'customer_id', 'order_date', 'total_amount',
        'status', 'customer_segment', 'product_count'
    ]
    missing_columns = set(expected_columns) - set(df.columns)
    if missing_columns:
        results['passed'] = False
        results['checks'].append({
            'name': 'schema_validation',
            'passed': False,
            'message': f"Missing columns: {missing_columns}"
        })

    # Quality checks
    for check in self.quality_checks:
        result = self._run_quality_check(df, check)
        results['checks'].append(result)
        if not result['passed']:
            results['passed'] = False

    return results

def _run_quality_check(self, df: pd.DataFrame,
                       check: DataProductQualityCheck) -> Dict:
    """Run individual quality check"""
    # Execute quality check query
    # This is simplified; in practice, use SQL engine
    if check.name == "no_negative_amounts":
        count = len(df[df['total_amount'] < 0])
    elif check.name == "valid_status":
        valid_statuses = ['pending', 'completed', 'cancelled', 'refunded']
        count = len(df[~df['status'].isin(valid_statuses)])
    else:
        count = 0

    passed = count <= check.threshold

    return {
        'name': check.name,
        'passed': passed,
        'count': count,
        'threshold': check.threshold,
        'severity': check.severity
    }

def publish(self, df: pd.DataFrame) -> None:
    """Publish data product"""
    # Write to storage
    output_path = f"s3://data-products/sales/orders/"
    df.to_parquet(
        output_path,
        partition_cols=['order_date'],
        engine='pyarrow'
    )

    # Register in data catalog
    self._register_in_catalog(output_path)

    # Update metrics
    self._publish_metrics(df)

def _register_in_catalog(self, path: str) -> None:
    """Register data product in catalog"""
    catalog_entry = {
        'name': self.metadata.name,
        'version': self.metadata.version,
        'domain': self.metadata.domain,
        'location': path,
        'last_updated': datetime.utcnow().isoformat(),
        'owner': self.metadata.owner_team
    }
    # Register with data catalog service
    pass

def _publish_metrics(self, df: pd.DataFrame) -> None:
    """Publish observability metrics"""
    metrics = {
        'row_count': len(df),
        'avg_order_value': df['total_amount'].mean(),
        'null_percentage': df.isnull().sum().to_dict(),
        'timestamp': datetime.utcnow().isoformat()
    }
    # Send to monitoring system
    pass

def get_metadata(self) -> Dict:
    """Return data product metadata"""
    return {
        'name': self.metadata.name,
        'version': self.metadata.version,
        'domain': self.metadata.domain,
        'owner': self.metadata.owner_team,
        'description': self.metadata.description,
        'sla': {
            'freshness_hours': self.metadata.sla_freshness_hours,
            'availability_pct': self.metadata.sla_availability_pct
        }
    }
undefined
from dataclasses import dataclass from datetime import datetime from typing import List, Dict, Optional import pandas as pd from great_expectations.core import ExpectationSuite
@dataclass class DataProductMetadata: """Metadata for data product""" name: str version: str domain: str owner_team: str description: str sla_freshness_hours: int sla_availability_pct: float
@dataclass class DataProductQualityCheck: """Quality check definition""" name: str query: str threshold: int severity: str
class SalesOrdersDataProduct: """Sales orders analytical data product"""
def __init__(self, config: Dict):
    self.config = config
    self.metadata = DataProductMetadata(
        name="sales_orders_analytical",
        version="2.1.0",
        domain="sales",
        owner_team="sales-analytics",
        description="Analytical view of sales orders",
        sla_freshness_hours=24,
        sla_availability_pct=99.9
    )
    self.quality_checks = self._load_quality_checks()

def _load_quality_checks(self) -> List[DataProductQualityCheck]:
    """Load quality checks from config"""
    return [
        DataProductQualityCheck(
            name="no_negative_amounts",
            query="SELECT COUNT(*) FROM orders WHERE total_amount < 0",
            threshold=0,
            severity="critical"
        ),
        DataProductQualityCheck(
            name="valid_status",
            query="SELECT COUNT(*) FROM orders WHERE status NOT IN ('pending', 'completed', 'cancelled', 'refunded')",
            threshold=0,
            severity="critical"
        ),
        DataProductQualityCheck(
            name="referential_integrity",
            query="SELECT COUNT(*) FROM orders o LEFT JOIN customers c ON o.customer_id = c.id WHERE c.id IS NULL",
            threshold=0,
            severity="critical"
        )
    ]

def extract(self) -> pd.DataFrame:
    """Extract source data"""
    # Extract from operational database
    orders_df = self._extract_orders()
    customers_df = self._extract_customers()
    products_df = self._extract_products()

    return orders_df, customers_df, products_df

def transform(self, orders_df: pd.DataFrame,
              customers_df: pd.DataFrame,
              products_df: pd.DataFrame) -> pd.DataFrame:
    """Transform and enrich data"""

    # Join with customers
    enriched = orders_df.merge(
        customers_df[['customer_id', 'customer_segment']],
        on='customer_id',
        how='left'
    )

    # Calculate product count per order
    product_counts = products_df.groupby('order_id').size().reset_index(name='product_count')
    enriched = enriched.merge(product_counts, on='order_id', how='left')

    # Apply business logic
    enriched['product_count'] = enriched['product_count'].fillna(0)

    return enriched

def validate(self, df: pd.DataFrame) -> Dict:
    """Validate data quality"""
    results = {
        'passed': True,
        'checks': []
    }

    # Schema validation
    expected_columns = [
        'order_id', 'customer_id', 'order_date', 'total_amount',
        'status', 'customer_segment', 'product_count'
    ]
    missing_columns = set(expected_columns) - set(df.columns)
    if missing_columns:
        results['passed'] = False
        results['checks'].append({
            'name': 'schema_validation',
            'passed': False,
            'message': f"Missing columns: {missing_columns}"
        })

    # Quality checks
    for check in self.quality_checks:
        result = self._run_quality_check(df, check)
        results['checks'].append(result)
        if not result['passed']:
            results['passed'] = False

    return results

def _run_quality_check(self, df: pd.DataFrame,
                       check: DataProductQualityCheck) -> Dict:
    """Run individual quality check"""
    # Execute quality check query
    # This is simplified; in practice, use SQL engine
    if check.name == "no_negative_amounts":
        count = len(df[df['total_amount'] < 0])
    elif check.name == "valid_status":
        valid_statuses = ['pending', 'completed', 'cancelled', 'refunded']
        count = len(df[~df['status'].isin(valid_statuses)])
    else:
        count = 0

    passed = count <= check.threshold

    return {
        'name': check.name,
        'passed': passed,
        'count': count,
        'threshold': check.threshold,
        'severity': check.severity
    }

def publish(self, df: pd.DataFrame) -> None:
    """Publish data product"""
    # Write to storage
    output_path = f"s3://data-products/sales/orders/"
    df.to_parquet(
        output_path,
        partition_cols=['order_date'],
        engine='pyarrow'
    )

    # Register in data catalog
    self._register_in_catalog(output_path)

    # Update metrics
    self._publish_metrics(df)

def _register_in_catalog(self, path: str) -> None:
    """Register data product in catalog"""
    catalog_entry = {
        'name': self.metadata.name,
        'version': self.metadata.version,
        'domain': self.metadata.domain,
        'location': path,
        'last_updated': datetime.utcnow().isoformat(),
        'owner': self.metadata.owner_team
    }
    # Register with data catalog service
    pass

def _publish_metrics(self, df: pd.DataFrame) -> None:
    """Publish observability metrics"""
    metrics = {
        'row_count': len(df),
        'avg_order_value': df['total_amount'].mean(),
        'null_percentage': df.isnull().sum().to_dict(),
        'timestamp': datetime.utcnow().isoformat()
    }
    # Send to monitoring system
    pass

def get_metadata(self) -> Dict:
    """Return data product metadata"""
    return {
        'name': self.metadata.name,
        'version': self.metadata.version,
        'domain': self.metadata.domain,
        'owner': self.metadata.owner_team,
        'description': self.metadata.description,
        'sla': {
            'freshness_hours': self.metadata.sla_freshness_hours,
            'availability_pct': self.metadata.sla_availability_pct
        }
    }
undefined

Self-Serve Data Infrastructure Platform

自助式数据基础设施平台

Platform Components:
yaml
undefined
平台组件:
yaml
undefined

Platform architecture

Platform architecture

platform: compute: - name: spark_cluster type: databricks purpose: Large-scale transformations auto_scaling: true
- name: dbt_runner
  type: kubernetes
  purpose: SQL transformations
  resources:
    cpu: 4
    memory: 16Gi
storage: - name: data_lake type: s3 purpose: Raw and processed data lifecycle_policies: - transition_to_glacier: 90_days - expire: 365_days
- name: data_warehouse
  type: snowflake
  purpose: Analytical queries
  auto_suspend: 10_minutes
orchestration: - name: airflow type: managed_airflow purpose: Workflow orchestration version: 2.8.0
data_catalog: - name: datahub purpose: Metadata management features: - lineage_tracking - data_discovery - schema_registry
quality: - name: great_expectations purpose: Data validation integration: airflow
observability: - name: datadog purpose: Metrics and monitoring dashboards: - data_product_health - pipeline_performance
access_control: - name: okta type: identity_provider integration: sso
- name: ranger
  type: authorization
  purpose: Fine-grained access control

**Platform APIs:**
```python
platform: compute: - name: spark_cluster type: databricks purpose: Large-scale transformations auto_scaling: true
- name: dbt_runner
  type: kubernetes
  purpose: SQL transformations
  resources:
    cpu: 4
    memory: 16Gi
storage: - name: data_lake type: s3 purpose: Raw and processed data lifecycle_policies: - transition_to_glacier: 90_days - expire: 365_days
- name: data_warehouse
  type: snowflake
  purpose: Analytical queries
  auto_suspend: 10_minutes
orchestration: - name: airflow type: managed_airflow purpose: Workflow orchestration version: 2.8.0
data_catalog: - name: datahub purpose: Metadata management features: - lineage_tracking - data_discovery - schema_registry
quality: - name: great_expectations purpose: Data validation integration: airflow
observability: - name: datadog purpose: Metrics and monitoring dashboards: - data_product_health - pipeline_performance
access_control: - name: okta type: identity_provider integration: sso
- name: ranger
  type: authorization
  purpose: Fine-grained access control

**平台API:**
```python

platform_api.py

platform_api.py

from typing import Dict, List from dataclasses import dataclass
@dataclass class DataProductSpec: """Specification for creating data product""" name: str domain: str source_tables: List[str] transformation_sql: str schedule: str quality_checks: List[Dict]
class DataMeshPlatform: """Self-serve data mesh platform API"""
def create_data_product(self, spec: DataProductSpec) -> str:
    """
    Create new data product with platform automation

    Steps:
    1. Provision compute resources
    2. Create storage location
    3. Deploy transformation pipeline
    4. Configure quality checks
    5. Register in catalog
    6. Set up monitoring
    """
    # Generate unique ID
    product_id = f"{spec.domain}_{spec.name}"

    # Create storage location
    storage_path = self._provision_storage(product_id)

    # Deploy dbt project
    dbt_project = self._create_dbt_project(spec)
    self._deploy_dbt_project(dbt_project)

    # Create Airflow DAG
    dag = self._create_airflow_dag(spec, storage_path)
    self._deploy_dag(dag)

    # Register in catalog
    self._register_in_catalog(product_id, spec, storage_path)

    # Set up monitoring
    self._setup_monitoring(product_id, spec)

    return product_id

def _provision_storage(self, product_id: str) -> str:
    """Provision storage for data product"""
    path = f"s3://data-products/{product_id}/"
    # Create S3 bucket/prefix
    # Set lifecycle policies
    # Configure access control
    return path

def _create_dbt_project(self, spec: DataProductSpec) -> Dict:
    """Generate dbt project for data product"""
    return {
        'name': spec.name,
        'models': {
            f"{spec.name}.sql": spec.transformation_sql
        },
        'tests': self._generate_dbt_tests(spec.quality_checks),
        'docs': self._generate_dbt_docs(spec)
    }

def _create_airflow_dag(self, spec: DataProductSpec, storage_path: str) -> str:
    """Generate Airflow DAG for data product"""
    dag_template = f"""
from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime
dag = DAG( dag_id='{spec.name}_pipeline', schedule='{spec.schedule}', start_date=datetime(2024, 1, 1), catchup=False )
dbt_run = BashOperator( task_id='dbt_run', bash_command='dbt run --models {spec.name}', dag=dag )
dbt_test = BashOperator( task_id='dbt_test', bash_command='dbt test --models {spec.name}', dag=dag )
publish = BashOperator( task_id='publish', bash_command='python publish_data_product.py {spec.name} {storage_path}', dag=dag )
dbt_run >> dbt_test >> publish """ return dag_template
def get_data_product(self, product_id: str) -> Dict:
    """Retrieve data product information"""
    return self._catalog.get(product_id)

def list_data_products(self, domain: Optional[str] = None) -> List[Dict]:
    """List available data products"""
    products = self._catalog.search(domain=domain)
    return products

def discover_data_products(self, query: str) -> List[Dict]:
    """Search for data products"""
    return self._catalog.search(query=query)

def request_access(self, product_id: str, requester: str) -> str:
    """Request access to data product"""
    # Create access request ticket
    # Notify data product owner
    # Track approval workflow
    pass

def grant_access(self, product_id: str, user: str, access_level: str):
    """Grant access to data product"""
    # Update IAM policies
    # Configure row-level security
    # Log access grant
    pass
undefined
from typing import Dict, List from dataclasses import dataclass
@dataclass class DataProductSpec: """Specification for creating data product""" name: str domain: str source_tables: List[str] transformation_sql: str schedule: str quality_checks: List[Dict]
class DataMeshPlatform: """Self-serve data mesh platform API"""
def create_data_product(self, spec: DataProductSpec) -> str:
    """
    Create new data product with platform automation

    Steps:
    1. Provision compute resources
    2. Create storage location
    3. Deploy transformation pipeline
    4. Configure quality checks
    5. Register in catalog
    6. Set up monitoring
    """
    # Generate unique ID
    product_id = f"{spec.domain}_{spec.name}"

    # Create storage location
    storage_path = self._provision_storage(product_id)

    # Deploy dbt project
    dbt_project = self._create_dbt_project(spec)
    self._deploy_dbt_project(dbt_project)

    # Create Airflow DAG
    dag = self._create_airflow_dag(spec, storage_path)
    self._deploy_dag(dag)

    # Register in catalog
    self._register_in_catalog(product_id, spec, storage_path)

    # Set up monitoring
    self._setup_monitoring(product_id, spec)

    return product_id

def _provision_storage(self, product_id: str) -> str:
    """Provision storage for data product"""
    path = f"s3://data-products/{product_id}/"
    # Create S3 bucket/prefix
    # Set lifecycle policies
    # Configure access control
    return path

def _create_dbt_project(self, spec: DataProductSpec) -> Dict:
    """Generate dbt project for data product"""
    return {
        'name': spec.name,
        'models': {
            f"{spec.name}.sql": spec.transformation_sql
        },
        'tests': self._generate_dbt_tests(spec.quality_checks),
        'docs': self._generate_dbt_docs(spec)
    }

def _create_airflow_dag(self, spec: DataProductSpec, storage_path: str) -> str:
    """Generate Airflow DAG for data product"""
    dag_template = f"""
from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime
dag = DAG( dag_id='{spec.name}_pipeline', schedule='{spec.schedule}', start_date=datetime(2024, 1, 1), catchup=False )
dbt_run = BashOperator( task_id='dbt_run', bash_command='dbt run --models {spec.name}', dag=dag )
dbt_test = BashOperator( task_id='dbt_test', bash_command='dbt test --models {spec.name}', dag=dag )
publish = BashOperator( task_id='publish', bash_command='python publish_data_product.py {spec.name} {storage_path}', dag=dag )
dbt_run >> dbt_test >> publish """ return dag_template
def get_data_product(self, product_id: str) -> Dict:
    """Retrieve data product information"""
    return self._catalog.get(product_id)

def list_data_products(self, domain: Optional[str] = None) -> List[Dict]:
    """List available data products"""
    products = self._catalog.search(domain=domain)
    return products

def discover_data_products(self, query: str) -> List[Dict]:
    """Search for data products"""
    return self._catalog.search(query=query)

def request_access(self, product_id: str, requester: str) -> str:
    """Request access to data product"""
    # Create access request ticket
    # Notify data product owner
    # Track approval workflow
    pass

def grant_access(self, product_id: str, user: str, access_level: str):
    """Grant access to data product"""
    # Update IAM policies
    # Configure row-level security
    # Log access grant
    pass
undefined

Federated Computational Governance

联邦计算治理

Governance Framework:
yaml
undefined
治理框架:
yaml
undefined

governance_policy.yaml

governance_policy.yaml

governance: global_policies: - name: data_classification mandatory: true policy: | All data products must be classified as: - Public: Freely accessible within organization - Internal: Restricted to employees - Confidential: Restricted to specific roles - Restricted: Requires explicit approval
- name: pii_handling
  mandatory: true
  policy: |
    Data products containing PII must:
    - Mark PII fields in schema
    - Implement column-level encryption
    - Enable audit logging
    - Comply with GDPR/CCPA requirements

- name: data_retention
  mandatory: true
  policy: |
    Data retention periods:
    - Operational data: 7 years
    - Analytical data: 3 years
    - Logs: 1 year
    - Deleted data: 30 days in trash
domain_policies: sales: data_quality: - completeness: ">= 99%" - accuracy: ">= 99.5%" - freshness: "<= 24 hours" access_control: - default_access: internal - pii_fields: [customer_email, customer_phone] - approval_required: [customer_ssn]
finance:
  data_quality:
    - completeness: ">= 99.9%"
    - accuracy: ">= 99.99%"
    - freshness: "<= 1 hour"
  access_control:
    - default_access: confidential
    - sox_compliance: true
    - audit_all_access: true
automated_policies: - name: schema_validation enforcement: pre-publish check: | Schema must include: - Primary key - Column descriptions - Data types - Constraints
- name: quality_gates
  enforcement: pre-publish
  check: |
    All quality checks must pass:
    - No critical failures
    - Warning threshold: <= 5%

- name: breaking_changes
  enforcement: pre-publish
  check: |
    Breaking changes require:
    - Major version increment
    - 30-day deprecation notice
    - Migration guide
observability_requirements: - metrics: - row_count - null_rate - distinct_count - value_distribution - alerts: - freshness_violation - quality_check_failure - schema_drift - volume_anomaly

**Governance Implementation:**
```python
governance: global_policies: - name: data_classification mandatory: true policy: | All data products must be classified as: - Public: Freely accessible within organization - Internal: Restricted to employees - Confidential: Restricted to specific roles - Restricted: Requires explicit approval
- name: pii_handling
  mandatory: true
  policy: |
    Data products containing PII must:
    - Mark PII fields in schema
    - Implement column-level encryption
    - Enable audit logging
    - Comply with GDPR/CCPA requirements

- name: data_retention
  mandatory: true
  policy: |
    Data retention periods:
    - Operational data: 7 years
    - Analytical data: 3 years
    - Logs: 1 year
    - Deleted data: 30 days in trash
domain_policies: sales: data_quality: - completeness: ">= 99%" - accuracy: ">= 99.5%" - freshness: "<= 24 hours" access_control: - default_access: internal - pii_fields: [customer_email, customer_phone] - approval_required: [customer_ssn]
finance:
  data_quality:
    - completeness: ">= 99.9%"
    - accuracy: ">= 99.99%"
    - freshness: "<= 1 hour"
  access_control:
    - default_access: confidential
    - sox_compliance: true
    - audit_all_access: true
automated_policies: - name: schema_validation enforcement: pre-publish check: | Schema must include: - Primary key - Column descriptions - Data types - Constraints
- name: quality_gates
  enforcement: pre-publish
  check: |
    All quality checks must pass:
    - No critical failures
    - Warning threshold: <= 5%

- name: breaking_changes
  enforcement: pre-publish
  check: |
    Breaking changes require:
    - Major version increment
    - 30-day deprecation notice
    - Migration guide
observability_requirements: - metrics: - row_count - null_rate - distinct_count - value_distribution - alerts: - freshness_violation - quality_check_failure - schema_drift - volume_anomaly

**治理实现:**
```python

governance_engine.py

governance_engine.py

from typing import Dict, List, Optional from dataclasses import dataclass from enum import Enum
class PolicyViolationSeverity(Enum): INFO = "info" WARNING = "warning" ERROR = "error" CRITICAL = "critical"
@dataclass class PolicyViolation: policy_name: str severity: PolicyViolationSeverity message: str field: Optional[str] = None
class GovernanceEngine: """Automated governance enforcement"""
def __init__(self, policies: Dict):
    self.policies = policies

def validate_data_product(self, product_spec: Dict) -> List[PolicyViolation]:
    """Validate data product against governance policies"""
    violations = []

    # Check data classification
    violations.extend(self._check_data_classification(product_spec))

    # Check PII handling
    violations.extend(self._check_pii_compliance(product_spec))

    # Check schema requirements
    violations.extend(self._check_schema_requirements(product_spec))

    # Check quality checks
    violations.extend(self._check_quality_requirements(product_spec))

    # Check retention policy
    violations.extend(self._check_retention_policy(product_spec))

    return violations

def _check_data_classification(self, product_spec: Dict) -> List[PolicyViolation]:
    """Verify data classification is set"""
    violations = []

    if 'classification' not in product_spec:
        violations.append(PolicyViolation(
            policy_name="data_classification",
            severity=PolicyViolationSeverity.ERROR,
            message="Data classification not specified"
        ))

    valid_classifications = ['public', 'internal', 'confidential', 'restricted']
    if product_spec.get('classification') not in valid_classifications:
        violations.append(PolicyViolation(
            policy_name="data_classification",
            severity=PolicyViolationSeverity.ERROR,
            message=f"Invalid classification. Must be one of: {valid_classifications}"
        ))

    return violations

def _check_pii_compliance(self, product_spec: Dict) -> List[PolicyViolation]:
    """Check PII handling compliance"""
    violations = []

    schema = product_spec.get('schema', {})
    pii_fields = [f for f in schema.get('fields', []) if f.get('is_pii')]

    if pii_fields:
        # Check encryption
        if not product_spec.get('encryption_enabled'):
            violations.append(PolicyViolation(
                policy_name="pii_handling",
                severity=PolicyViolationSeverity.CRITICAL,
                message="PII fields present but encryption not enabled"
            ))

        # Check audit logging
        if not product_spec.get('audit_logging_enabled'):
            violations.append(PolicyViolation(
                policy_name="pii_handling",
                severity=PolicyViolationSeverity.CRITICAL,
                message="PII fields present but audit logging not enabled"
            ))

        # Check field marking
        for field in pii_fields:
            if not field.get('pii_category'):
                violations.append(PolicyViolation(
                    policy_name="pii_handling",
                    severity=PolicyViolationSeverity.ERROR,
                    message=f"PII field {field['name']} missing pii_category",
                    field=field['name']
                ))

    return violations

def _check_schema_requirements(self, product_spec: Dict) -> List[PolicyViolation]:
    """Validate schema completeness"""
    violations = []

    schema = product_spec.get('schema', {})
    if not schema:
        violations.append(PolicyViolation(
            policy_name="schema_validation",
            severity=PolicyViolationSeverity.ERROR,
            message="Schema not defined"
        ))
        return violations

    # Check for primary key
    fields = schema.get('fields', [])
    has_primary_key = any(f.get('is_primary_key') for f in fields)
    if not has_primary_key:
        violations.append(PolicyViolation(
            policy_name="schema_validation",
            severity=PolicyViolationSeverity.WARNING,
            message="No primary key defined"
        ))

    # Check field documentation
    for field in fields:
        if not field.get('description'):
            violations.append(PolicyViolation(
                policy_name="schema_validation",
                severity=PolicyViolationSeverity.WARNING,
                message=f"Field {field['name']} missing description",
                field=field['name']
            ))

    return violations

def _check_quality_requirements(self, product_spec: Dict) -> List[PolicyViolation]:
    """Validate quality check configuration"""
    violations = []

    quality_checks = product_spec.get('sla', {}).get('quality_checks', [])
    if not quality_checks:
        violations.append(PolicyViolation(
            policy_name="quality_gates",
            severity=PolicyViolationSeverity.WARNING,
            message="No quality checks defined"
        ))

    # Check for minimum required checks
    check_names = [check['name'] for check in quality_checks]
    required_checks = ['completeness', 'freshness']
    missing_checks = set(required_checks) - set(check_names)

    if missing_checks:
        violations.append(PolicyViolation(
            policy_name="quality_gates",
            severity=PolicyViolationSeverity.WARNING,
            message=f"Missing required quality checks: {missing_checks}"
        ))

    return violations

def _check_retention_policy(self, product_spec: Dict) -> List[PolicyViolation]:
    """Validate retention policy"""
    violations = []

    if 'retention_days' not in product_spec:
        violations.append(PolicyViolation(
            policy_name="data_retention",
            severity=PolicyViolationSeverity.ERROR,
            message="Retention policy not specified"
        ))

    return violations

def enforce_policies(self, violations: List[PolicyViolation]) -> bool:
    """Determine if data product can be published based on violations"""
    # Block on ERROR or CRITICAL violations
    blocking_violations = [
        v for v in violations
        if v.severity in [PolicyViolationSeverity.ERROR, PolicyViolationSeverity.CRITICAL]
    ]

    return len(blocking_violations) == 0

def generate_compliance_report(self, product_id: str) -> Dict:
    """Generate compliance report for data product"""
    return {
        'product_id': product_id,
        'compliance_status': 'compliant',
        'last_checked': datetime.utcnow().isoformat(),
        'policies_evaluated': len(self.policies),
        'violations': []
    }
undefined
from typing import Dict, List, Optional from dataclasses import dataclass from enum import Enum
class PolicyViolationSeverity(Enum): INFO = "info" WARNING = "warning" ERROR = "error" CRITICAL = "critical"
@dataclass class PolicyViolation: policy_name: str severity: PolicyViolationSeverity message: str field: Optional[str] = None
class GovernanceEngine: """Automated governance enforcement"""
def __init__(self, policies: Dict):
    self.policies = policies

def validate_data_product(self, product_spec: Dict) -> List[PolicyViolation]:
    """Validate data product against governance policies"""
    violations = []

    # Check data classification
    violations.extend(self._check_data_classification(product_spec))

    # Check PII handling
    violations.extend(self._check_pii_compliance(product_spec))

    # Check schema requirements
    violations.extend(self._check_schema_requirements(product_spec))

    # Check quality checks
    violations.extend(self._check_quality_requirements(product_spec))

    # Check retention policy
    violations.extend(self._check_retention_policy(product_spec))

    return violations

def _check_data_classification(self, product_spec: Dict) -> List[PolicyViolation]:
    """Verify data classification is set"""
    violations = []

    if 'classification' not in product_spec:
        violations.append(PolicyViolation(
            policy_name="data_classification",
            severity=PolicyViolationSeverity.ERROR,
            message="Data classification not specified"
        ))

    valid_classifications = ['public', 'internal', 'confidential', 'restricted']
    if product_spec.get('classification') not in valid_classifications:
        violations.append(PolicyViolation(
            policy_name="data_classification",
            severity=PolicyViolationSeverity.ERROR,
            message=f"Invalid classification. Must be one of: {valid_classifications}"
        ))

    return violations

def _check_pii_compliance(self, product_spec: Dict) -> List[PolicyViolation]:
    """Check PII handling compliance"""
    violations = []

    schema = product_spec.get('schema', {})
    pii_fields = [f for f in schema.get('fields', []) if f.get('is_pii')]

    if pii_fields:
        # Check encryption
        if not product_spec.get('encryption_enabled'):
            violations.append(PolicyViolation(
                policy_name="pii_handling",
                severity=PolicyViolationSeverity.CRITICAL,
                message="PII fields present but encryption not enabled"
            ))

        # Check audit logging
        if not product_spec.get('audit_logging_enabled'):
            violations.append(PolicyViolation(
                policy_name="pii_handling",
                severity=PolicyViolationSeverity.CRITICAL,
                message="PII fields present but audit logging not enabled"
            ))

        # Check field marking
        for field in pii_fields:
            if not field.get('pii_category'):
                violations.append(PolicyViolation(
                    policy_name="pii_handling",
                    severity=PolicyViolationSeverity.ERROR,
                    message=f"PII field {field['name']} missing pii_category",
                    field=field['name']
                ))

    return violations

def _check_schema_requirements(self, product_spec: Dict) -> List[PolicyViolation]:
    """Validate schema completeness"""
    violations = []

    schema = product_spec.get('schema', {})
    if not schema:
        violations.append(PolicyViolation(
            policy_name="schema_validation",
            severity=PolicyViolationSeverity.ERROR,
            message="Schema not defined"
        ))
        return violations

    # Check for primary key
    fields = schema.get('fields', [])
    has_primary_key = any(f.get('is_primary_key') for f in fields)
    if not has_primary_key:
        violations.append(PolicyViolation(
            policy_name="schema_validation",
            severity=PolicyViolationSeverity.WARNING,
            message="No primary key defined"
        ))

    # Check field documentation
    for field in fields:
        if not field.get('description'):
            violations.append(PolicyViolation(
                policy_name="schema_validation",
                severity=PolicyViolationSeverity.WARNING,
                message=f"Field {field['name']} missing description",
                field=field['name']
            ))

    return violations

def _check_quality_requirements(self, product_spec: Dict) -> List[PolicyViolation]:
    """Validate quality check configuration"""
    violations = []

    quality_checks = product_spec.get('sla', {}).get('quality_checks', [])
    if not quality_checks:
        violations.append(PolicyViolation(
            policy_name="quality_gates",
            severity=PolicyViolationSeverity.WARNING,
            message="No quality checks defined"
        ))

    # Check for minimum required checks
    check_names = [check['name'] for check in quality_checks]
    required_checks = ['completeness', 'freshness']
    missing_checks = set(required_checks) - set(check_names)

    if missing_checks:
        violations.append(PolicyViolation(
            policy_name="quality_gates",
            severity=PolicyViolationSeverity.WARNING,
            message=f"Missing required quality checks: {missing_checks}"
        ))

    return violations

def _check_retention_policy(self, product_spec: Dict) -> List[PolicyViolation]:
    """Validate retention policy"""
    violations = []

    if 'retention_days' not in product_spec:
        violations.append(PolicyViolation(
            policy_name="data_retention",
            severity=PolicyViolationSeverity.ERROR,
            message="Retention policy not specified"
        ))

    return violations

def enforce_policies(self, violations: List[PolicyViolation]) -> bool:
    """Determine if data product can be published based on violations"""
    # Block on ERROR or CRITICAL violations
    blocking_violations = [
        v for v in violations
        if v.severity in [PolicyViolationSeverity.ERROR, PolicyViolationSeverity.CRITICAL]
    ]

    return len(blocking_violations) == 0

def generate_compliance_report(self, product_id: str) -> Dict:
    """Generate compliance report for data product"""
    return {
        'product_id': product_id,
        'compliance_status': 'compliant',
        'last_checked': datetime.utcnow().isoformat(),
        'policies_evaluated': len(self.policies),
        'violations': []
    }
undefined

Best Practices

最佳实践

1. Domain Design

1. 领域设计

  • Align domains with organizational structure
  • Clear bounded contexts for each domain
  • Domain teams own their data end-to-end
  • Cross-domain collaboration through well-defined interfaces
  • Avoid centralized data teams; embed in domains
  • 领域划分与组织架构对齐
  • 为每个领域明确界定上下文边界
  • 领域团队端到端负责自有数据
  • 通过定义清晰的接口实现跨领域协作
  • 避免中心化数据团队,将数据能力嵌入各领域

2. Data Product Design

2. 数据产品设计

  • Treat data as a product with SLAs
  • Document data contracts explicitly
  • Version data products semantically
  • Implement comprehensive quality checks
  • Provide discoverability and self-service access
  • Monitor data product health continuously
  • 将数据视为具备SLA的产品
  • 明确记录数据契约
  • 对数据产品进行语义化版本管理
  • 实施全面的质量检查
  • 提供可发现性与自助式访问能力
  • 持续监控数据产品健康状态

3. Platform Design

3. 平台设计

  • Abstract infrastructure complexity
  • Provide self-serve capabilities
  • Automate repetitive tasks
  • Enable domain autonomy
  • Standardize common patterns
  • Invest in developer experience
  • 抽象基础设施复杂度
  • 提供自助式能力
  • 自动化重复性任务
  • 支持领域自治
  • 标准化通用模式
  • 投入优化开发者体验

4. Governance

4. 治理

  • Automate policy enforcement
  • Make governance policies executable
  • Balance autonomy with control
  • Federate decisions to domains
  • Global standards, local implementation
  • Continuous compliance monitoring
  • 自动化政策执行
  • 让治理政策可执行
  • 在自治与管控间取得平衡
  • 将决策权限下放到各领域
  • 全局标准,本地化实施
  • 持续监控合规性

5. Cultural Transformation

5. 文化转型

  • Shift from centralized to federated model
  • Build data literacy across organization
  • Incentivize data product quality
  • Foster collaboration between domains
  • Celebrate data product owners
  • 从中心化模式转向联邦模式
  • 在组织内构建数据素养
  • 激励数据产品质量提升
  • 促进跨领域协作
  • 认可数据产品所有者的贡献

Anti-Patterns

反模式

1. Centralized Data Team

1. 中心化数据团队

// Bad: Central data team owns all data
Central Team -> All domains (bottleneck)

// Good: Domain teams own their data
Sales Domain -> Sales data products
Marketing Domain -> Marketing data products
Product Domain -> Product data products
// 错误:单一中心化团队掌控所有数据
中央团队 -> 所有领域(瓶颈)

// 正确:各领域团队自有数据
销售领域 -> 销售数据产品
营销领域 -> 营销数据产品
产品领域 -> 产品数据产品

2. Monolithic Data Lake

2. 单体数据湖

// Bad: Single giant data lake
s3://data-lake/everything/

// Good: Domain-oriented storage
s3://data-products/sales/
s3://data-products/marketing/
s3://data-products/product/
// 错误:单一巨型数据湖
s3://data-lake/everything/

// 正确:领域导向的存储
s3://data-products/sales/
s3://data-products/marketing/
s3://data-products/product/

3. No Data Contracts

3. 无数据契约

// Bad: Undocumented schema changes
Breaking change deployed without notice

// Good: Versioned contracts with deprecation
v1: Deprecated (30 days notice)
v2: Current
v3: Beta
// 错误:无文档的 schema 变更
未通知即发布破坏性变更

// 正确:带弃用通知的版本化契约
v1: 已弃用(提前30天通知)
v2: 当前版本
v3: 测试版

4. Manual Governance

4. 手动治理

// Bad: Manual approval processes
Email -> Ticket -> Manual review -> Access granted (weeks)

// Good: Automated governance
Request -> Policy check -> Auto-approval (minutes)
// 错误:手动审批流程
邮件 -> 工单 -> 人工审核 -> 授权访问(耗时数周)

// 正确:自动化治理
申请 -> 政策校验 -> 自动审批(耗时数分钟)

Resources

参考资源