harvard-art-museum-etl-analytics
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseHarvard Art Museum ETL Analytics Skill
哈佛艺术博物馆ETL分析技能
Overview
概述
The Harvard-Artifacts-Collection-Data-Engineering-Analytics-App is an end-to-end data engineering solution that demonstrates production-grade ETL pipelines using the Harvard Art Museums API. It extracts artifact metadata, transforms nested JSON into relational structures, loads data into SQL databases, and provides interactive analytics dashboards via Streamlit.
Architecture: API → ETL → SQL → Analytics → Visualization
Key Components:
- API integration with Harvard Art Museums
- ETL pipeline with pagination and rate limiting
- Relational database design (MySQL/TiDB Cloud)
- SQL analytics queries
- Streamlit dashboard with Plotly visualizations
Harvard-Artifacts-Collection-Data-Engineering-Analytics-App是一个端到端的数据工程解决方案,展示了基于哈佛艺术博物馆API的生产级ETL管道。它提取文物元数据,将嵌套JSON转换为关系型结构,将数据加载到SQL数据库,并通过Streamlit提供交互式分析仪表盘。
架构: API → ETL → SQL → 分析 → 可视化
核心组件:
- 与哈佛艺术博物馆的API集成
- 带有分页和速率限制的ETL管道
- 关系型数据库设计(MySQL/TiDB Cloud)
- SQL分析查询
- 带有Plotly可视化的Streamlit仪表盘
Installation
安装
bash
undefinedbash
undefinedClone the repository
克隆仓库
git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git
cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App
git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git
cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App
Install dependencies
安装依赖
pip install -r requirements.txt
**Core Dependencies**:
```txt
streamlit
pandas
requests
mysql-connector-python
plotly
python-dotenvpip install -r requirements.txt
**核心依赖**:
```txt
streamlit
pandas
requests
mysql-connector-python
plotly
python-dotenvConfiguration
配置
Environment Variables
环境变量
Create a file in the project root:
.envbash
undefined在项目根目录创建文件:
.envbash
undefinedHarvard Art Museums API
哈佛艺术博物馆API
HARVARD_API_KEY=your_api_key_here
HARVARD_API_KEY=your_api_key_here
Database Configuration
数据库配置
DB_HOST=your_database_host
DB_PORT=3306
DB_USER=your_db_username
DB_PASSWORD=your_db_password
DB_NAME=harvard_artifacts
undefinedDB_HOST=your_database_host
DB_PORT=3306
DB_USER=your_db_username
DB_PASSWORD=your_db_password
DB_NAME=harvard_artifacts
undefinedGet Harvard API Key
获取哈佛API密钥
Visit Harvard Art Museums API to request a free API key.
访问Harvard Art Museums API申请免费API密钥。
Database Setup
数据库设置
sql
CREATE DATABASE harvard_artifacts;
CREATE TABLE artifactmetadata (
id INT PRIMARY KEY,
title VARCHAR(500),
culture VARCHAR(255),
century VARCHAR(100),
classification VARCHAR(255),
department VARCHAR(255),
technique VARCHAR(500),
period VARCHAR(255),
dated VARCHAR(255),
url TEXT,
totalpageviews INT,
totaluniquepageviews INT
);
CREATE TABLE artifactmedia (
media_id INT AUTO_INCREMENT PRIMARY KEY,
artifact_id INT,
iiifbaseuri TEXT,
baseimageurl TEXT,
format VARCHAR(50),
FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);
CREATE TABLE artifactcolors (
color_id INT AUTO_INCREMENT PRIMARY KEY,
artifact_id INT,
color VARCHAR(50),
spectrum VARCHAR(50),
hue VARCHAR(50),
percent FLOAT,
FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);sql
CREATE DATABASE harvard_artifacts;
CREATE TABLE artifactmetadata (
id INT PRIMARY KEY,
title VARCHAR(500),
culture VARCHAR(255),
century VARCHAR(100),
classification VARCHAR(255),
department VARCHAR(255),
technique VARCHAR(500),
period VARCHAR(255),
dated VARCHAR(255),
url TEXT,
totalpageviews INT,
totaluniquepageviews INT
);
CREATE TABLE artifactmedia (
media_id INT AUTO_INCREMENT PRIMARY KEY,
artifact_id INT,
iiifbaseuri TEXT,
baseimageurl TEXT,
format VARCHAR(50),
FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);
CREATE TABLE artifactcolors (
color_id INT AUTO_INCREMENT PRIMARY KEY,
artifact_id INT,
color VARCHAR(50),
spectrum VARCHAR(50),
hue VARCHAR(50),
percent FLOAT,
FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);Running the Application
运行应用
bash
undefinedbash
undefinedStart the Streamlit app
启动Streamlit应用
streamlit run app.py
The app will open at `http://localhost:8501`streamlit run app.py
应用将在`http://localhost:8501`打开ETL Pipeline Implementation
ETL管道实现
Extract: Fetch Data from API
提取:从API获取数据
python
import requests
import os
from dotenv import load_dotenv
load_dotenv()
def fetch_artifacts(page=1, size=100):
"""
Fetch artifacts from Harvard Art Museums API with pagination
"""
api_key = os.getenv('HARVARD_API_KEY')
base_url = "https://api.harvardartmuseums.org/object"
params = {
'apikey': api_key,
'page': page,
'size': size,
'hasimage': 1 # Only artifacts with images
}
response = requests.get(base_url, params=params)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"API Error: {response.status_code}")
def fetch_all_artifacts(max_pages=10):
"""
Fetch multiple pages with rate limiting
"""
import time
all_artifacts = []
for page in range(1, max_pages + 1):
print(f"Fetching page {page}...")
data = fetch_artifacts(page=page, size=100)
all_artifacts.extend(data['records'])
# Rate limiting
time.sleep(1)
if not data.get('info', {}).get('next'):
break
return all_artifactspython
import requests
import os
from dotenv import load_dotenv
load_dotenv()
def fetch_artifacts(page=1, size=100):
"""
通过分页从哈佛艺术博物馆API获取文物数据
"""
api_key = os.getenv('HARVARD_API_KEY')
base_url = "https://api.harvardartmuseums.org/object"
params = {
'apikey': api_key,
'page': page,
'size': size,
'hasimage': 1 # 仅获取带有图片的文物
}
response = requests.get(base_url, params=params)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"API错误: {response.status_code}")
def fetch_all_artifacts(max_pages=10):
"""
通过速率限制获取多页数据
"""
import time
all_artifacts = []
for page in range(1, max_pages + 1):
print(f"正在获取第{page}页...")
data = fetch_artifacts(page=page, size=100)
all_artifacts.extend(data['records'])
# 速率限制
time.sleep(1)
if not data.get('info', {}).get('next'):
break
return all_artifactsTransform: Process JSON to Relational Format
转换:将JSON处理为关系型格式
python
import pandas as pd
def transform_metadata(artifacts):
"""
Transform artifact metadata into structured DataFrame
"""
metadata = []
for artifact in artifacts:
metadata.append({
'id': artifact.get('id'),
'title': artifact.get('title', '')[:500],
'culture': artifact.get('culture', ''),
'century': artifact.get('century', ''),
'classification': artifact.get('classification', ''),
'department': artifact.get('department', ''),
'technique': artifact.get('technique', '')[:500],
'period': artifact.get('period', ''),
'dated': artifact.get('dated', ''),
'url': artifact.get('url', ''),
'totalpageviews': artifact.get('totalpageviews', 0),
'totaluniquepageviews': artifact.get('totaluniquepageviews', 0)
})
return pd.DataFrame(metadata)
def transform_media(artifacts):
"""
Extract media/image information
"""
media = []
for artifact in artifacts:
artifact_id = artifact.get('id')
images = artifact.get('images', [])
for img in images:
media.append({
'artifact_id': artifact_id,
'iiifbaseuri': img.get('iiifbaseuri', ''),
'baseimageurl': img.get('baseimageurl', ''),
'format': img.get('format', '')
})
return pd.DataFrame(media)
def transform_colors(artifacts):
"""
Extract color information
"""
colors = []
for artifact in artifacts:
artifact_id = artifact.get('id')
color_data = artifact.get('colors', [])
for color in color_data:
colors.append({
'artifact_id': artifact_id,
'color': color.get('color', ''),
'spectrum': color.get('spectrum', ''),
'hue': color.get('hue', ''),
'percent': color.get('percent', 0.0)
})
return pd.DataFrame(colors)python
import pandas as pd
def transform_metadata(artifacts):
"""
将文物元数据转换为结构化DataFrame
"""
metadata = []
for artifact in artifacts:
metadata.append({
'id': artifact.get('id'),
'title': artifact.get('title', '')[:500],
'culture': artifact.get('culture', ''),
'century': artifact.get('century', ''),
'classification': artifact.get('classification', ''),
'department': artifact.get('department', ''),
'technique': artifact.get('technique', '')[:500],
'period': artifact.get('period', ''),
'dated': artifact.get('dated', ''),
'url': artifact.get('url', ''),
'totalpageviews': artifact.get('totalpageviews', 0),
'totaluniquepageviews': artifact.get('totaluniquepageviews', 0)
})
return pd.DataFrame(metadata)
def transform_media(artifacts):
"""
提取媒体/图片信息
"""
media = []
for artifact in artifacts:
artifact_id = artifact.get('id')
images = artifact.get('images', [])
for img in images:
media.append({
'artifact_id': artifact_id,
'iiifbaseuri': img.get('iiifbaseuri', ''),
'baseimageurl': img.get('baseimageurl', ''),
'format': img.get('format', '')
})
return pd.DataFrame(media)
def transform_colors(artifacts):
"""
提取颜色信息
"""
colors = []
for artifact in artifacts:
artifact_id = artifact.get('id')
color_data = artifact.get('colors', [])
for color in color_data:
colors.append({
'artifact_id': artifact_id,
'color': color.get('color', ''),
'spectrum': color.get('spectrum', ''),
'hue': color.get('hue', ''),
'percent': color.get('percent', 0.0)
})
return pd.DataFrame(colors)Load: Insert into SQL Database
加载:插入到SQL数据库
python
import mysql.connector
from mysql.connector import Error
def get_db_connection():
"""
Create database connection
"""
return mysql.connector.connect(
host=os.getenv('DB_HOST'),
port=int(os.getenv('DB_PORT', 3306)),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASSWORD'),
database=os.getenv('DB_NAME')
)
def load_metadata(df_metadata):
"""
Batch insert metadata using executemany for performance
"""
conn = get_db_connection()
cursor = conn.cursor()
insert_query = """
INSERT INTO artifactmetadata
(id, title, culture, century, classification, department,
technique, period, dated, url, totalpageviews, totaluniquepageviews)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
title=VALUES(title), culture=VALUES(culture)
"""
data = df_metadata.values.tolist()
cursor.executemany(insert_query, data)
conn.commit()
cursor.close()
conn.close()
print(f"Loaded {len(data)} metadata records")
def load_media(df_media):
"""
Load media/image data
"""
conn = get_db_connection()
cursor = conn.cursor()
insert_query = """
INSERT INTO artifactmedia
(artifact_id, iiifbaseuri, baseimageurl, format)
VALUES (%s, %s, %s, %s)
"""
data = df_media.values.tolist()
cursor.executemany(insert_query, data)
conn.commit()
cursor.close()
conn.close()
print(f"Loaded {len(data)} media records")
def load_colors(df_colors):
"""
Load color data
"""
conn = get_db_connection()
cursor = conn.cursor()
insert_query = """
INSERT INTO artifactcolors
(artifact_id, color, spectrum, hue, percent)
VALUES (%s, %s, %s, %s, %s)
"""
data = df_colors.values.tolist()
cursor.executemany(insert_query, data)
conn.commit()
cursor.close()
conn.close()
print(f"Loaded {len(data)} color records")python
import mysql.connector
from mysql.connector import Error
def get_db_connection():
"""
创建数据库连接
"""
return mysql.connector.connect(
host=os.getenv('DB_HOST'),
port=int(os.getenv('DB_PORT', 3306)),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASSWORD'),
database=os.getenv('DB_NAME')
)
def load_metadata(df_metadata):
"""
使用executemany批量插入元数据以提升性能
"""
conn = get_db_connection()
cursor = conn.cursor()
insert_query = """
INSERT INTO artifactmetadata
(id, title, culture, century, classification, department,
technique, period, dated, url, totalpageviews, totaluniquepageviews)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
title=VALUES(title), culture=VALUES(culture)
"""
data = df_metadata.values.tolist()
cursor.executemany(insert_query, data)
conn.commit()
cursor.close()
conn.close()
print(f"已加载{len(data)}条元数据记录")
def load_media(df_media):
"""
加载媒体/图片数据
"""
conn = get_db_connection()
cursor = conn.cursor()
insert_query = """
INSERT INTO artifactmedia
(artifact_id, iiifbaseuri, baseimageurl, format)
VALUES (%s, %s, %s, %s)
"""
data = df_media.values.tolist()
cursor.executemany(insert_query, data)
conn.commit()
cursor.close()
conn.close()
print(f"已加载{len(data)}条媒体记录")
def load_colors(df_colors):
"""
加载颜色数据
"""
conn = get_db_connection()
cursor = conn.cursor()
insert_query = """
INSERT INTO artifactcolors
(artifact_id, color, spectrum, hue, percent)
VALUES (%s, %s, %s, %s, %s)
"""
data = df_colors.values.tolist()
cursor.executemany(insert_query, data)
conn.commit()
cursor.close()
conn.close()
print(f"已加载{len(data)}条颜色记录")Complete ETL Script
完整ETL脚本
python
undefinedpython
undefinedetl_pipeline.py
etl_pipeline.py
import os
from dotenv import load_dotenv
load_dotenv()
def run_etl_pipeline(max_pages=5):
"""
Execute complete ETL pipeline
"""
print("Starting ETL Pipeline...")
# Extract
print("Step 1: Extracting data from API...")
artifacts = fetch_all_artifacts(max_pages=max_pages)
print(f"Extracted {len(artifacts)} artifacts")
# Transform
print("Step 2: Transforming data...")
df_metadata = transform_metadata(artifacts)
df_media = transform_media(artifacts)
df_colors = transform_colors(artifacts)
# Load
print("Step 3: Loading data to database...")
load_metadata(df_metadata)
load_media(df_media)
load_colors(df_colors)
print("ETL Pipeline completed successfully!")if name == "main":
run_etl_pipeline(max_pages=10)
undefinedimport os
from dotenv import load_dotenv
load_dotenv()
def run_etl_pipeline(max_pages=5):
"""
执行完整ETL管道
"""
print("正在启动ETL管道...")
# 提取
print("步骤1:从API提取数据...")
artifacts = fetch_all_artifacts(max_pages=max_pages)
print(f"已提取{len(artifacts)}件文物数据")
# 转换
print("步骤2:转换数据...")
df_metadata = transform_metadata(artifacts)
df_media = transform_media(artifacts)
df_colors = transform_colors(artifacts)
# 加载
print("步骤3:将数据加载到数据库...")
load_metadata(df_metadata)
load_media(df_media)
load_colors(df_colors)
print("ETL管道执行成功!")if name == "main":
run_etl_pipeline(max_pages=10)
undefinedStreamlit Dashboard Implementation
Streamlit仪表盘实现
python
undefinedpython
undefinedapp.py
app.py
import streamlit as st
import pandas as pd
import plotly.express as px
import mysql.connector
import os
from dotenv import load_dotenv
load_dotenv()
st.set_page_config(
page_title="Harvard Artifacts Analytics",
page_icon="🏛️",
layout="wide"
)
st.title("🏛️ Harvard Art Museums Analytics Dashboard")
def execute_query(query):
"""
Execute SQL query and return results as DataFrame
"""
conn = mysql.connector.connect(
host=os.getenv('DB_HOST'),
port=int(os.getenv('DB_PORT', 3306)),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASSWORD'),
database=os.getenv('DB_NAME')
)
df = pd.read_sql(query, conn)
conn.close()
return dfimport streamlit as st
import pandas as pd
import plotly.express as px
import mysql.connector
import os
from dotenv import load_dotenv
load_dotenv()
st.set_page_config(
page_title="哈佛文物分析",
page_icon="🏛️",
layout="wide"
)
st.title("🏛️ 哈佛艺术博物馆分析仪表盘")
def execute_query(query):
"""
执行SQL查询并将结果以DataFrame返回
"""
conn = mysql.connector.connect(
host=os.getenv('DB_HOST'),
port=int(os.getenv('DB_PORT', 3306)),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASSWORD'),
database=os.getenv('DB_NAME')
)
df = pd.read_sql(query, conn)
conn.close()
return dfSidebar with query selection
侧边栏查询选择
st.sidebar.header("Analytics Queries")
queries = {
"Artifacts by Culture": """
SELECT culture, COUNT() as count
FROM artifactmetadata
WHERE culture IS NOT NULL AND culture != ''
GROUP BY culture
ORDER BY count DESC
LIMIT 20
""",
"Artifacts by Century": """
SELECT century, COUNT() as count
FROM artifactmetadata
WHERE century IS NOT NULL AND century != ''
GROUP BY century
ORDER BY count DESC
""",
"Top Departments": """
SELECT department, COUNT() as artifact_count
FROM artifactmetadata
WHERE department IS NOT NULL
GROUP BY department
ORDER BY artifact_count DESC
""",
"Most Viewed Artifacts": """
SELECT title, culture, totalpageviews
FROM artifactmetadata
WHERE totalpageviews > 0
ORDER BY totalpageviews DESC
LIMIT 15
""",
"Color Distribution": """
SELECT color, COUNT() as frequency
FROM artifactcolors
GROUP BY color
ORDER BY frequency DESC
LIMIT 10
""",
"Media Format Analysis": """
SELECT format, COUNT(*) as count
FROM artifactmedia
WHERE format IS NOT NULL
GROUP BY format
ORDER BY count DESC
"""
}
selected_query = st.sidebar.selectbox("Select Analysis", list(queries.keys()))
st.sidebar.header("分析查询")
queries = {
"按文化分类的文物": """
SELECT culture, COUNT() as count
FROM artifactmetadata
WHERE culture IS NOT NULL AND culture != ''
GROUP BY culture
ORDER BY count DESC
LIMIT 20
""",
"按世纪分类的文物": """
SELECT century, COUNT() as count
FROM artifactmetadata
WHERE century IS NOT NULL AND century != ''
GROUP BY century
ORDER BY count DESC
""",
"热门部门": """
SELECT department, COUNT() as artifact_count
FROM artifactmetadata
WHERE department IS NOT NULL
GROUP BY department
ORDER BY artifact_count DESC
""",
"浏览量最高的文物": """
SELECT title, culture, totalpageviews
FROM artifactmetadata
WHERE totalpageviews > 0
ORDER BY totalpageviews DESC
LIMIT 15
""",
"颜色分布": """
SELECT color, COUNT() as frequency
FROM artifactcolors
GROUP BY color
ORDER BY frequency DESC
LIMIT 10
""",
"媒体格式分析": """
SELECT format, COUNT(*) as count
FROM artifactmedia
WHERE format IS NOT NULL
GROUP BY format
ORDER BY count DESC
"""
}
selected_query = st.sidebar.selectbox("选择分析类型", list(queries.keys()))
Execute and display results
执行并展示结果
if st.sidebar.button("Run Analysis"):
with st.spinner("Running query..."):
df = execute_query(queries[selected_query])
col1, col2 = st.columns([1, 2])
with col1:
st.subheader("Query Results")
st.dataframe(df, use_container_width=True)
with col2:
st.subheader("Visualization")
if len(df) > 0:
x_col = df.columns[0]
y_col = df.columns[1]
fig = px.bar(
df,
x=x_col,
y=y_col,
title=selected_query,
labels={x_col: x_col.title(), y_col: y_col.title()}
)
st.plotly_chart(fig, use_container_width=True)if st.sidebar.button("运行分析"):
with st.spinner("正在执行查询..."):
df = execute_query(queries[selected_query])
col1, col2 = st.columns([1, 2])
with col1:
st.subheader("查询结果")
st.dataframe(df, use_container_width=True)
with col2:
st.subheader("可视化图表")
if len(df) > 0:
x_col = df.columns[0]
y_col = df.columns[1]
fig = px.bar(
df,
x=x_col,
y=y_col,
title=selected_query,
labels={x_col: x_col.title(), y_col: y_col.title()}
)
st.plotly_chart(fig, use_container_width=True)Summary statistics
汇总统计
st.header("📊 Collection Summary")
col1, col2, col3 = st.columns(3)
with col1:
total_artifacts = execute_query("SELECT COUNT(*) as count FROM artifactmetadata")
st.metric("Total Artifacts", total_artifacts['count'].iloc[0])
with col2:
total_images = execute_query("SELECT COUNT(*) as count FROM artifactmedia")
st.metric("Total Images", total_images['count'].iloc[0])
with col3:
unique_cultures = execute_query("SELECT COUNT(DISTINCT culture) as count FROM artifactmetadata WHERE culture IS NOT NULL")
st.metric("Unique Cultures", unique_cultures['count'].iloc[0])
undefinedst.header("📊 藏品汇总")
col1, col2, col3 = st.columns(3)
with col1:
total_artifacts = execute_query("SELECT COUNT(*) as count FROM artifactmetadata")
st.metric("文物总数", total_artifacts['count'].iloc[0])
with col2:
total_images = execute_query("SELECT COUNT(*) as count FROM artifactmedia")
st.metric("图片总数", total_images['count'].iloc[0])
with col3:
unique_cultures = execute_query("SELECT COUNT(DISTINCT culture) as count FROM artifactmetadata WHERE culture IS NOT NULL")
st.metric("独特文化数量", unique_cultures['count'].iloc[0])
undefinedCommon Analytical Queries
常见分析查询
sql
-- Artifacts without images
SELECT COUNT(*)
FROM artifactmetadata m
LEFT JOIN artifactmedia media ON m.id = media.artifact_id
WHERE media.artifact_id IS NULL;
-- Most common color per culture
SELECT m.culture, c.color, COUNT(*) as frequency
FROM artifactmetadata m
JOIN artifactcolors c ON m.id = c.artifact_id
WHERE m.culture IS NOT NULL
GROUP BY m.culture, c.color
ORDER BY m.culture, frequency DESC;
-- Classification distribution by department
SELECT department, classification, COUNT(*) as count
FROM artifactmetadata
WHERE department IS NOT NULL AND classification IS NOT NULL
GROUP BY department, classification
ORDER BY department, count DESC;
-- Average pageviews by century
SELECT century, AVG(totalpageviews) as avg_views
FROM artifactmetadata
WHERE century IS NOT NULL AND totalpageviews > 0
GROUP BY century
ORDER BY avg_views DESC;sql
-- 无图片的文物数量
SELECT COUNT(*)
FROM artifactmetadata m
LEFT JOIN artifactmedia media ON m.id = media.artifact_id
WHERE media.artifact_id IS NULL;
-- 各文化最常见的颜色
SELECT m.culture, c.color, COUNT(*) as frequency
FROM artifactmetadata m
JOIN artifactcolors c ON m.id = c.artifact_id
WHERE m.culture IS NOT NULL
GROUP BY m.culture, c.color
ORDER BY m.culture, frequency DESC;
-- 各部门的分类分布
SELECT department, classification, COUNT(*) as count
FROM artifactmetadata
WHERE department IS NOT NULL AND classification IS NOT NULL
GROUP BY department, classification
ORDER BY department, count DESC;
-- 各世纪平均浏览量
SELECT century, AVG(totalpageviews) as avg_views
FROM artifactmetadata
WHERE century IS NOT NULL AND totalpageviews > 0
GROUP BY century
ORDER BY avg_views DESC;Troubleshooting
故障排除
API Rate Limiting
API速率限制
python
import time
from functools import wraps
def rate_limit(calls_per_second=1):
"""
Decorator to rate limit API calls
"""
min_interval = 1.0 / calls_per_second
def decorator(func):
last_called = [0.0]
@wraps(func)
def wrapper(*args, **kwargs):
elapsed = time.time() - last_called[0]
wait_time = min_interval - elapsed
if wait_time > 0:
time.sleep(wait_time)
result = func(*args, **kwargs)
last_called[0] = time.time()
return result
return wrapper
return decorator
@rate_limit(calls_per_second=1)
def fetch_artifacts_with_limit(page=1):
return fetch_artifacts(page=page)python
import time
from functools import wraps
def rate_limit(calls_per_second=1):
"""
用于限制API调用速率的装饰器
"""
min_interval = 1.0 / calls_per_second
def decorator(func):
last_called = [0.0]
@wraps(func)
def wrapper(*args, **kwargs):
elapsed = time.time() - last_called[0]
wait_time = min_interval - elapsed
if wait_time > 0:
time.sleep(wait_time)
result = func(*args, **kwargs)
last_called[0] = time.time()
return result
return wrapper
return decorator
@rate_limit(calls_per_second=1)
def fetch_artifacts_with_limit(page=1):
return fetch_artifacts(page=page)Database Connection Pooling
数据库连接池
python
from mysql.connector import pooling
db_pool = pooling.MySQLConnectionPool(
pool_name="harvard_pool",
pool_size=5,
host=os.getenv('DB_HOST'),
port=int(os.getenv('DB_PORT', 3306)),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASSWORD'),
database=os.getenv('DB_NAME')
)
def get_pooled_connection():
return db_pool.get_connection()python
from mysql.connector import pooling
db_pool = pooling.MySQLConnectionPool(
pool_name="harvard_pool",
pool_size=5,
host=os.getenv('DB_HOST'),
port=int(os.getenv('DB_PORT', 3306)),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASSWORD'),
database=os.getenv('DB_NAME')
)
def get_pooled_connection():
return db_pool.get_connection()Handling Large Datasets
处理大型数据集
python
def batch_load_data(df, table_name, batch_size=1000):
"""
Load data in batches to avoid memory issues
"""
conn = get_db_connection()
for i in range(0, len(df), batch_size):
batch = df.iloc[i:i+batch_size]
batch.to_sql(
table_name,
conn,
if_exists='append',
index=False,
method='multi'
)
print(f"Loaded batch {i//batch_size + 1}")
conn.close()python
def batch_load_data(df, table_name, batch_size=1000):
"""
分批加载数据以避免内存问题
"""
conn = get_db_connection()
for i in range(0, len(df), batch_size):
batch = df.iloc[i:i+batch_size]
batch.to_sql(
table_name,
conn,
if_exists='append',
index=False,
method='multi'
)
print(f"已加载第{i//batch_size + 1}批数据")
conn.close()Best Practices
最佳实践
- Always use environment variables for sensitive credentials
- Implement retry logic for API calls to handle transient failures
- Use batch inserts () for better database performance
executemany - Add error logging to track ETL pipeline failures
- Validate data before loading into database
- Create indexes on frequently queried columns (culture, century, department)
- Schedule ETL jobs using cron or Apache Airflow for production environments
- 始终使用环境变量存储敏感凭证
- 实现重试逻辑以处理API调用的临时故障
- 使用批量插入()提升数据库性能
executemany - 添加错误日志以跟踪ETL管道的故障
- 在加载到数据库前验证数据
- 在频繁查询的列上创建索引(文化、世纪、部门)
- 在生产环境中使用cron或Apache Airflow调度ETL任务