performance-expert


Performance Expert


Expert guidance for performance optimization, profiling, benchmarking, and system tuning.

Core Concepts


Performance Fundamentals


  • Response time vs throughput
  • Latency vs bandwidth
  • CPU, memory, I/O bottlenecks
  • Concurrency vs parallelism
  • Caching strategies
  • Load balancing
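
The response time vs throughput distinction above can be made concrete with a small measurement harness. This is a minimal sketch; `measure` and the dummy workload are illustrative names, not part of any library.

```python
import time

def measure(func, requests=1000):
    """Run func repeatedly; report average latency and overall throughput."""
    latencies = []
    start = time.perf_counter()
    for _ in range(requests):
        t0 = time.perf_counter()
        func()
        latencies.append(time.perf_counter() - t0)
    elapsed = time.perf_counter() - start
    return {
        'avg_latency_ms': sum(latencies) / len(latencies) * 1000,  # per-request latency
        'throughput_rps': requests / elapsed,                      # requests per second
    }

stats = measure(lambda: sum(range(1000)))
```

Note that improving one metric does not automatically improve the other: batching can raise throughput while increasing per-request latency.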

Optimization Areas


  • Algorithm optimization
  • Database optimization
  • Network optimization
  • Frontend performance
  • Backend performance
  • Infrastructure tuning

Profiling Tools


  • CPU profilers
  • Memory profilers
  • Network profilers
  • Application Performance Monitoring (APM)
  • Load testing tools

Python Performance


```python
import cProfile
import pstats
import timeit
import memory_profiler
from functools import lru_cache
from typing import List
import numpy as np
```

Performance Profiling


```python
def profile_function(func):
    """Decorator for profiling function execution"""
    def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        profiler.enable()

        result = func(*args, **kwargs)

        profiler.disable()
        stats = pstats.Stats(profiler)
        stats.sort_stats('cumulative')
        stats.print_stats(10)  # Top 10 functions

        return result
    return wrapper

@profile_function
def slow_function():
    total = 0
    for i in range(1000000):
        total += i
    return total
```

Memoization for expensive computations


```python
@lru_cache(maxsize=128)
def fibonacci(n: int) -> int:
    """Cached Fibonacci calculation"""
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)
```
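
As a quick way to verify the cache is actually working, `lru_cache`-decorated functions expose hit/miss counters via the standard-library `cache_info()` method:

```python
from functools import lru_cache

@lru_cache(maxsize=128)
def fibonacci(n: int) -> int:
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

fibonacci(30)                  # fills the cache as the recursion unwinds
info = fibonacci.cache_info()  # hits/misses counters; misses == one per distinct n
```

A large hit count relative to misses confirms memoization is paying off; `cache_clear()` resets the cache when arguments' underlying data changes.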

Vectorization with NumPy


```python
def slow_loop(data: List[float]) -> List[float]:
    """Slow: Using Python loops"""
    return [x ** 2 + 2 * x + 1 for x in data]

def fast_vectorized(data: np.ndarray) -> np.ndarray:
    """Fast: Using NumPy vectorization"""
    return data ** 2 + 2 * data + 1
```

Benchmarking


```python
def benchmark_function(func, *args, iterations=1000):
    """Benchmark function execution time"""
    total_time = timeit.timeit(
        lambda: func(*args),
        number=iterations
    )
    avg_time = total_time / iterations

    return {
        'total_time': total_time,
        'avg_time': avg_time,
        'iterations': iterations
    }
```

Memory profiling


```python
@memory_profiler.profile
def memory_intensive_function():
    """Function that uses significant memory"""
    data = [i for i in range(1000000)]
    return sum(data)
```

Efficient string concatenation


```python
def slow_string_concat(items: List[str]) -> str:
    """Slow: String concatenation in loop"""
    result = ""
    for item in items:
        result += item  # Creates new string each time
    return result

def fast_string_concat(items: List[str]) -> str:
    """Fast: Using join"""
    return "".join(items)
```

Generator for memory efficiency


```python
def slow_list_comprehension(n: int) -> List[int]:
    """Returns all squares at once"""
    return [i ** 2 for i in range(n)]

def fast_generator(n: int):
    """Yields squares one at a time"""
    for i in range(n):
        yield i ** 2
```
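
The memory difference is easy to observe directly: a generator object stays a constant few hundred bytes no matter how many items it will yield, while a list holds every element up front. Exact sizes are interpreter-dependent; this is a quick sketch.

```python
import sys

n = 1_000_000
squares_list = [i ** 2 for i in range(n)]
squares_gen = (i ** 2 for i in range(n))

list_size = sys.getsizeof(squares_list)  # grows linearly with n (megabytes here)
gen_size = sys.getsizeof(squares_gen)    # small and constant, regardless of n
```

The trade-off: a generator can only be consumed once and does not support indexing or `len()`.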

Database Optimization


```python
from sqlalchemy import create_engine, Index, text
from sqlalchemy.orm import sessionmaker
import redis
import json  # needed for serializing cached values below
```

Connection pooling


```python
engine = create_engine(
    'postgresql://user:pass@localhost/db',
    pool_size=20,
    max_overflow=0,
    pool_pre_ping=True,
    pool_recycle=3600
)
```

Query optimization


```python
class DatabaseOptimizer:
    def __init__(self, session):
        self.session = session

    def bad_n_plus_one(self):
        """N+1 query problem"""
        users = self.session.query(User).all()
        for user in users:
            posts = user.posts  # Triggers additional query per user
            print(f"{user.name}: {len(posts)} posts")

    def good_eager_loading(self):
        """Eager loading to avoid N+1"""
        from sqlalchemy.orm import joinedload

        users = self.session.query(User)\
            .options(joinedload(User.posts))\
            .all()

        for user in users:
            posts = user.posts  # No additional query
            print(f"{user.name}: {len(posts)} posts")

    def use_indexes(self):
        """Create indexes for frequent queries"""
        # Index on frequently queried columns
        Index('idx_user_email', User.email)
        Index('idx_post_created_at', Post.created_at)

        # Composite index
        Index('idx_post_user_status', Post.user_id, Post.status)

    def batch_operations(self, items: List[dict]):
        """Batch insert instead of individual inserts"""
        # Bad: Individual inserts
        # for item in items:
        #     self.session.add(User(**item))
        #     self.session.commit()

        # Good: Batch insert
        self.session.bulk_insert_mappings(User, items)
        self.session.commit()

    def use_pagination(self, page: int = 1, page_size: int = 20):
        """Paginate large result sets"""
        offset = (page - 1) * page_size

        return self.session.query(User)\
            .order_by(User.created_at.desc())\
            .limit(page_size)\
            .offset(offset)\
            .all()
```

Redis caching


```python
class CacheOptimizer:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def cache_query_result(self, key: str, query_func, ttl: int = 3600):
        """Cache database query results"""
        # Check cache first
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)

        # Execute query
        result = query_func()

        # Cache result
        self.redis.setex(key, ttl, json.dumps(result))

        return result

    def cache_aside_pattern(self, key: str, fetch_func, ttl: int = 3600):
        """Implement cache-aside pattern"""
        data = self.redis.get(key)

        if data is None:
            data = fetch_func()
            self.redis.setex(key, ttl, json.dumps(data))
        else:
            data = json.loads(data)

        return data
```

Frontend Performance


```javascript
// Debouncing for expensive operations
function debounce(func, wait) {
    let timeout;
    return function executedFunction(...args) {
        const later = () => {
            clearTimeout(timeout);
            func(...args);
        };
        clearTimeout(timeout);
        timeout = setTimeout(later, wait);
    };
}

// Example: Debounce search input
const searchInput = document.getElementById('search');
const debouncedSearch = debounce((query) => {
    // Expensive search operation
    fetchSearchResults(query);
}, 300);

searchInput.addEventListener('input', (e) => {
    debouncedSearch(e.target.value);
});

// Lazy loading images
const lazyLoadImages = () => {
    const images = document.querySelectorAll('img[data-src]');

    const imageObserver = new IntersectionObserver((entries) => {
        entries.forEach(entry => {
            if (entry.isIntersecting) {
                const img = entry.target;
                img.src = img.dataset.src;
                img.removeAttribute('data-src');
                imageObserver.unobserve(img);
            }
        });
    });

    images.forEach(img => imageObserver.observe(img));
};

// Virtual scrolling for large lists
class VirtualScroller {
    constructor(container, items, itemHeight) {
        this.container = container;
        this.items = items;
        this.itemHeight = itemHeight;
        this.visibleItems = Math.ceil(container.clientHeight / itemHeight);
        this.render();
    }

    render() {
        const scrollTop = this.container.scrollTop;
        const startIndex = Math.floor(scrollTop / this.itemHeight);
        const endIndex = startIndex + this.visibleItems;

        // Only render visible items
        const visibleData = this.items.slice(startIndex, endIndex);

        this.container.innerHTML = visibleData
            .map(item => `<div style="height: ${this.itemHeight}px">${item}</div>`)
            .join('');
    }
}

// Code splitting with dynamic imports
async function loadModule() {
    const module = await import('./heavy-module.js');
    module.init();
}
```

API Performance


```python
from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import StreamingResponse
import asyncio
from typing import AsyncGenerator

app = FastAPI()
```

Async endpoints for I/O bound operations


```python
@app.get("/users/{user_id}")
async def get_user(user_id: str):
    """Async endpoint for database queries"""
    user = await db.fetch_user(user_id)
    return user
```

Streaming responses for large data


```python
async def generate_large_file() -> AsyncGenerator[bytes, None]:
    """Stream large file in chunks"""
    chunk_size = 8192
    with open("large_file.csv", "rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk
            await asyncio.sleep(0)  # Allow other tasks to run

@app.get("/download")
async def download_large_file():
    return StreamingResponse(
        generate_large_file(),
        media_type="text/csv"
    )
```

Background tasks for long-running operations


```python
@app.post("/process")
async def process_data(background_tasks: BackgroundTasks):
    """Offload heavy processing to background"""
    background_tasks.add_task(heavy_processing_task)
    return {"status": "processing started"}
```

Connection pooling


```python
import aiohttp

class APIClient:
    def __init__(self):
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(limit=100)
        )
        return self

    async def __aexit__(self, *args):
        await self.session.close()

    async def fetch(self, url: str):
        async with self.session.get(url) as response:
            return await response.json()
```

Load Testing


```python
from locust import HttpUser, task, between

class PerformanceTest(HttpUser):
    wait_time = between(1, 3)

    @task(3)
    def get_users(self):
        """High frequency endpoint"""
        self.client.get("/api/users")

    @task(1)
    def create_user(self):
        """Lower frequency endpoint"""
        self.client.post("/api/users", json={
            "email": "test@example.com",
            "name": "Test User"
        })

    def on_start(self):
        """Login once per user"""
        response = self.client.post("/api/login", json={
            "username": "test",
            "password": "password"
        })
        self.token = response.json()["token"]
```

Best Practices


General


  • Measure before optimizing
  • Focus on bottlenecks
  • Use appropriate data structures
  • Cache expensive computations
  • Minimize I/O operations
  • Use connection pooling
  • Implement pagination
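
"Use appropriate data structures" is often the cheapest win of all. A minimal micro-benchmark sketch of membership testing illustrates why (absolute numbers vary by machine; the gap does not):

```python
import timeit

items_list = list(range(10_000))
items_set = set(items_list)

# Worst-case membership: last element forces a full scan of the list,
# while the set does a single hash lookup
list_time = timeit.timeit(lambda: 9_999 in items_list, number=1_000)
set_time = timeit.timeit(lambda: 9_999 in items_set, number=1_000)
```

The same principle applies to `collections.deque` for queue operations and `dict` for keyed lookups.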

Database


  • Create proper indexes
  • Avoid N+1 queries
  • Use eager loading
  • Batch operations
  • Optimize queries
  • Use read replicas
  • Implement caching
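
Whether a query can actually use an index is something the database itself will report. A minimal illustration using SQLite's `EXPLAIN QUERY PLAN` (the table and index names here are made up for the demo; PostgreSQL's equivalent is `EXPLAIN ANALYZE`):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
conn.execute("CREATE INDEX idx_user_email ON users (email)")

# Ask the planner how it would execute the lookup
plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT * FROM users WHERE email = ?",
    ("test@example.com",),
).fetchall()
# The plan row names idx_user_email when the optimizer chooses the index
```

Checking the plan before and after adding an index is a quick way to confirm the index is doing its job.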

Frontend


  • Minimize bundle size
  • Code splitting
  • Lazy loading
  • Compress assets
  • Use CDN
  • Optimize images
  • Debounce/throttle events

Backend


  • Use async for I/O
  • Implement caching
  • Connection pooling
  • Background processing
  • Horizontal scaling
  • Load balancing
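
"Use async for I/O" pays off when independent calls run concurrently rather than one after another. A sketch with `asyncio.gather`, where the sleep stands in for a real network or database call:

```python
import asyncio

async def fetch_one(i: int) -> int:
    await asyncio.sleep(0.01)  # stand-in for an I/O-bound call
    return i * 2

async def fetch_all(n: int):
    # All n calls run concurrently: total time is ~one call, not n calls
    return await asyncio.gather(*(fetch_one(i) for i in range(n)))

results = asyncio.run(fetch_all(10))
```

Note this helps only for I/O-bound work; CPU-bound tasks still block the event loop and belong in a process pool.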

Anti-Patterns


❌ Premature optimization
❌ No profiling/measurement
❌ Optimizing wrong bottlenecks
❌ Ignoring caching
❌ Synchronous I/O operations
❌ No database indexes
❌ Loading all data at once

Resources
