harvard-art-museum-data-pipeline

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Harvard Art Museum Data Pipeline Skill

哈佛艺术博物馆数据管道Skill

Skill by ara.so — Data Skills collection.
ara.so提供的Skill — 数据技能合集。

Overview

概述

The Harvard Artifacts Collection Data Engineering Analytics App is an end-to-end data pipeline that extracts artifact data from the Harvard Art Museums API, transforms it into relational structures, loads it into SQL databases (MySQL/TiDB Cloud), and provides interactive analytics through a Streamlit dashboard.
Architecture Flow: API → ETL → SQL → Analytics → Visualization
哈佛文物收藏数据工程分析应用是一个端到端的数据管道,它从哈佛艺术博物馆API提取文物数据,将其转换为关系型结构,加载到SQL数据库(MySQL/TiDB Cloud)中,并通过Streamlit仪表盘提供交互式分析功能。
架构流程: API → ETL → SQL → 分析 → 可视化

Installation

安装

bash
undefined
bash
undefined

Clone the repository

Clone the repository

git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App
git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App

Install dependencies

Install dependencies

pip install -r requirements.txt

**Core Dependencies:**
streamlit pandas requests mysql-connector-python plotly python-dotenv
undefined
pip install -r requirements.txt

**核心依赖项:**
streamlit pandas requests mysql-connector-python plotly python-dotenv
undefined

Configuration

配置

API Key Setup

API密钥设置

Get your API key from Harvard Art Museums API.
Create a
.env
file or configure environment variables:
bash
HARVARD_API_KEY=your_api_key_here
DB_HOST=your_database_host
DB_USER=your_db_user
DB_PASSWORD=your_db_password
DB_NAME=harvard_artifacts
哈佛艺术博物馆API获取你的API密钥。
创建
.env
文件或配置环境变量:
bash
HARVARD_API_KEY=your_api_key_here
DB_HOST=your_database_host
DB_USER=your_db_user
DB_PASSWORD=your_db_password
DB_NAME=harvard_artifacts

Database Schema

数据库Schema

The pipeline creates three main tables:
sql
-- Artifact Metadata
CREATE TABLE artifactmetadata (
    id INT PRIMARY KEY,
    title VARCHAR(500),
    culture VARCHAR(255),
    century VARCHAR(100),
    classification VARCHAR(255),
    division VARCHAR(255),
    department VARCHAR(255),
    dated VARCHAR(255),
    medium VARCHAR(500),
    technique VARCHAR(500),
    dimensiontext TEXT,
    provenance TEXT,
    creditline TEXT,
    accessionyear INT
);

-- Artifact Media
CREATE TABLE artifactmedia (
    media_id INT AUTO_INCREMENT PRIMARY KEY,
    artifact_id INT,
    imageurl VARCHAR(1000),
    iiifbaseuri VARCHAR(1000),
    FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);

-- Artifact Colors
CREATE TABLE artifactcolors (
    color_id INT AUTO_INCREMENT PRIMARY KEY,
    artifact_id INT,
    color VARCHAR(50),
    spectrum VARCHAR(50),
    percent DECIMAL(5,2),
    FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);
该管道会创建三个主要表:
sql
-- Artifact Metadata
CREATE TABLE artifactmetadata (
    id INT PRIMARY KEY,
    title VARCHAR(500),
    culture VARCHAR(255),
    century VARCHAR(100),
    classification VARCHAR(255),
    division VARCHAR(255),
    department VARCHAR(255),
    dated VARCHAR(255),
    medium VARCHAR(500),
    technique VARCHAR(500),
    dimensiontext TEXT,
    provenance TEXT,
    creditline TEXT,
    accessionyear INT
);

-- Artifact Media
CREATE TABLE artifactmedia (
    media_id INT AUTO_INCREMENT PRIMARY KEY,
    artifact_id INT,
    imageurl VARCHAR(1000),
    iiifbaseuri VARCHAR(1000),
    FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);

-- Artifact Colors
CREATE TABLE artifactcolors (
    color_id INT AUTO_INCREMENT PRIMARY KEY,
    artifact_id INT,
    color VARCHAR(50),
    spectrum VARCHAR(50),
    percent DECIMAL(5,2),
    FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);

Running the Application

运行应用

bash
undefined
bash
undefined

Start Streamlit app

Start Streamlit app

streamlit run app.py

The application will be available at `http://localhost:8501`
streamlit run app.py

应用将在`http://localhost:8501`地址可用

Core Components

核心组件

1. API Data Extraction

1. API数据提取

python
import requests
import os
from dotenv import load_dotenv

load_dotenv()

def fetch_artifacts(api_key, size=100, page=1):
    """Fetch artifacts from Harvard Art Museums API with pagination"""
    base_url = "https://api.harvardartmuseums.org/object"
    
    params = {
        'apikey': api_key,
        'size': size,
        'page': page,
        'hasimage': 1  # Only artifacts with images
    }
    
    response = requests.get(base_url, params=params)
    response.raise_for_status()
    
    return response.json()
python
import requests
import os
from dotenv import load_dotenv

load_dotenv()

def fetch_artifacts(api_key, size=100, page=1):
    """Fetch artifacts from Harvard Art Museums API with pagination"""
    base_url = "https://api.harvardartmuseums.org/object"
    
    params = {
        'apikey': api_key,
        'size': size,
        'page': page,
        'hasimage': 1  # Only artifacts with images
    }
    
    response = requests.get(base_url, params=params)
    response.raise_for_status()
    
    return response.json()

Usage

Usage

api_key = os.getenv('HARVARD_API_KEY') data = fetch_artifacts(api_key, size=100, page=1)
total_records = data['info']['totalrecords'] artifacts = data['records']
undefined
api_key = os.getenv('HARVARD_API_KEY') data = fetch_artifacts(api_key, size=100, page=1)
total_records = data['info']['totalrecords'] artifacts = data['records']
undefined

2. ETL Pipeline

2. ETL管道

python
import pandas as pd
import mysql.connector
from mysql.connector import Error

def extract_artifact_metadata(artifact):
    """Extract metadata from artifact JSON"""
    return {
        'id': artifact.get('id'),
        'title': artifact.get('title', '')[:500],
        'culture': artifact.get('culture', '')[:255],
        'century': artifact.get('century', '')[:100],
        'classification': artifact.get('classification', '')[:255],
        'division': artifact.get('division', '')[:255],
        'department': artifact.get('department', '')[:255],
        'dated': artifact.get('dated', '')[:255],
        'medium': artifact.get('medium', '')[:500],
        'technique': artifact.get('technique', '')[:500],
        'dimensiontext': artifact.get('dimensions'),
        'provenance': artifact.get('provenance'),
        'creditline': artifact.get('creditline'),
        'accessionyear': artifact.get('accessionyear')
    }

def extract_artifact_media(artifact):
    """Extract media/image data from artifact"""
    media_list = []
    artifact_id = artifact.get('id')
    
    images = artifact.get('images', [])
    for img in images:
        media_list.append({
            'artifact_id': artifact_id,
            'imageurl': img.get('baseimageurl'),
            'iiifbaseuri': img.get('iiifbaseuri')
        })
    
    return media_list

def extract_artifact_colors(artifact):
    """Extract color data from artifact"""
    colors_list = []
    artifact_id = artifact.get('id')
    
    colors = artifact.get('colors', [])
    for color in colors:
        colors_list.append({
            'artifact_id': artifact_id,
            'color': color.get('color'),
            'spectrum': color.get('spectrum'),
            'percent': color.get('percent')
        })
    
    return colors_list

def transform_artifacts(artifacts):
    """Transform raw artifact data into structured dataframes"""
    metadata_list = []
    media_list = []
    colors_list = []
    
    for artifact in artifacts:
        metadata_list.append(extract_artifact_metadata(artifact))
        media_list.extend(extract_artifact_media(artifact))
        colors_list.extend(extract_artifact_colors(artifact))
    
    df_metadata = pd.DataFrame(metadata_list)
    df_media = pd.DataFrame(media_list)
    df_colors = pd.DataFrame(colors_list)
    
    return df_metadata, df_media, df_colors
python
import pandas as pd
import mysql.connector
from mysql.connector import Error

def extract_artifact_metadata(artifact):
    """Extract metadata from artifact JSON"""
    return {
        'id': artifact.get('id'),
        'title': artifact.get('title', '')[:500],
        'culture': artifact.get('culture', '')[:255],
        'century': artifact.get('century', '')[:100],
        'classification': artifact.get('classification', '')[:255],
        'division': artifact.get('division', '')[:255],
        'department': artifact.get('department', '')[:255],
        'dated': artifact.get('dated', '')[:255],
        'medium': artifact.get('medium', '')[:500],
        'technique': artifact.get('technique', '')[:500],
        'dimensiontext': artifact.get('dimensions'),
        'provenance': artifact.get('provenance'),
        'creditline': artifact.get('creditline'),
        'accessionyear': artifact.get('accessionyear')
    }

def extract_artifact_media(artifact):
    """Extract media/image data from artifact"""
    media_list = []
    artifact_id = artifact.get('id')
    
    images = artifact.get('images', [])
    for img in images:
        media_list.append({
            'artifact_id': artifact_id,
            'imageurl': img.get('baseimageurl'),
            'iiifbaseuri': img.get('iiifbaseuri')
        })
    
    return media_list

def extract_artifact_colors(artifact):
    """Extract color data from artifact"""
    colors_list = []
    artifact_id = artifact.get('id')
    
    colors = artifact.get('colors', [])
    for color in colors:
        colors_list.append({
            'artifact_id': artifact_id,
            'color': color.get('color'),
            'spectrum': color.get('spectrum'),
            'percent': color.get('percent')
        })
    
    return colors_list

def transform_artifacts(artifacts):
    """Transform raw artifact data into structured dataframes"""
    metadata_list = []
    media_list = []
    colors_list = []
    
    for artifact in artifacts:
        metadata_list.append(extract_artifact_metadata(artifact))
        media_list.extend(extract_artifact_media(artifact))
        colors_list.extend(extract_artifact_colors(artifact))
    
    df_metadata = pd.DataFrame(metadata_list)
    df_media = pd.DataFrame(media_list)
    df_colors = pd.DataFrame(colors_list)
    
    return df_metadata, df_media, df_colors

3. Database Loading

3. 数据库加载

python
def get_db_connection():
    """Create database connection"""
    try:
        connection = mysql.connector.connect(
            host=os.getenv('DB_HOST'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            database=os.getenv('DB_NAME')
        )
        return connection
    except Error as e:
        print(f"Error connecting to database: {e}")
        return None

def load_metadata_to_db(df_metadata, connection):
    """Batch insert artifact metadata"""
    cursor = connection.cursor()
    
    insert_query = """
    INSERT INTO artifactmetadata 
    (id, title, culture, century, classification, division, department, 
     dated, medium, technique, dimensiontext, provenance, creditline, accessionyear)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
    title=VALUES(title), culture=VALUES(culture), century=VALUES(century)
    """
    
    records = df_metadata.to_records(index=False)
    data = [tuple(record) for record in records]
    
    cursor.executemany(insert_query, data)
    connection.commit()
    cursor.close()

def load_media_to_db(df_media, connection):
    """Batch insert artifact media"""
    cursor = connection.cursor()
    
    insert_query = """
    INSERT INTO artifactmedia (artifact_id, imageurl, iiifbaseuri)
    VALUES (%s, %s, %s)
    """
    
    records = df_media.to_records(index=False)
    data = [tuple(record) for record in records]
    
    cursor.executemany(insert_query, data)
    connection.commit()
    cursor.close()

def load_colors_to_db(df_colors, connection):
    """Batch insert artifact colors"""
    cursor = connection.cursor()
    
    insert_query = """
    INSERT INTO artifactcolors (artifact_id, color, spectrum, percent)
    VALUES (%s, %s, %s, %s)
    """
    
    records = df_colors.to_records(index=False)
    data = [tuple(record) for record in records]
    
    cursor.executemany(insert_query, data)
    connection.commit()
    cursor.close()
python
def get_db_connection():
    """Create database connection"""
    try:
        connection = mysql.connector.connect(
            host=os.getenv('DB_HOST'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            database=os.getenv('DB_NAME')
        )
        return connection
    except Error as e:
        print(f"Error connecting to database: {e}")
        return None

def load_metadata_to_db(df_metadata, connection):
    """Batch insert artifact metadata"""
    cursor = connection.cursor()
    
    insert_query = """
    INSERT INTO artifactmetadata 
    (id, title, culture, century, classification, division, department, 
     dated, medium, technique, dimensiontext, provenance, creditline, accessionyear)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
    title=VALUES(title), culture=VALUES(culture), century=VALUES(century)
    """
    
    records = df_metadata.to_records(index=False)
    data = [tuple(record) for record in records]
    
    cursor.executemany(insert_query, data)
    connection.commit()
    cursor.close()

def load_media_to_db(df_media, connection):
    """Batch insert artifact media"""
    cursor = connection.cursor()
    
    insert_query = """
    INSERT INTO artifactmedia (artifact_id, imageurl, iiifbaseuri)
    VALUES (%s, %s, %s)
    """
    
    records = df_media.to_records(index=False)
    data = [tuple(record) for record in records]
    
    cursor.executemany(insert_query, data)
    connection.commit()
    cursor.close()

def load_colors_to_db(df_colors, connection):
    """Batch insert artifact colors"""
    cursor = connection.cursor()
    
    insert_query = """
    INSERT INTO artifactcolors (artifact_id, color, spectrum, percent)
    VALUES (%s, %s, %s, %s)
    """
    
    records = df_colors.to_records(index=False)
    data = [tuple(record) for record in records]
    
    cursor.executemany(insert_query, data)
    connection.commit()
    cursor.close()

4. Streamlit Analytics Dashboard

4. Streamlit分析仪表盘

python
import streamlit as st
import plotly.express as px

def run_analytics_query(query, connection):
    """Execute SQL query and return results as DataFrame"""
    return pd.read_sql(query, connection)
python
import streamlit as st
import plotly.express as px

def run_analytics_query(query, connection):
    """Execute SQL query and return results as DataFrame"""
    return pd.read_sql(query, connection)

Streamlit UI

Streamlit UI

st.title("Harvard Art Museum Analytics Dashboard")
st.title("Harvard Art Museum Analytics Dashboard")

Sidebar for query selection

Sidebar for query selection

queries = { "Artifacts by Culture": """ SELECT culture, COUNT() as count FROM artifactmetadata WHERE culture IS NOT NULL GROUP BY culture ORDER BY count DESC LIMIT 20 """, "Artifacts by Century": """ SELECT century, COUNT() as count FROM artifactmetadata WHERE century IS NOT NULL GROUP BY century ORDER BY count DESC """, "Department Distribution": """ SELECT department, COUNT() as count FROM artifactmetadata WHERE department IS NOT NULL GROUP BY department ORDER BY count DESC """, "Top Colors": """ SELECT color, COUNT() as frequency, AVG(percent) as avg_percent FROM artifactcolors WHERE color IS NOT NULL GROUP BY color ORDER BY frequency DESC LIMIT 15 """, "Artifacts with Images": """ SELECT CASE WHEN m.artifact_id IS NOT NULL THEN 'With Images' ELSE 'Without Images' END as status, COUNT(*) as count FROM artifactmetadata a LEFT JOIN artifactmedia m ON a.id = m.artifact_id GROUP BY status """ }
selected_query = st.sidebar.selectbox("Select Analysis", list(queries.keys()))
queries = { "Artifacts by Culture": """ SELECT culture, COUNT() as count FROM artifactmetadata WHERE culture IS NOT NULL GROUP BY culture ORDER BY count DESC LIMIT 20 """, "Artifacts by Century": """ SELECT century, COUNT() as count FROM artifactmetadata WHERE century IS NOT NULL GROUP BY century ORDER BY count DESC """, "Department Distribution": """ SELECT department, COUNT() as count FROM artifactmetadata WHERE department IS NOT NULL GROUP BY department ORDER BY count DESC """, "Top Colors": """ SELECT color, COUNT() as frequency, AVG(percent) as avg_percent FROM artifactcolors WHERE color IS NOT NULL GROUP BY color ORDER BY frequency DESC LIMIT 15 """, "Artifacts with Images": """ SELECT CASE WHEN m.artifact_id IS NOT NULL THEN 'With Images' ELSE 'Without Images' END as status, COUNT(*) as count FROM artifactmetadata a LEFT JOIN artifactmedia m ON a.id = m.artifact_id GROUP BY status """ }
selected_query = st.sidebar.selectbox("Select Analysis", list(queries.keys()))

Execute query

Execute query

if st.button("Run Analysis"): connection = get_db_connection()
if connection:
    df_result = run_analytics_query(queries[selected_query], connection)
    
    st.subheader(f"Results: {selected_query}")
    st.dataframe(df_result)
    
    # Visualization
    if len(df_result.columns) >= 2:
        fig = px.bar(
            df_result, 
            x=df_result.columns[0], 
            y=df_result.columns[1],
            title=selected_query
        )
        st.plotly_chart(fig)
    
    connection.close()
undefined
if st.button("Run Analysis"): connection = get_db_connection()
if connection:
    df_result = run_analytics_query(queries[selected_query], connection)
    
    st.subheader(f"Results: {selected_query}")
    st.dataframe(df_result)
    
    # Visualization
    if len(df_result.columns) >= 2:
        fig = px.bar(
            df_result, 
            x=df_result.columns[0], 
            y=df_result.columns[1],
            title=selected_query
        )
        st.plotly_chart(fig)
    
    connection.close()
undefined

Common Patterns

常见模式

Full ETL Pipeline Execution

完整ETL管道执行

python
def run_full_etl_pipeline(num_pages=5):
    """Execute complete ETL pipeline"""
    api_key = os.getenv('HARVARD_API_KEY')
    connection = get_db_connection()
    
    if not connection:
        print("Failed to connect to database")
        return
    
    for page in range(1, num_pages + 1):
        print(f"Processing page {page}...")
        
        # Extract
        data = fetch_artifacts(api_key, size=100, page=page)
        artifacts = data['records']
        
        # Transform
        df_metadata, df_media, df_colors = transform_artifacts(artifacts)
        
        # Load
        load_metadata_to_db(df_metadata, connection)
        load_media_to_db(df_media, connection)
        load_colors_to_db(df_colors, connection)
        
        print(f"Loaded {len(artifacts)} artifacts from page {page}")
    
    connection.close()
    print("ETL pipeline completed successfully")
python
def run_full_etl_pipeline(num_pages=5):
    """Execute complete ETL pipeline"""
    api_key = os.getenv('HARVARD_API_KEY')
    connection = get_db_connection()
    
    if not connection:
        print("Failed to connect to database")
        return
    
    for page in range(1, num_pages + 1):
        print(f"Processing page {page}...")
        
        # Extract
        data = fetch_artifacts(api_key, size=100, page=page)
        artifacts = data['records']
        
        # Transform
        df_metadata, df_media, df_colors = transform_artifacts(artifacts)
        
        # Load
        load_metadata_to_db(df_metadata, connection)
        load_media_to_db(df_media, connection)
        load_colors_to_db(df_colors, connection)
        
        print(f"Loaded {len(artifacts)} artifacts from page {page}")
    
    connection.close()
    print("ETL pipeline completed successfully")

Execute

Execute

run_full_etl_pipeline(num_pages=10)
undefined
run_full_etl_pipeline(num_pages=10)
undefined

Rate Limiting for API Calls

API调用速率限制

python
import time

def fetch_with_rate_limit(api_key, total_pages, delay=1):
    """Fetch data with rate limiting"""
    all_artifacts = []
    
    for page in range(1, total_pages + 1):
        data = fetch_artifacts(api_key, size=100, page=page)
        all_artifacts.extend(data['records'])
        
        time.sleep(delay)  # Respect API rate limits
        
        if page % 10 == 0:
            print(f"Fetched {len(all_artifacts)} artifacts so far...")
    
    return all_artifacts
python
import time

def fetch_with_rate_limit(api_key, total_pages, delay=1):
    """Fetch data with rate limiting"""
    all_artifacts = []
    
    for page in range(1, total_pages + 1):
        data = fetch_artifacts(api_key, size=100, page=page)
        all_artifacts.extend(data['records'])
        
        time.sleep(delay)  # Respect API rate limits
        
        if page % 10 == 0:
            print(f"Fetched {len(all_artifacts)} artifacts so far...")
    
    return all_artifacts

Troubleshooting

故障排查

API Key Issues

API密钥问题

  • Verify API key is valid at Harvard Art Museums developer portal
  • Check
    .env
    file is in project root and properly loaded
  • Ensure
    HARVARD_API_KEY
    environment variable is set
  • 在哈佛艺术博物馆开发者门户验证API密钥是否有效
  • 检查
    .env
    文件是否位于项目根目录且已正确加载
  • 确保已设置
    HARVARD_API_KEY
    环境变量

Database Connection Errors

数据库连接错误

  • Verify database credentials in environment variables
  • Check firewall rules allow connection to database host
  • For TiDB Cloud, ensure IP whitelist is configured
  • Test connection with:
    mysql -h HOST -u USER -p
  • 验证环境变量中的数据库凭据
  • 检查防火墙规则是否允许连接到数据库主机
  • 对于TiDB Cloud,确保已配置IP白名单
  • 使用以下命令测试连接:
    mysql -h HOST -u USER -p

Memory Issues with Large Datasets

大数据集内存问题

python
undefined
python
undefined

Process in smaller chunks

Process in smaller chunks

def chunked_etl(total_pages, chunk_size=10): for i in range(0, total_pages, chunk_size): end = min(i + chunk_size, total_pages) run_full_etl_pipeline(num_pages=end-i) print(f"Completed chunk {i}-{end}")
undefined
def chunked_etl(total_pages, chunk_size=10): for i in range(0, total_pages, chunk_size): end = min(i + chunk_size, total_pages) run_full_etl_pipeline(num_pages=end-i) print(f"Completed chunk {i}-{end}")
undefined

Streamlit Performance

Streamlit性能优化

  • Cache database connections: Use
    @st.cache_resource
  • Cache query results: Use
    @st.cache_data
  • Limit initial data load size
python
@st.cache_resource
def get_cached_connection():
    return get_db_connection()

@st.cache_data(ttl=3600)
def cached_query(query):
    conn = get_cached_connection()
    return pd.read_sql(query, conn)
  • 缓存数据库连接: 使用
    @st.cache_resource
  • 缓存查询结果: 使用
    @st.cache_data
  • 限制初始数据加载量
python
@st.cache_resource
def get_cached_connection():
    return get_db_connection()

@st.cache_data(ttl=3600)
def cached_query(query):
    conn = get_cached_connection()
    return pd.read_sql(query, conn)