harvard-art-museum-etl-analytics

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Harvard Art Museum ETL Analytics Skill

哈佛艺术博物馆ETL分析技能

Skill by ara.so — Data Skills collection.
ara.so提供的技能 — 数据技能合集。

Overview

概述

The Harvard-Artifacts-Collection-Data-Engineering-Analytics-App is an end-to-end data engineering solution that demonstrates production-grade ETL pipelines using the Harvard Art Museums API. It extracts artifact metadata, transforms nested JSON into relational structures, loads data into SQL databases, and provides interactive analytics dashboards via Streamlit.
Architecture: API → ETL → SQL → Analytics → Visualization
Key Components:
  • API integration with Harvard Art Museums
  • ETL pipeline with pagination and rate limiting
  • Relational database design (MySQL/TiDB Cloud)
  • SQL analytics queries
  • Streamlit dashboard with Plotly visualizations
Harvard-Artifacts-Collection-Data-Engineering-Analytics-App是一个端到端的数据工程解决方案,展示了基于哈佛艺术博物馆API的生产级ETL管道。它提取文物元数据,将嵌套JSON转换为关系型结构,将数据加载到SQL数据库,并通过Streamlit提供交互式分析仪表盘。
架构: API → ETL → SQL → 分析 → 可视化
核心组件:
  • 与哈佛艺术博物馆的API集成
  • 带有分页和速率限制的ETL管道
  • 关系型数据库设计(MySQL/TiDB Cloud)
  • SQL分析查询
  • 带有Plotly可视化的Streamlit仪表盘

Installation

安装

bash
undefined
bash
undefined

Clone the repository

克隆仓库

git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App
git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App

Install dependencies

安装依赖

pip install -r requirements.txt

**Core Dependencies**:
```txt
streamlit
pandas
requests
mysql-connector-python
plotly
python-dotenv
pip install -r requirements.txt

**核心依赖**:
```txt
streamlit
pandas
requests
mysql-connector-python
plotly
python-dotenv

Configuration

配置

Environment Variables

环境变量

Create a
.env
file in the project root:
bash
undefined
在项目根目录创建
.env
文件:
bash
undefined

Harvard Art Museums API

哈佛艺术博物馆API

HARVARD_API_KEY=your_api_key_here
HARVARD_API_KEY=your_api_key_here

Database Configuration

数据库配置

DB_HOST=your_database_host DB_PORT=3306 DB_USER=your_db_username DB_PASSWORD=your_db_password DB_NAME=harvard_artifacts
undefined
DB_HOST=your_database_host DB_PORT=3306 DB_USER=your_db_username DB_PASSWORD=your_db_password DB_NAME=harvard_artifacts
undefined

Get Harvard API Key

获取哈佛API密钥

Visit Harvard Art Museums API to request a free API key.
访问Harvard Art Museums API申请免费API密钥。

Database Setup

数据库设置

sql
CREATE DATABASE harvard_artifacts;

CREATE TABLE artifactmetadata (
    id INT PRIMARY KEY,
    title VARCHAR(500),
    culture VARCHAR(255),
    century VARCHAR(100),
    classification VARCHAR(255),
    department VARCHAR(255),
    technique VARCHAR(500),
    period VARCHAR(255),
    dated VARCHAR(255),
    url TEXT,
    totalpageviews INT,
    totaluniquepageviews INT
);

CREATE TABLE artifactmedia (
    media_id INT AUTO_INCREMENT PRIMARY KEY,
    artifact_id INT,
    iiifbaseuri TEXT,
    baseimageurl TEXT,
    format VARCHAR(50),
    FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);

CREATE TABLE artifactcolors (
    color_id INT AUTO_INCREMENT PRIMARY KEY,
    artifact_id INT,
    color VARCHAR(50),
    spectrum VARCHAR(50),
    hue VARCHAR(50),
    percent FLOAT,
    FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);
sql
CREATE DATABASE harvard_artifacts;

CREATE TABLE artifactmetadata (
    id INT PRIMARY KEY,
    title VARCHAR(500),
    culture VARCHAR(255),
    century VARCHAR(100),
    classification VARCHAR(255),
    department VARCHAR(255),
    technique VARCHAR(500),
    period VARCHAR(255),
    dated VARCHAR(255),
    url TEXT,
    totalpageviews INT,
    totaluniquepageviews INT
);

CREATE TABLE artifactmedia (
    media_id INT AUTO_INCREMENT PRIMARY KEY,
    artifact_id INT,
    iiifbaseuri TEXT,
    baseimageurl TEXT,
    format VARCHAR(50),
    FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);

CREATE TABLE artifactcolors (
    color_id INT AUTO_INCREMENT PRIMARY KEY,
    artifact_id INT,
    color VARCHAR(50),
    spectrum VARCHAR(50),
    hue VARCHAR(50),
    percent FLOAT,
    FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);

Running the Application

运行应用

bash
undefined
bash
undefined

Start the Streamlit app

启动Streamlit应用

streamlit run app.py

The app will open at `http://localhost:8501`
streamlit run app.py

应用将在`http://localhost:8501`打开

ETL Pipeline Implementation

ETL管道实现

Extract: Fetch Data from API

提取:从API获取数据

python
import requests
import os
from dotenv import load_dotenv

load_dotenv()

def fetch_artifacts(page=1, size=100):
    """
    Fetch artifacts from Harvard Art Museums API with pagination
    """
    api_key = os.getenv('HARVARD_API_KEY')
    base_url = "https://api.harvardartmuseums.org/object"
    
    params = {
        'apikey': api_key,
        'page': page,
        'size': size,
        'hasimage': 1  # Only artifacts with images
    }
    
    response = requests.get(base_url, params=params)
    
    if response.status_code == 200:
        return response.json()
    else:
        raise Exception(f"API Error: {response.status_code}")

def fetch_all_artifacts(max_pages=10):
    """
    Fetch multiple pages with rate limiting
    """
    import time
    all_artifacts = []
    
    for page in range(1, max_pages + 1):
        print(f"Fetching page {page}...")
        data = fetch_artifacts(page=page, size=100)
        all_artifacts.extend(data['records'])
        
        # Rate limiting
        time.sleep(1)
        
        if not data.get('info', {}).get('next'):
            break
    
    return all_artifacts
python
import requests
import os
from dotenv import load_dotenv

load_dotenv()

def fetch_artifacts(page=1, size=100):
    """
    通过分页从哈佛艺术博物馆API获取文物数据
    """
    api_key = os.getenv('HARVARD_API_KEY')
    base_url = "https://api.harvardartmuseums.org/object"
    
    params = {
        'apikey': api_key,
        'page': page,
        'size': size,
        'hasimage': 1  # 仅获取带有图片的文物
    }
    
    response = requests.get(base_url, params=params)
    
    if response.status_code == 200:
        return response.json()
    else:
        raise Exception(f"API错误: {response.status_code}")

def fetch_all_artifacts(max_pages=10):
    """
    通过速率限制获取多页数据
    """
    import time
    all_artifacts = []
    
    for page in range(1, max_pages + 1):
        print(f"正在获取第{page}页...")
        data = fetch_artifacts(page=page, size=100)
        all_artifacts.extend(data['records'])
        
        # 速率限制
        time.sleep(1)
        
        if not data.get('info', {}).get('next'):
            break
    
    return all_artifacts

Transform: Process JSON to Relational Format

转换:将JSON处理为关系型格式

python
import pandas as pd

def transform_metadata(artifacts):
    """
    Transform artifact metadata into structured DataFrame
    """
    metadata = []
    
    for artifact in artifacts:
        metadata.append({
            'id': artifact.get('id'),
            'title': artifact.get('title', '')[:500],
            'culture': artifact.get('culture', ''),
            'century': artifact.get('century', ''),
            'classification': artifact.get('classification', ''),
            'department': artifact.get('department', ''),
            'technique': artifact.get('technique', '')[:500],
            'period': artifact.get('period', ''),
            'dated': artifact.get('dated', ''),
            'url': artifact.get('url', ''),
            'totalpageviews': artifact.get('totalpageviews', 0),
            'totaluniquepageviews': artifact.get('totaluniquepageviews', 0)
        })
    
    return pd.DataFrame(metadata)

def transform_media(artifacts):
    """
    Extract media/image information
    """
    media = []
    
    for artifact in artifacts:
        artifact_id = artifact.get('id')
        images = artifact.get('images', [])
        
        for img in images:
            media.append({
                'artifact_id': artifact_id,
                'iiifbaseuri': img.get('iiifbaseuri', ''),
                'baseimageurl': img.get('baseimageurl', ''),
                'format': img.get('format', '')
            })
    
    return pd.DataFrame(media)

def transform_colors(artifacts):
    """
    Extract color information
    """
    colors = []
    
    for artifact in artifacts:
        artifact_id = artifact.get('id')
        color_data = artifact.get('colors', [])
        
        for color in color_data:
            colors.append({
                'artifact_id': artifact_id,
                'color': color.get('color', ''),
                'spectrum': color.get('spectrum', ''),
                'hue': color.get('hue', ''),
                'percent': color.get('percent', 0.0)
            })
    
    return pd.DataFrame(colors)
python
import pandas as pd

def transform_metadata(artifacts):
    """
    将文物元数据转换为结构化DataFrame
    """
    metadata = []
    
    for artifact in artifacts:
        metadata.append({
            'id': artifact.get('id'),
            'title': artifact.get('title', '')[:500],
            'culture': artifact.get('culture', ''),
            'century': artifact.get('century', ''),
            'classification': artifact.get('classification', ''),
            'department': artifact.get('department', ''),
            'technique': artifact.get('technique', '')[:500],
            'period': artifact.get('period', ''),
            'dated': artifact.get('dated', ''),
            'url': artifact.get('url', ''),
            'totalpageviews': artifact.get('totalpageviews', 0),
            'totaluniquepageviews': artifact.get('totaluniquepageviews', 0)
        })
    
    return pd.DataFrame(metadata)

def transform_media(artifacts):
    """
    提取媒体/图片信息
    """
    media = []
    
    for artifact in artifacts:
        artifact_id = artifact.get('id')
        images = artifact.get('images', [])
        
        for img in images:
            media.append({
                'artifact_id': artifact_id,
                'iiifbaseuri': img.get('iiifbaseuri', ''),
                'baseimageurl': img.get('baseimageurl', ''),
                'format': img.get('format', '')
            })
    
    return pd.DataFrame(media)

def transform_colors(artifacts):
    """
    提取颜色信息
    """
    colors = []
    
    for artifact in artifacts:
        artifact_id = artifact.get('id')
        color_data = artifact.get('colors', [])
        
        for color in color_data:
            colors.append({
                'artifact_id': artifact_id,
                'color': color.get('color', ''),
                'spectrum': color.get('spectrum', ''),
                'hue': color.get('hue', ''),
                'percent': color.get('percent', 0.0)
            })
    
    return pd.DataFrame(colors)

Load: Insert into SQL Database

加载:插入到SQL数据库

python
import mysql.connector
from mysql.connector import Error

def get_db_connection():
    """
    Create database connection
    """
    return mysql.connector.connect(
        host=os.getenv('DB_HOST'),
        port=int(os.getenv('DB_PORT', 3306)),
        user=os.getenv('DB_USER'),
        password=os.getenv('DB_PASSWORD'),
        database=os.getenv('DB_NAME')
    )

def load_metadata(df_metadata):
    """
    Batch insert metadata using executemany for performance
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    
    insert_query = """
    INSERT INTO artifactmetadata 
    (id, title, culture, century, classification, department, 
     technique, period, dated, url, totalpageviews, totaluniquepageviews)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
    title=VALUES(title), culture=VALUES(culture)
    """
    
    data = df_metadata.values.tolist()
    cursor.executemany(insert_query, data)
    
    conn.commit()
    cursor.close()
    conn.close()
    
    print(f"Loaded {len(data)} metadata records")

def load_media(df_media):
    """
    Load media/image data
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    
    insert_query = """
    INSERT INTO artifactmedia 
    (artifact_id, iiifbaseuri, baseimageurl, format)
    VALUES (%s, %s, %s, %s)
    """
    
    data = df_media.values.tolist()
    cursor.executemany(insert_query, data)
    
    conn.commit()
    cursor.close()
    conn.close()
    
    print(f"Loaded {len(data)} media records")

def load_colors(df_colors):
    """
    Load color data
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    
    insert_query = """
    INSERT INTO artifactcolors 
    (artifact_id, color, spectrum, hue, percent)
    VALUES (%s, %s, %s, %s, %s)
    """
    
    data = df_colors.values.tolist()
    cursor.executemany(insert_query, data)
    
    conn.commit()
    cursor.close()
    conn.close()
    
    print(f"Loaded {len(data)} color records")
python
import mysql.connector
from mysql.connector import Error

def get_db_connection():
    """
    创建数据库连接
    """
    return mysql.connector.connect(
        host=os.getenv('DB_HOST'),
        port=int(os.getenv('DB_PORT', 3306)),
        user=os.getenv('DB_USER'),
        password=os.getenv('DB_PASSWORD'),
        database=os.getenv('DB_NAME')
    )

def load_metadata(df_metadata):
    """
    使用executemany批量插入元数据以提升性能
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    
    insert_query = """
    INSERT INTO artifactmetadata 
    (id, title, culture, century, classification, department, 
     technique, period, dated, url, totalpageviews, totaluniquepageviews)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
    title=VALUES(title), culture=VALUES(culture)
    """
    
    data = df_metadata.values.tolist()
    cursor.executemany(insert_query, data)
    
    conn.commit()
    cursor.close()
    conn.close()
    
    print(f"已加载{len(data)}条元数据记录")

def load_media(df_media):
    """
    加载媒体/图片数据
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    
    insert_query = """
    INSERT INTO artifactmedia 
    (artifact_id, iiifbaseuri, baseimageurl, format)
    VALUES (%s, %s, %s, %s)
    """
    
    data = df_media.values.tolist()
    cursor.executemany(insert_query, data)
    
    conn.commit()
    cursor.close()
    conn.close()
    
    print(f"已加载{len(data)}条媒体记录")

def load_colors(df_colors):
    """
    加载颜色数据
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    
    insert_query = """
    INSERT INTO artifactcolors 
    (artifact_id, color, spectrum, hue, percent)
    VALUES (%s, %s, %s, %s, %s)
    """
    
    data = df_colors.values.tolist()
    cursor.executemany(insert_query, data)
    
    conn.commit()
    cursor.close()
    conn.close()
    
    print(f"已加载{len(data)}条颜色记录")

Complete ETL Script

完整ETL脚本

python
undefined
python
undefined

etl_pipeline.py

etl_pipeline.py

import os from dotenv import load_dotenv
load_dotenv()
def run_etl_pipeline(max_pages=5): """ Execute complete ETL pipeline """ print("Starting ETL Pipeline...")
# Extract
print("Step 1: Extracting data from API...")
artifacts = fetch_all_artifacts(max_pages=max_pages)
print(f"Extracted {len(artifacts)} artifacts")

# Transform
print("Step 2: Transforming data...")
df_metadata = transform_metadata(artifacts)
df_media = transform_media(artifacts)
df_colors = transform_colors(artifacts)

# Load
print("Step 3: Loading data to database...")
load_metadata(df_metadata)
load_media(df_media)
load_colors(df_colors)

print("ETL Pipeline completed successfully!")
if name == "main": run_etl_pipeline(max_pages=10)
undefined
import os from dotenv import load_dotenv
load_dotenv()
def run_etl_pipeline(max_pages=5): """ 执行完整ETL管道 """ print("正在启动ETL管道...")
# 提取
print("步骤1:从API提取数据...")
artifacts = fetch_all_artifacts(max_pages=max_pages)
print(f"已提取{len(artifacts)}件文物数据")

# 转换
print("步骤2:转换数据...")
df_metadata = transform_metadata(artifacts)
df_media = transform_media(artifacts)
df_colors = transform_colors(artifacts)

# 加载
print("步骤3:将数据加载到数据库...")
load_metadata(df_metadata)
load_media(df_media)
load_colors(df_colors)

print("ETL管道执行成功!")
if name == "main": run_etl_pipeline(max_pages=10)
undefined

Streamlit Dashboard Implementation

Streamlit仪表盘实现

python
undefined
python
undefined

app.py

app.py

import streamlit as st import pandas as pd import plotly.express as px import mysql.connector import os from dotenv import load_dotenv
load_dotenv()
st.set_page_config( page_title="Harvard Artifacts Analytics", page_icon="🏛️", layout="wide" )
st.title("🏛️ Harvard Art Museums Analytics Dashboard")
def execute_query(query): """ Execute SQL query and return results as DataFrame """ conn = mysql.connector.connect( host=os.getenv('DB_HOST'), port=int(os.getenv('DB_PORT', 3306)), user=os.getenv('DB_USER'), password=os.getenv('DB_PASSWORD'), database=os.getenv('DB_NAME') )
df = pd.read_sql(query, conn)
conn.close()
return df
import streamlit as st import pandas as pd import plotly.express as px import mysql.connector import os from dotenv import load_dotenv
load_dotenv()
st.set_page_config( page_title="哈佛文物分析", page_icon="🏛️", layout="wide" )
st.title("🏛️ 哈佛艺术博物馆分析仪表盘")
def execute_query(query): """ 执行SQL查询并将结果以DataFrame返回 """ conn = mysql.connector.connect( host=os.getenv('DB_HOST'), port=int(os.getenv('DB_PORT', 3306)), user=os.getenv('DB_USER'), password=os.getenv('DB_PASSWORD'), database=os.getenv('DB_NAME') )
df = pd.read_sql(query, conn)
conn.close()
return df

Sidebar with query selection

侧边栏查询选择

st.sidebar.header("Analytics Queries")
queries = { "Artifacts by Culture": """ SELECT culture, COUNT() as count FROM artifactmetadata WHERE culture IS NOT NULL AND culture != '' GROUP BY culture ORDER BY count DESC LIMIT 20 """, "Artifacts by Century": """ SELECT century, COUNT() as count FROM artifactmetadata WHERE century IS NOT NULL AND century != '' GROUP BY century ORDER BY count DESC """, "Top Departments": """ SELECT department, COUNT() as artifact_count FROM artifactmetadata WHERE department IS NOT NULL GROUP BY department ORDER BY artifact_count DESC """, "Most Viewed Artifacts": """ SELECT title, culture, totalpageviews FROM artifactmetadata WHERE totalpageviews > 0 ORDER BY totalpageviews DESC LIMIT 15 """, "Color Distribution": """ SELECT color, COUNT() as frequency FROM artifactcolors GROUP BY color ORDER BY frequency DESC LIMIT 10 """, "Media Format Analysis": """ SELECT format, COUNT(*) as count FROM artifactmedia WHERE format IS NOT NULL GROUP BY format ORDER BY count DESC """ }
selected_query = st.sidebar.selectbox("Select Analysis", list(queries.keys()))
st.sidebar.header("分析查询")
queries = { "按文化分类的文物": """ SELECT culture, COUNT() as count FROM artifactmetadata WHERE culture IS NOT NULL AND culture != '' GROUP BY culture ORDER BY count DESC LIMIT 20 """, "按世纪分类的文物": """ SELECT century, COUNT() as count FROM artifactmetadata WHERE century IS NOT NULL AND century != '' GROUP BY century ORDER BY count DESC """, "热门部门": """ SELECT department, COUNT() as artifact_count FROM artifactmetadata WHERE department IS NOT NULL GROUP BY department ORDER BY artifact_count DESC """, "浏览量最高的文物": """ SELECT title, culture, totalpageviews FROM artifactmetadata WHERE totalpageviews > 0 ORDER BY totalpageviews DESC LIMIT 15 """, "颜色分布": """ SELECT color, COUNT() as frequency FROM artifactcolors GROUP BY color ORDER BY frequency DESC LIMIT 10 """, "媒体格式分析": """ SELECT format, COUNT(*) as count FROM artifactmedia WHERE format IS NOT NULL GROUP BY format ORDER BY count DESC """ }
selected_query = st.sidebar.selectbox("选择分析类型", list(queries.keys()))

Execute and display results

执行并展示结果

if st.sidebar.button("Run Analysis"): with st.spinner("Running query..."): df = execute_query(queries[selected_query])
    col1, col2 = st.columns([1, 2])
    
    with col1:
        st.subheader("Query Results")
        st.dataframe(df, use_container_width=True)
    
    with col2:
        st.subheader("Visualization")
        if len(df) > 0:
            x_col = df.columns[0]
            y_col = df.columns[1]
            
            fig = px.bar(
                df,
                x=x_col,
                y=y_col,
                title=selected_query,
                labels={x_col: x_col.title(), y_col: y_col.title()}
            )
            st.plotly_chart(fig, use_container_width=True)
if st.sidebar.button("运行分析"): with st.spinner("正在执行查询..."): df = execute_query(queries[selected_query])
    col1, col2 = st.columns([1, 2])
    
    with col1:
        st.subheader("查询结果")
        st.dataframe(df, use_container_width=True)
    
    with col2:
        st.subheader("可视化图表")
        if len(df) > 0:
            x_col = df.columns[0]
            y_col = df.columns[1]
            
            fig = px.bar(
                df,
                x=x_col,
                y=y_col,
                title=selected_query,
                labels={x_col: x_col.title(), y_col: y_col.title()}
            )
            st.plotly_chart(fig, use_container_width=True)

Summary statistics

汇总统计

st.header("📊 Collection Summary")
col1, col2, col3 = st.columns(3)
with col1: total_artifacts = execute_query("SELECT COUNT(*) as count FROM artifactmetadata") st.metric("Total Artifacts", total_artifacts['count'].iloc[0])
with col2: total_images = execute_query("SELECT COUNT(*) as count FROM artifactmedia") st.metric("Total Images", total_images['count'].iloc[0])
with col3: unique_cultures = execute_query("SELECT COUNT(DISTINCT culture) as count FROM artifactmetadata WHERE culture IS NOT NULL") st.metric("Unique Cultures", unique_cultures['count'].iloc[0])
undefined
st.header("📊 藏品汇总")
col1, col2, col3 = st.columns(3)
with col1: total_artifacts = execute_query("SELECT COUNT(*) as count FROM artifactmetadata") st.metric("文物总数", total_artifacts['count'].iloc[0])
with col2: total_images = execute_query("SELECT COUNT(*) as count FROM artifactmedia") st.metric("图片总数", total_images['count'].iloc[0])
with col3: unique_cultures = execute_query("SELECT COUNT(DISTINCT culture) as count FROM artifactmetadata WHERE culture IS NOT NULL") st.metric("独特文化数量", unique_cultures['count'].iloc[0])
undefined

Common Analytical Queries

常见分析查询

sql
-- Artifacts without images
SELECT COUNT(*) 
FROM artifactmetadata m
LEFT JOIN artifactmedia media ON m.id = media.artifact_id
WHERE media.artifact_id IS NULL;

-- Most common color per culture
SELECT m.culture, c.color, COUNT(*) as frequency
FROM artifactmetadata m
JOIN artifactcolors c ON m.id = c.artifact_id
WHERE m.culture IS NOT NULL
GROUP BY m.culture, c.color
ORDER BY m.culture, frequency DESC;

-- Classification distribution by department
SELECT department, classification, COUNT(*) as count
FROM artifactmetadata
WHERE department IS NOT NULL AND classification IS NOT NULL
GROUP BY department, classification
ORDER BY department, count DESC;

-- Average pageviews by century
SELECT century, AVG(totalpageviews) as avg_views
FROM artifactmetadata
WHERE century IS NOT NULL AND totalpageviews > 0
GROUP BY century
ORDER BY avg_views DESC;
sql
-- 无图片的文物数量
SELECT COUNT(*) 
FROM artifactmetadata m
LEFT JOIN artifactmedia media ON m.id = media.artifact_id
WHERE media.artifact_id IS NULL;

-- 各文化最常见的颜色
SELECT m.culture, c.color, COUNT(*) as frequency
FROM artifactmetadata m
JOIN artifactcolors c ON m.id = c.artifact_id
WHERE m.culture IS NOT NULL
GROUP BY m.culture, c.color
ORDER BY m.culture, frequency DESC;

-- 各部门的分类分布
SELECT department, classification, COUNT(*) as count
FROM artifactmetadata
WHERE department IS NOT NULL AND classification IS NOT NULL
GROUP BY department, classification
ORDER BY department, count DESC;

-- 各世纪平均浏览量
SELECT century, AVG(totalpageviews) as avg_views
FROM artifactmetadata
WHERE century IS NOT NULL AND totalpageviews > 0
GROUP BY century
ORDER BY avg_views DESC;

Troubleshooting

故障排除

API Rate Limiting

API速率限制

python
import time
from functools import wraps

def rate_limit(calls_per_second=1):
    """
    Decorator to rate limit API calls
    """
    min_interval = 1.0 / calls_per_second
    
    def decorator(func):
        last_called = [0.0]
        
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_called[0]
            wait_time = min_interval - elapsed
            
            if wait_time > 0:
                time.sleep(wait_time)
            
            result = func(*args, **kwargs)
            last_called[0] = time.time()
            return result
        
        return wrapper
    return decorator

@rate_limit(calls_per_second=1)
def fetch_artifacts_with_limit(page=1):
    return fetch_artifacts(page=page)
python
import time
from functools import wraps

def rate_limit(calls_per_second=1):
    """
    用于限制API调用速率的装饰器
    """
    min_interval = 1.0 / calls_per_second
    
    def decorator(func):
        last_called = [0.0]
        
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_called[0]
            wait_time = min_interval - elapsed
            
            if wait_time > 0:
                time.sleep(wait_time)
            
            result = func(*args, **kwargs)
            last_called[0] = time.time()
            return result
        
        return wrapper
    return decorator

@rate_limit(calls_per_second=1)
def fetch_artifacts_with_limit(page=1):
    return fetch_artifacts(page=page)

Database Connection Pooling

数据库连接池

python
from mysql.connector import pooling

db_pool = pooling.MySQLConnectionPool(
    pool_name="harvard_pool",
    pool_size=5,
    host=os.getenv('DB_HOST'),
    port=int(os.getenv('DB_PORT', 3306)),
    user=os.getenv('DB_USER'),
    password=os.getenv('DB_PASSWORD'),
    database=os.getenv('DB_NAME')
)

def get_pooled_connection():
    return db_pool.get_connection()
python
from mysql.connector import pooling

db_pool = pooling.MySQLConnectionPool(
    pool_name="harvard_pool",
    pool_size=5,
    host=os.getenv('DB_HOST'),
    port=int(os.getenv('DB_PORT', 3306)),
    user=os.getenv('DB_USER'),
    password=os.getenv('DB_PASSWORD'),
    database=os.getenv('DB_NAME')
)

def get_pooled_connection():
    return db_pool.get_connection()

Handling Large Datasets

处理大型数据集

python
def batch_load_data(df, table_name, batch_size=1000):
    """
    Load data in batches to avoid memory issues
    """
    conn = get_db_connection()
    
    for i in range(0, len(df), batch_size):
        batch = df.iloc[i:i+batch_size]
        batch.to_sql(
            table_name,
            conn,
            if_exists='append',
            index=False,
            method='multi'
        )
        print(f"Loaded batch {i//batch_size + 1}")
    
    conn.close()
python
def batch_load_data(df, table_name, batch_size=1000):
    """
    分批加载数据以避免内存问题
    """
    conn = get_db_connection()
    
    for i in range(0, len(df), batch_size):
        batch = df.iloc[i:i+batch_size]
        batch.to_sql(
            table_name,
            conn,
            if_exists='append',
            index=False,
            method='multi'
        )
        print(f"已加载第{i//batch_size + 1}批数据")
    
    conn.close()

Best Practices

最佳实践

  1. Always use environment variables for sensitive credentials
  2. Implement retry logic for API calls to handle transient failures
  3. Use batch inserts (
    executemany
    ) for better database performance
  4. Add error logging to track ETL pipeline failures
  5. Validate data before loading into database
  6. Create indexes on frequently queried columns (culture, century, department)
  7. Schedule ETL jobs using cron or Apache Airflow for production environments
  1. 始终使用环境变量存储敏感凭证
  2. 实现重试逻辑以处理API调用的临时故障
  3. 使用批量插入
    executemany
    )提升数据库性能
  4. 添加错误日志以跟踪ETL管道的故障
  5. 在加载到数据库前验证数据
  6. 在频繁查询的列上创建索引(文化、世纪、部门)
  7. 在生产环境中使用cron或Apache Airflow调度ETL任务