performance-expert
Performance Expert
Expert guidance for performance optimization, profiling, benchmarking, and system tuning.
Core Concepts
Performance Fundamentals
- Response time vs throughput
- Latency vs bandwidth
- CPU, memory, I/O bottlenecks
- Concurrency vs parallelism
- Caching strategies
- Load balancing
Optimization Areas
- Algorithm optimization
- Database optimization
- Network optimization
- Frontend performance
- Backend performance
- Infrastructure tuning
Profiling Tools
- CPU profilers
- Memory profilers
- Network profilers
- Application Performance Monitoring (APM)
- Load testing tools
Python Performance
```python
import cProfile
import pstats
import timeit
import memory_profiler
from functools import lru_cache
from typing import List
import numpy as np
```

Performance Profiling
```python
def profile_function(func):
    """Decorator for profiling function execution"""
    def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        profiler.enable()
        result = func(*args, **kwargs)
        profiler.disable()
        stats = pstats.Stats(profiler)
        stats.sort_stats('cumulative')
        stats.print_stats(10)  # Top 10 functions
        return result
    return wrapper

@profile_function
def slow_function():
    total = 0
    for i in range(1000000):
        total += i
    return total
```
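The decorator above prints its report straight to stdout. For logging or automated checks, the same stats can be captured into a string instead — a small sketch using only the standard library:

```python
import cProfile
import io
import pstats

def slow_function():
    total = 0
    for i in range(1000000):
        total += i
    return total

# Profile the call and capture the report in a string
profiler = cProfile.Profile()
profiler.enable()
result = slow_function()
profiler.disable()

stream = io.StringIO()
stats = pstats.Stats(profiler, stream=stream)
stats.sort_stats('tottime').print_stats(5)
report = stream.getvalue()  # ready to log, assert on, or save
```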
Memoization for expensive computations
```python
@lru_cache(maxsize=128)
def fibonacci(n: int) -> int:
    """Cached Fibonacci calculation"""
    if n < 2:
        return n
    return fibonacci(n-1) + fibonacci(n-2)
```
Vectorization with NumPy
```python
def slow_loop(data: List[float]) -> List[float]:
    """Slow: Using Python loops"""
    return [x ** 2 + 2 * x + 1 for x in data]

def fast_vectorized(data: np.ndarray) -> np.ndarray:
    """Fast: Using NumPy vectorization"""
    return data ** 2 + 2 * data + 1
```
Benchmarking
```python
def benchmark_function(func, *args, iterations=1000):
    """Benchmark function execution time"""
    total_time = timeit.timeit(
        lambda: func(*args),
        number=iterations
    )
    avg_time = total_time / iterations
    return {
        'total_time': total_time,
        'avg_time': avg_time,
        'iterations': iterations
    }
```

Memory profiling
```python
@memory_profiler.profile
def memory_intensive_function():
    """Function that uses significant memory"""
    data = [i for i in range(1000000)]
    return sum(data)
```
Efficient string concatenation
```python
def slow_string_concat(items: List[str]) -> str:
    """Slow: String concatenation in loop"""
    result = ""
    for item in items:
        result += item  # Creates new string each time
    return result

def fast_string_concat(items: List[str]) -> str:
    """Fast: Using join"""
    return "".join(items)
```
Generator for memory efficiency
```python
def slow_list_comprehension(n: int) -> List[int]:
    """Returns all squares at once"""
    return [i ** 2 for i in range(n)]

def fast_generator(n: int):
    """Yields squares one at a time"""
    for i in range(n):
        yield i ** 2
```
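The difference is easy to quantify: the list stores every element up front, while the generator object stays a small constant size regardless of `n`.

```python
import sys
from typing import List

def slow_list_comprehension(n: int) -> List[int]:
    return [i ** 2 for i in range(n)]

def fast_generator(n: int):
    for i in range(n):
        yield i ** 2

n = 1_000_000
list_size = sys.getsizeof(slow_list_comprehension(n))
gen_size = sys.getsizeof(fast_generator(n))
print(f"list: {list_size} bytes, generator: {gen_size} bytes")

# Results are identical once the generator is consumed
assert list(fast_generator(10)) == slow_list_comprehension(10)
```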
Database Optimization
```python
from sqlalchemy import create_engine, Index, text
from sqlalchemy.orm import sessionmaker
import redis
import json
```

Connection pooling
```python
engine = create_engine(
    'postgresql://user:pass@localhost/db',
    pool_size=20,
    max_overflow=0,
    pool_pre_ping=True,
    pool_recycle=3600
)
```
Query optimization
```python
class DatabaseOptimizer:
    def __init__(self, session):
        self.session = session

    def bad_n_plus_one(self):
        """N+1 query problem"""
        users = self.session.query(User).all()
        for user in users:
            posts = user.posts  # Triggers an additional query per user
            print(f"{user.name}: {len(posts)} posts")

    def good_eager_loading(self):
        """Eager loading to avoid N+1"""
        from sqlalchemy.orm import joinedload
        users = self.session.query(User)\
            .options(joinedload(User.posts))\
            .all()
        for user in users:
            posts = user.posts  # No additional query
            print(f"{user.name}: {len(posts)} posts")

    def use_indexes(self):
        """Create indexes for frequent queries"""
        # Index frequently queried columns
        Index('idx_user_email', User.email)
        Index('idx_post_created_at', Post.created_at)
        # Composite index
        Index('idx_post_user_status', Post.user_id, Post.status)

    def batch_operations(self, items: List[dict]):
        """Batch insert instead of individual inserts"""
        # Bad: individual inserts
        # for item in items:
        #     self.session.add(User(**item))
        #     self.session.commit()
        # Good: batch insert
        self.session.bulk_insert_mappings(User, items)
        self.session.commit()

    def use_pagination(self, page: int = 1, page_size: int = 20):
        """Paginate large result sets"""
        offset = (page - 1) * page_size
        return self.session.query(User)\
            .order_by(User.created_at.desc())\
            .limit(page_size)\
            .offset(offset)\
            .all()
```

Redis caching
```python
class CacheOptimizer:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def cache_query_result(self, key: str, query_func, ttl: int = 3600):
        """Cache database query results"""
        # Check the cache first
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        # Execute the query
        result = query_func()
        # Cache the result
        self.redis.setex(key, ttl, json.dumps(result))
        return result

    def cache_aside_pattern(self, key: str, fetch_func, ttl: int = 3600):
        """Implement the cache-aside pattern"""
        data = self.redis.get(key)
        if data is None:
            data = fetch_func()
            self.redis.setex(key, ttl, json.dumps(data))
        else:
            data = json.loads(data)
        return data
```

Frontend Performance
```javascript
// Debouncing for expensive operations
function debounce(func, wait) {
  let timeout;
  return function executedFunction(...args) {
    const later = () => {
      clearTimeout(timeout);
      func(...args);
    };
    clearTimeout(timeout);
    timeout = setTimeout(later, wait);
  };
}

// Example: debounce search input
const searchInput = document.getElementById('search');
const debouncedSearch = debounce((query) => {
  // Expensive search operation
  fetchSearchResults(query);
}, 300);
searchInput.addEventListener('input', (e) => {
  debouncedSearch(e.target.value);
});

// Lazy loading images
const lazyLoadImages = () => {
  const images = document.querySelectorAll('img[data-src]');
  const imageObserver = new IntersectionObserver((entries) => {
    entries.forEach(entry => {
      if (entry.isIntersecting) {
        const img = entry.target;
        img.src = img.dataset.src;
        img.removeAttribute('data-src');
        imageObserver.unobserve(img);
      }
    });
  });
  images.forEach(img => imageObserver.observe(img));
};

// Virtual scrolling for large lists
class VirtualScroller {
  constructor(container, items, itemHeight) {
    this.container = container;
    this.items = items;
    this.itemHeight = itemHeight;
    this.visibleItems = Math.ceil(container.clientHeight / itemHeight);
    this.render();
  }

  render() {
    const scrollTop = this.container.scrollTop;
    const startIndex = Math.floor(scrollTop / this.itemHeight);
    const endIndex = startIndex + this.visibleItems;
    // Only render visible items
    const visibleData = this.items.slice(startIndex, endIndex);
    this.container.innerHTML = visibleData
      .map(item => `<div style="height: ${this.itemHeight}px">${item}</div>`)
      .join('');
  }
}

// Code splitting with dynamic imports
async function loadModule() {
  const module = await import('./heavy-module.js');
  module.init();
}
```

API Performance
```python
from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import StreamingResponse
import asyncio
from typing import AsyncGenerator

app = FastAPI()
```

Async endpoints for I/O bound operations
```python
@app.get("/users/{user_id}")
async def get_user(user_id: str):
    """Async endpoint for database queries"""
    user = await db.fetch_user(user_id)
    return user
```
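The win from async endpoints comes from overlapping I/O waits rather than speeding up any single call. A small self-contained illustration — `asyncio.sleep` stands in for a database round-trip (the `db.fetch_user` above is assumed to be such an async call):

```python
import asyncio
import time

async def fake_db_call(user_id: int) -> dict:
    await asyncio.sleep(0.1)  # stands in for a 100 ms database round-trip
    return {"id": user_id}

async def main():
    start = time.perf_counter()
    # Ten concurrent "queries" overlap their waits
    users = await asyncio.gather(*(fake_db_call(i) for i in range(10)))
    elapsed = time.perf_counter() - start
    return users, elapsed

users, elapsed = asyncio.run(main())
print(f"{len(users)} users in {elapsed:.2f}s")  # ~0.1s, not ~1s sequential
```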
Streaming responses for large data
```python
async def generate_large_file() -> AsyncGenerator[bytes, None]:
    """Stream large file in chunks"""
    chunk_size = 8192
    with open("large_file.csv", "rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk
            await asyncio.sleep(0)  # Allow other tasks to run

@app.get("/download")
async def download_large_file():
    return StreamingResponse(
        generate_large_file(),
        media_type="text/csv"
    )
```
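The chunked-read pattern inside `generate_large_file` can be sanity-checked against an in-memory buffer before pointing it at a real file:

```python
import io

def chunk_reader(buf, chunk_size: int = 4):
    """Yield fixed-size chunks until the buffer is exhausted"""
    while chunk := buf.read(chunk_size):
        yield chunk

chunks = list(chunk_reader(io.BytesIO(b"abcdefghij"), chunk_size=4))
print(chunks)  # [b'abcd', b'efgh', b'ij']
```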
Background tasks for long-running operations
```python
@app.post("/process")
async def process_data(background_tasks: BackgroundTasks):
    """Offload heavy processing to background"""
    background_tasks.add_task(heavy_processing_task)
    return {"status": "processing started"}
```
Connection pooling
```python
import aiohttp

class APIClient:
    def __init__(self):
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(limit=100)
        )
        return self

    async def __aexit__(self, *args):
        await self.session.close()

    async def fetch(self, url: str):
        async with self.session.get(url) as response:
            return await response.json()
```

Load Testing
```python
from locust import HttpUser, task, between

class PerformanceTest(HttpUser):
    wait_time = between(1, 3)

    @task(3)
    def get_users(self):
        """High frequency endpoint"""
        self.client.get("/api/users")

    @task(1)
    def create_user(self):
        """Lower frequency endpoint"""
        self.client.post("/api/users", json={
            "email": "test@example.com",
            "name": "Test User"
        })

    def on_start(self):
        """Login once per user"""
        response = self.client.post("/api/login", json={
            "username": "test",
            "password": "password"
        })
        self.token = response.json()["token"]
```

Best Practices
General
- Measure before optimizing
- Focus on bottlenecks
- Use appropriate data structures
- Cache expensive computations
- Minimize I/O operations
- Use connection pooling
- Implement pagination
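"Use appropriate data structures" in practice: membership tests against a set are O(1) hash lookups, versus an O(n) scan of a list, and the gap shows up quickly at scale:

```python
import timeit

items_list = list(range(100_000))
items_set = set(items_list)

# Worst case for the list: the target is at the end
t_list = timeit.timeit(lambda: 99_999 in items_list, number=200)
t_set = timeit.timeit(lambda: 99_999 in items_set, number=200)
print(f"list: {t_list:.4f}s  set: {t_set:.6f}s")
```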
Database
- Create proper indexes
- Avoid N+1 queries
- Use eager loading
- Batch operations
- Optimize queries
- Use read replicas
- Implement caching
Frontend
- Minimize bundle size
- Code splitting
- Lazy loading
- Compress assets
- Use CDN
- Optimize images
- Debounce/throttle events
Backend
- Use async for I/O
- Implement caching
- Connection pooling
- Background processing
- Horizontal scaling
- Load balancing
Anti-Patterns
❌ Premature optimization
❌ No profiling/measurement
❌ Optimizing wrong bottlenecks
❌ Ignoring caching
❌ Synchronous I/O operations
❌ No database indexes
❌ Loading all data at once
Resources
- Python Performance Tips: https://wiki.python.org/moin/PythonSpeed/PerformanceTips
- Web Performance: https://web.dev/performance/
- Locust Load Testing: https://locust.io/
- py-spy Profiler: https://github.com/benfred/py-spy
- Chrome DevTools: https://developer.chrome.com/docs/devtools/