rest-api-design-patterns

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

REST API Design Patterns

REST API设计模式

A comprehensive skill for designing, implementing, and maintaining RESTful APIs. Master resource modeling, HTTP methods, versioning strategies, pagination, filtering, error handling, and best practices for building scalable, maintainable APIs using FastAPI, Express.js, and modern frameworks.
这是一份关于设计、实现和维护RESTful API的综合指南。掌握资源建模、HTTP方法、版本控制策略、分页、过滤、错误处理,以及使用FastAPI、Express.js和现代框架构建可扩展、可维护API的最佳实践。

When to Use This Skill

何时使用此技能

Use this skill when:
  • Designing a new RESTful API from scratch
  • Building microservices with HTTP/REST interfaces
  • Refactoring existing APIs for better design and consistency
  • Implementing CRUD operations with proper HTTP semantics
  • Adding versioning to an existing API
  • Designing resource relationships and nested endpoints
  • Implementing pagination, filtering, and sorting
  • Handling errors and validation consistently
  • Building hypermedia-driven APIs (HATEOAS)
  • Optimizing API performance with caching and compression
  • Documenting APIs with OpenAPI/Swagger specifications
  • Ensuring API security with authentication and authorization patterns
在以下场景中使用此技能:
  • 从零开始设计新的RESTful API
  • 构建带有HTTP/REST接口的微服务
  • 重构现有API以提升设计一致性
  • 使用正确的HTTP语义实现CRUD操作
  • 为现有API添加版本控制
  • 设计资源关系和嵌套端点
  • 实现分页、过滤和排序功能
  • 统一处理错误和验证逻辑
  • 构建超媒体驱动的API(HATEOAS)
  • 通过缓存和压缩优化API性能
  • 使用OpenAPI/Swagger规范编写API文档
  • 通过认证和授权模式保障API安全

Core REST Principles

REST核心原则

What is REST?

什么是REST?

REST (Representational State Transfer) is an architectural style for distributed systems that emphasizes:
  1. Resource-Based: Everything is a resource with a unique identifier (URI)
  2. Standard Methods: Use standard HTTP methods (GET, POST, PUT, DELETE, PATCH)
  3. Stateless: Each request contains all information needed to process it
  4. Client-Server: Clear separation between client and server
  5. Cacheable: Responses can be cached for performance
  6. Uniform Interface: Consistent patterns across the API
REST(Representational State Transfer,表述性状态转移)是分布式系统的一种架构风格,强调:
  1. 基于资源:一切都是带有唯一标识符(URI)的资源
  2. 标准方法:使用标准HTTP方法(GET、POST、PUT、DELETE、PATCH)
  3. 无状态:每个请求包含处理所需的全部信息
  4. 客户端-服务端分离:客户端与服务端职责清晰分离
  5. 可缓存:响应可被缓存以提升性能
  6. 统一接口:API采用一致的设计模式

REST Maturity Model (Richardson Maturity Model)

REST成熟度模型(Richardson成熟度模型)

Level 0 - The Swamp of POX: Single URI, single HTTP method (usually POST)
  • Example:
    /api
    with all operations in POST body
Level 1 - Resources: Multiple URIs, each representing a resource
  • Example:
    /users
    ,
    /posts
    ,
    /products
Level 2 - HTTP Verbs: Proper use of HTTP methods
  • Example: GET
    /users/123
    , POST
    /users
    , PUT
    /users/123
Level 3 - Hypermedia Controls (HATEOAS): API responses include links to related resources
  • Example: Response includes
    "_links": {"self": "/users/123", "posts": "/users/123/posts"}
Level 0 - POX沼泽:单个URI,单个HTTP方法(通常为POST)
  • 示例:
    /api
    ,所有操作都在POST请求体中
Level 1 - 资源:多个URI,每个代表一个资源
  • 示例:
    /users
    /posts
    /products
Level 2 - HTTP动词:正确使用HTTP方法
  • 示例:GET
    /users/123
    、POST
    /users
    、PUT
    /users/123
Level 3 - 超媒体控制(HATEOAS):API响应中包含关联资源的链接
  • 示例:响应包含
    "_links": {"self": "/users/123", "posts": "/users/123/posts"}

Resource Modeling

资源建模

Resource Naming Conventions

资源命名规范

1. Use Nouns, Not Verbs
Good:
  GET /users
  GET /products
  POST /orders

Bad:
  GET /getUsers
  GET /getAllProducts
  POST /createOrder
2. Use Plural Nouns for Collections
Good:
  GET /users          # Collection
  GET /users/123      # Individual resource

Bad:
  GET /user
  GET /user/123
3. Use Lowercase and Hyphens
Good:
  /user-profiles
  /order-items
  /payment-methods

Bad:
  /userProfiles
  /OrderItems
  /payment_methods
4. Hierarchy for Related Resources
Good:
  /users/123/posts
  /users/123/posts/456
  /users/123/posts/456/comments

Avoid Deep Nesting (max 2-3 levels):
  /organizations/1/departments/2/teams/3/members/4/tasks/5  # Too deep!
1. 使用名词,而非动词
推荐:
  GET /users
  GET /products
  POST /orders

不推荐:
  GET /getUsers
  GET /getAllProducts
  POST /createOrder
2. 集合资源使用复数名词
推荐:
  GET /users          # 集合
  GET /users/123      # 单个资源

不推荐:
  GET /user
  GET /user/123
3. 使用小写字母和连字符
推荐:
  /user-profiles
  /order-items
  /payment-methods

不推荐:
  /userProfiles
  /OrderItems
  /payment_methods
4. 使用层级结构表示关联资源
推荐:
  /users/123/posts
  /users/123/posts/456
  /users/123/posts/456/comments

避免过深嵌套(最多2-3层):
  /organizations/1/departments/2/teams/3/members/4/tasks/5  # 嵌套过深!

Resource Design Patterns

资源设计模式

Pattern 1: Collection and Item Resources

模式1:集合与单个资源

Collection Resource:
  GET    /products          # List all products
  POST   /products          # Create new product

Item Resource:
  GET    /products/123      # Get specific product
  PUT    /products/123      # Replace product (full update)
  PATCH  /products/123      # Partial update
  DELETE /products/123      # Delete product
集合资源:
  GET    /products          # 列出所有产品
  POST   /products          # 创建新产品

单个资源:
  GET    /products/123      # 获取指定产品
  PUT    /products/123      # 替换产品(全量更新)
  PATCH  /products/123      # 部分更新
  DELETE /products/123      # 删除产品

Pattern 2: Nested Resources (Parent-Child Relationships)

模式2:嵌套资源(父子关系)

undefined
undefined

Comments belong to posts

评论属于帖子

GET /posts/42/comments # List comments for post 42 POST /posts/42/comments # Create comment on post 42 GET /posts/42/comments/7 # Get specific comment DELETE /posts/42/comments/7 # Delete specific comment
GET /posts/42/comments # 列出帖子42的所有评论 POST /posts/42/comments # 在帖子42下创建评论 GET /posts/42/comments/7 # 获取指定评论 DELETE /posts/42/comments/7 # 删除指定评论

Alternative for accessing comments directly

直接访问评论的替代方式

GET /comments/7 # Get comment by ID (if you have it)
undefined
GET /comments/7 # 通过ID获取评论(如果已知ID)
undefined

Pattern 3: Filtering Collections (Query Parameters)

模式3:集合过滤(查询参数)

GET /products?category=electronics
GET /products?price_min=100&price_max=500
GET /products?sort=price&order=desc
GET /users?status=active&role=admin
GET /posts?author=123&published=true
GET /products?category=electronics
GET /products?price_min=100&price_max=500
GET /products?sort=price&order=desc
GET /users?status=active&role=admin
GET /posts?author=123&published=true

Pattern 4: Actions on Resources (Controllers)

模式4:资源操作(控制器)

For operations that don't fit standard CRUD:
POST /users/123/activate          # Activate user account
POST /orders/456/cancel           # Cancel order
POST /payments/789/refund         # Refund payment
POST /documents/321/publish       # Publish document
POST /subscriptions/654/renew     # Renew subscription
针对不适合标准CRUD的操作:
POST /users/123/activate          # 激活用户账号
POST /orders/456/cancel           # 取消订单
POST /payments/789/refund         # 退款
POST /documents/321/publish       # 发布文档
POST /subscriptions/654/renew     # 续订订阅

Pattern 5: Bulk Operations

模式5:批量操作

POST /users/bulk-create           # Create multiple users
PATCH /products/bulk-update       # Update multiple products
DELETE /orders/bulk-delete        # Delete multiple orders
POST /users/bulk-create           # 批量创建用户
PATCH /products/bulk-update       # 批量更新产品
DELETE /orders/bulk-delete        # 批量删除订单

Or using query parameters

或使用查询参数

DELETE /orders?ids=1,2,3,4,5
undefined
DELETE /orders?ids=1,2,3,4,5
undefined

HTTP Methods Deep Dive

HTTP方法深入解析

GET - Retrieve Resources

GET - 获取资源

Characteristics:
  • Safe: No side effects
  • Idempotent: Multiple identical requests have the same effect
  • Cacheable: Responses can be cached
FastAPI Example:
python
from fastapi import FastAPI, HTTPException
from typing import List, Optional

app = FastAPI()
特性:
  • 安全:无副作用
  • 幂等:多次相同请求效果一致
  • 可缓存:响应可被缓存
FastAPI示例:
python
from fastapi import FastAPI, HTTPException
from typing import List, Optional

app = FastAPI()

Collection endpoint

集合端点

@app.get("/items/") async def list_items( skip: int = 0, limit: int = 10, category: Optional[str] = None ) -> List[dict]: """List items with pagination and filtering.""" # Filter and paginate items = get_items_from_db(skip=skip, limit=limit, category=category) return items
@app.get("/items/") async def list_items( skip: int = 0, limit: int = 10, category: Optional[str] = None ) -> List[dict]: """带分页和过滤的物品列表。""" # 过滤和分页 items = get_items_from_db(skip=skip, limit=limit, category=category) return items

Individual resource endpoint

单个资源端点

@app.get("/items/{item_id}") async def get_item(item_id: int) -> dict: """Get a specific item by ID.""" item = get_item_from_db(item_id) if not item: raise HTTPException(status_code=404, detail="Item not found") return item

**Express.js Example:**

```javascript
const express = require('express');
const app = express();

// Collection endpoint
app.get('/items', async (req, res) => {
  try {
    const { skip = 0, limit = 10, category } = req.query;
    const items = await getItemsFromDB({ skip, limit, category });
    res.json(items);
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});

// Individual resource endpoint
app.get('/items/:id', async (req, res) => {
  try {
    const item = await getItemFromDB(req.params.id);
    if (!item) {
      return res.status(404).json({ error: 'Item not found' });
    }
    res.json(item);
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});
@app.get("/items/{item_id}") async def get_item(item_id: int) -> dict: """通过ID获取指定物品。""" item = get_item_from_db(item_id) if not item: raise HTTPException(status_code=404, detail="Item not found") return item

**Express.js示例:**

```javascript
const express = require('express');
const app = express();

// 集合端点
app.get('/items', async (req, res) => {
  try {
    const { skip = 0, limit = 10, category } = req.query;
    const items = await getItemsFromDB({ skip, limit, category });
    res.json(items);
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});

// 单个资源端点
app.get('/items/:id', async (req, res) => {
  try {
    const item = await getItemFromDB(req.params.id);
    if (!item) {
      return res.status(404).json({ error: 'Item not found' });
    }
    res.json(item);
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});

POST - Create Resources

POST - 创建资源

Characteristics:
  • Not safe: Has side effects (creates resource)
  • Not idempotent: Multiple requests create multiple resources
  • Response should include
    Location
    header with new resource URI
FastAPI Example:
python
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel

class ItemCreate(BaseModel):
    name: str
    price: float
    category: str
    description: Optional[str] = None

@app.post("/items/", status_code=status.HTTP_201_CREATED)
async def create_item(item: ItemCreate, response: Response) -> dict:
    """Create a new item."""
    # Validate and create
    new_item = create_item_in_db(item)

    # Set Location header
    response.headers["Location"] = f"/items/{new_item.id}"

    return new_item
Express.js Example:
javascript
app.use(express.json());

app.post('/items', async (req, res) => {
  try {
    const { name, price, category, description } = req.body;

    // Validate
    if (!name || !price || !category) {
      return res.status(400).json({
        error: 'Missing required fields: name, price, category'
      });
    }

    // Create resource
    const newItem = await createItemInDB({ name, price, category, description });

    // Set Location header and return 201
    res.location(`/items/${newItem.id}`)
       .status(201)
       .json(newItem);
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});
特性:
  • 不安全:有副作用(创建资源)
  • 非幂等:多次请求会创建多个资源
  • 响应应包含
    Location
    头,指向新资源的URI
FastAPI示例:
python
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel

class ItemCreate(BaseModel):
    name: str
    price: float
    category: str
    description: Optional[str] = None

@app.post("/items/", status_code=status.HTTP_201_CREATED)
async def create_item(item: ItemCreate, response: Response) -> dict:
    """创建新物品。"""
    # 验证并创建
    new_item = create_item_in_db(item)

    # 设置Location头
    response.headers["Location"] = f"/items/{new_item.id}"

    return new_item
Express.js示例:
javascript
app.use(express.json());

app.post('/items', async (req, res) => {
  try {
    const { name, price, category, description } = req.body;

    // 验证
    if (!name || !price || !category) {
      return res.status(400).json({
        error: 'Missing required fields: name, price, category'
      });
    }

    // 创建资源
    const newItem = await createItemInDB({ name, price, category, description });

    // 设置Location头并返回201
    res.location(`/items/${newItem.id}`)
       .status(201)
       .json(newItem);
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});

PUT - Replace Resource

PUT - 替换资源

Characteristics:
  • Not safe: Has side effects
  • Idempotent: Multiple identical requests have the same effect
  • Replaces entire resource (all fields required)
FastAPI Example:
python
class ItemUpdate(BaseModel):
    name: str
    price: float
    category: str
    description: str

@app.put("/items/{item_id}")
async def replace_item(item_id: int, item: ItemUpdate) -> dict:
    """Replace an entire item (all fields required)."""
    existing_item = get_item_from_db(item_id)
    if not existing_item:
        raise HTTPException(status_code=404, detail="Item not found")

    # Replace entire resource
    updated_item = replace_item_in_db(item_id, item)
    return updated_item
Express.js Example:
javascript
app.put('/items/:id', async (req, res) => {
  try {
    const { name, price, category, description } = req.body;

    // All fields required for PUT
    if (!name || !price || !category || description === undefined) {
      return res.status(400).json({
        error: 'PUT requires all fields: name, price, category, description'
      });
    }

    const existingItem = await getItemFromDB(req.params.id);
    if (!existingItem) {
      return res.status(404).json({ error: 'Item not found' });
    }

    // Replace entire resource
    const updatedItem = await replaceItemInDB(req.params.id, req.body);
    res.json(updatedItem);
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});
特性:
  • 不安全:有副作用
  • 幂等:多次相同请求效果一致
  • 替换整个资源(需要所有字段)
FastAPI示例:
python
class ItemUpdate(BaseModel):
    name: str
    price: float
    category: str
    description: str

@app.put("/items/{item_id}")
async def replace_item(item_id: int, item: ItemUpdate) -> dict:
    """替换整个物品(需要所有字段)。"""
    existing_item = get_item_from_db(item_id)
    if not existing_item:
        raise HTTPException(status_code=404, detail="Item not found")

    # 替换整个资源
    updated_item = replace_item_in_db(item_id, item)
    return updated_item
Express.js示例:
javascript
app.put('/items/:id', async (req, res) => {
  try {
    const { name, price, category, description } = req.body;

    // PUT需要所有字段
    if (!name || !price || !category || description === undefined) {
      return res.status(400).json({
        error: 'PUT requires all fields: name, price, category, description'
      });
    }

    const existingItem = await getItemFromDB(req.params.id);
    if (!existingItem) {
      return res.status(404).json({ error: 'Item not found' });
    }

    // 替换整个资源
    const updatedItem = await replaceItemInDB(req.params.id, req.body);
    res.json(updatedItem);
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});

PATCH - Partial Update

PATCH - 部分更新

Characteristics:
  • Not safe: Has side effects
  • Idempotent: Multiple identical requests have the same effect
  • Updates only specified fields (partial update)
FastAPI Example:
python
class ItemPatch(BaseModel):
    name: Optional[str] = None
    price: Optional[float] = None
    category: Optional[str] = None
    description: Optional[str] = None

@app.patch("/items/{item_id}")
async def update_item(item_id: int, item: ItemPatch) -> dict:
    """Partially update an item (only provided fields)."""
    existing_item = get_item_from_db(item_id)
    if not existing_item:
        raise HTTPException(status_code=404, detail="Item not found")

    # Update only provided fields
    update_data = item.model_dump(exclude_unset=True)
    updated_item = update_item_in_db(item_id, update_data)
    return updated_item
Express.js Example:
javascript
app.patch('/items/:id', async (req, res) => {
  try {
    const existingItem = await getItemFromDB(req.params.id);
    if (!existingItem) {
      return res.status(404).json({ error: 'Item not found' });
    }

    // Update only provided fields
    const updatedItem = await updateItemInDB(req.params.id, req.body);
    res.json(updatedItem);
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});
特性:
  • 不安全:有副作用
  • 幂等:多次相同请求效果一致
  • 仅更新指定字段(部分更新)
FastAPI示例:
python
class ItemPatch(BaseModel):
    name: Optional[str] = None
    price: Optional[float] = None
    category: Optional[str] = None
    description: Optional[str] = None

@app.patch("/items/{item_id}")
async def update_item(item_id: int, item: ItemPatch) -> dict:
    """部分更新物品(仅更新提供的字段)。"""
    existing_item = get_item_from_db(item_id)
    if not existing_item:
        raise HTTPException(status_code=404, detail="Item not found")

    # 仅更新提供的字段
    update_data = item.model_dump(exclude_unset=True)
    updated_item = update_item_in_db(item_id, update_data)
    return updated_item
Express.js示例:
javascript
app.patch('/items/:id', async (req, res) => {
  try {
    const existingItem = await getItemFromDB(req.params.id);
    if (!existingItem) {
      return res.status(404).json({ error: 'Item not found' });
    }

    // 仅更新提供的字段
    const updatedItem = await updateItemInDB(req.params.id, req.body);
    res.json(updatedItem);
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});

DELETE - Remove Resource

DELETE - 删除资源

Characteristics:
  • Not safe: Has side effects
  • Idempotent: Multiple identical requests have the same effect
  • Returns 204 No Content or 200 OK with response body
FastAPI Example:
python
@app.delete("/items/{item_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_item(item_id: int):
    """Delete an item."""
    existing_item = get_item_from_db(item_id)
    if not existing_item:
        raise HTTPException(status_code=404, detail="Item not found")

    delete_item_from_db(item_id)
    return None  # 204 No Content
特性:
  • 不安全:有副作用
  • 幂等:多次相同请求效果一致
  • 返回204 No Content或带响应体的200 OK
FastAPI示例:
python
@app.delete("/items/{item_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_item(item_id: int):
    """删除物品。"""
    existing_item = get_item_from_db(item_id)
    if not existing_item:
        raise HTTPException(status_code=404, detail="Item not found")

    delete_item_from_db(item_id)
    return None  # 204 No Content

Alternative: Return deleted resource

替代方案:返回被删除的资源

@app.delete("/items/{item_id}") async def delete_item_with_response(item_id: int) -> dict: """Delete an item and return it.""" existing_item = get_item_from_db(item_id) if not existing_item: raise HTTPException(status_code=404, detail="Item not found")
delete_item_from_db(item_id)
return existing_item  # 200 OK with body

**Express.js Example:**

```javascript
// 204 No Content approach
app.delete('/items/:id', async (req, res) => {
  try {
    const existingItem = await getItemFromDB(req.params.id);
    if (!existingItem) {
      return res.status(404).json({ error: 'Item not found' });
    }

    await deleteItemFromDB(req.params.id);
    res.status(204).send();
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});

// 200 OK with response body approach
app.delete('/items/:id', async (req, res) => {
  try {
    const existingItem = await getItemFromDB(req.params.id);
    if (!existingItem) {
      return res.status(404).json({ error: 'Item not found' });
    }

    await deleteItemFromDB(req.params.id);
    res.json({ message: 'Item deleted successfully', item: existingItem });
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});
@app.delete("/items/{item_id}") async def delete_item_with_response(item_id: int) -> dict: """删除物品并返回该资源。""" existing_item = get_item_from_db(item_id) if not existing_item: raise HTTPException(status_code=404, detail="Item not found")
delete_item_from_db(item_id)
return existing_item  # 200 OK 带响应体

**Express.js示例:**

```javascript
// 204 No Content 方案
app.delete('/items/:id', async (req, res) => {
  try {
    const existingItem = await getItemFromDB(req.params.id);
    if (!existingItem) {
      return res.status(404).json({ error: 'Item not found' });
    }

    await deleteItemFromDB(req.params.id);
    res.status(204).send();
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});

// 200 OK 带响应体方案
app.delete('/items/:id', async (req, res) => {
  try {
    const existingItem = await getItemFromDB(req.params.id);
    if (!existingItem) {
      return res.status(404).json({ error: 'Item not found' });
    }

    await deleteItemFromDB(req.params.id);
    res.json({ message: 'Item deleted successfully', item: existingItem });
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});

API Versioning Strategies

API版本控制策略

Strategy 1: URI Versioning (Most Common)

策略1:URI版本控制(最常用)

Version in the URI path - clear, explicit, easy to understand.
Pros:
  • Explicit and visible
  • Easy to route to different code versions
  • Browser-friendly
  • Simple for documentation
Cons:
  • Creates multiple endpoints
  • Can lead to code duplication
  • URLs change between versions
FastAPI Implementation:
python
from fastapi import FastAPI, APIRouter

app = FastAPI()
在URI路径中包含版本 - 清晰、明确、易于理解。
优点:
  • 明确可见
  • 易于路由到不同代码版本
  • 浏览器友好
  • 文档编写简单
缺点:
  • 创建多个端点
  • 可能导致代码重复
  • 版本间URL会变化
FastAPI实现:
python
from fastapi import FastAPI, APIRouter

app = FastAPI()

Version 1 router

版本1路由

v1_router = APIRouter(prefix="/api/v1")
@v1_router.get("/users") async def get_users_v1(): return {"users": ["user1", "user2"], "version": "1.0"}
@v1_router.get("/users/{user_id}") async def get_user_v1(user_id: int): return {"id": user_id, "name": "John", "version": "1.0"}
v1_router = APIRouter(prefix="/api/v1")
@v1_router.get("/users") async def get_users_v1(): return {"users": ["user1", "user2"], "version": "1.0"}
@v1_router.get("/users/{user_id}") async def get_user_v1(user_id: int): return {"id": user_id, "name": "John", "version": "1.0"}

Version 2 router

版本2路由

v2_router = APIRouter(prefix="/api/v2")
@v2_router.get("/users") async def get_users_v2(limit: int = 10, offset: int = 0): """V2 adds pagination""" return { "users": ["user1", "user2"], "pagination": {"limit": limit, "offset": offset}, "version": "2.0" }
@v2_router.get("/users/{user_id}") async def get_user_v2(user_id: int): """V2 returns more fields""" return { "id": user_id, "name": "John", "email": "john@example.com", "created_at": "2024-01-01", "version": "2.0" }
app.include_router(v1_router) app.include_router(v2_router)

**Express.js Implementation:**

```javascript
const express = require('express');
const app = express();

// Version 1 routes
const v1Router = express.Router();

v1Router.get('/users', (req, res) => {
  res.json({ users: ['user1', 'user2'], version: '1.0' });
});

v1Router.get('/users/:id', (req, res) => {
  res.json({ id: req.params.id, name: 'John', version: '1.0' });
});

// Version 2 routes
const v2Router = express.Router();

v2Router.get('/users', (req, res) => {
  const { limit = 10, offset = 0 } = req.query;
  res.json({
    users: ['user1', 'user2'],
    pagination: { limit, offset },
    version: '2.0'
  });
});

v2Router.get('/users/:id', (req, res) => {
  res.json({
    id: req.params.id,
    name: 'John',
    email: 'john@example.com',
    created_at: '2024-01-01',
    version: '2.0'
  });
});

app.use('/api/v1', v1Router);
app.use('/api/v2', v2Router);
v2_router = APIRouter(prefix="/api/v2")
@v2_router.get("/users") async def get_users_v2(limit: int = 10, offset: int = 0): """V2新增分页""" return { "users": ["user1", "user2"], "pagination": {"limit": limit, "offset": offset}, "version": "2.0" }
@v2_router.get("/users/{user_id}") async def get_user_v2(user_id: int): """V2返回更多字段""" return { "id": user_id, "name": "John", "email": "john@example.com", "created_at": "2024-01-01", "version": "2.0" }
app.include_router(v1_router) app.include_router(v2_router)

**Express.js实现:**

```javascript
const express = require('express');
const app = express();

// 版本1路由
const v1Router = express.Router();

v1Router.get('/users', (req, res) => {
  res.json({ users: ['user1', 'user2'], version: '1.0' });
});

v1Router.get('/users/:id', (req, res) => {
  res.json({ id: req.params.id, name: 'John', version: '1.0' });
});

// 版本2路由
const v2Router = express.Router();

v2Router.get('/users', (req, res) => {
  const { limit = 10, offset = 0 } = req.query;
  res.json({
    users: ['user1', 'user2'],
    pagination: { limit, offset },
    version: '2.0'
  });
});

v2Router.get('/users/:id', (req, res) => {
  res.json({
    id: req.params.id,
    name: 'John',
    email: 'john@example.com',
    created_at: '2024-01-01',
    version: '2.0'
  });
});

app.use('/api/v1', v1Router);
app.use('/api/v2', v2Router);

Strategy 2: Header Versioning

策略2:Header版本控制

Version specified in custom header or Accept header.
Pros:
  • Clean URIs
  • No URL pollution
  • More "RESTful" (resources have single URI)
Cons:
  • Less visible
  • Harder to test in browser
  • More complex routing logic
FastAPI Implementation:
python
from fastapi import FastAPI, Header, HTTPException

app = FastAPI()

@app.get("/users")
async def get_users(api_version: str = Header(default="1.0", alias="X-API-Version")):
    """Handle multiple versions based on header"""
    if api_version == "1.0":
        return {"users": ["user1", "user2"], "version": "1.0"}
    elif api_version == "2.0":
        return {
            "users": ["user1", "user2"],
            "pagination": {"limit": 10, "offset": 0},
            "version": "2.0"
        }
    else:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported API version: {api_version}"
        )

@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    api_version: str = Header(default="1.0", alias="X-API-Version")
):
    """User endpoint with version handling"""
    if api_version == "1.0":
        return {"id": user_id, "name": "John", "version": "1.0"}
    elif api_version == "2.0":
        return {
            "id": user_id,
            "name": "John",
            "email": "john@example.com",
            "created_at": "2024-01-01",
            "version": "2.0"
        }
    else:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported API version: {api_version}"
        )
Express.js Implementation:
javascript
app.get('/users', (req, res) => {
  const version = req.get('X-API-Version') || '1.0';

  if (version === '1.0') {
    res.json({ users: ['user1', 'user2'], version: '1.0' });
  } else if (version === '2.0') {
    res.json({
      users: ['user1', 'user2'],
      pagination: { limit: 10, offset: 0 },
      version: '2.0'
    });
  } else {
    res.status(400).json({ error: `Unsupported API version: ${version}` });
  }
});

app.get('/users/:id', (req, res) => {
  const version = req.get('X-API-Version') || '1.0';

  if (version === '1.0') {
    res.json({ id: req.params.id, name: 'John', version: '1.0' });
  } else if (version === '2.0') {
    res.json({
      id: req.params.id,
      name: 'John',
      email: 'john@example.com',
      created_at: '2024-01-01',
      version: '2.0'
    });
  } else {
    res.status(400).json({ error: `Unsupported API version: ${version}` });
  }
});
在自定义Header或Accept Header中指定版本。
优点:
  • URI简洁
  • 无URL冗余
  • 更符合REST风格(资源只有单个URI)
缺点:
  • 可见性低
  • 浏览器中测试较难
  • 路由逻辑更复杂
FastAPI实现:
python
from fastapi import FastAPI, Header, HTTPException

app = FastAPI()

@app.get("/users")
async def get_users(api_version: str = Header(default="1.0", alias="X-API-Version")):
    """基于Header处理多版本"""
    if api_version == "1.0":
        return {"users": ["user1", "user2"], "version": "1.0"}
    elif api_version == "2.0":
        return {
            "users": ["user1", "user2"],
            "pagination": {"limit": 10, "offset": 0},
            "version": "2.0"
        }
    else:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported API version: {api_version}"
        )

@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    api_version: str = Header(default="1.0", alias="X-API-Version")
):
    """带版本处理的用户端点"""
    if api_version == "1.0":
        return {"id": user_id, "name": "John", "version": "1.0"}
    elif api_version == "2.0":
        return {
            "id": user_id,
            "name": "John",
            "email": "john@example.com",
            "created_at": "2024-01-01",
            "version": "2.0"
        }
    else:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported API version: {api_version}"
        )
Express.js实现:
javascript
app.get('/users', (req, res) => {
  const version = req.get('X-API-Version') || '1.0';

  if (version === '1.0') {
    res.json({ users: ['user1', 'user2'], version: '1.0' });
  } else if (version === '2.0') {
    res.json({
      users: ['user1', 'user2'],
      pagination: { limit: 10, offset: 0 },
      version: '2.0'
    });
  } else {
    res.status(400).json({ error: `Unsupported API version: ${version}` });
  }
});

app.get('/users/:id', (req, res) => {
  const version = req.get('X-API-Version') || '1.0';

  if (version === '1.0') {
    res.json({ id: req.params.id, name: 'John', version: '1.0' });
  } else if (version === '2.0') {
    res.json({
      id: req.params.id,
      name: 'John',
      email: 'john@example.com',
      created_at: '2024-01-01',
      version: '2.0'
    });
  } else {
    res.status(400).json({ error: `Unsupported API version: ${version}` });
  }
});

Strategy 3: Content Negotiation (Accept Header)

策略3:内容协商(Accept Header)

Version specified in Accept header with custom media types.
FastAPI Implementation:
python
from fastapi import FastAPI, Request, HTTPException

app = FastAPI()

@app.get("/users")
async def get_users(request: Request):
    """Handle versioning via Accept header"""
    accept = request.headers.get("accept", "application/vnd.api.v1+json")

    if "vnd.api.v1+json" in accept:
        return {"users": ["user1", "user2"], "version": "1.0"}
    elif "vnd.api.v2+json" in accept:
        return {
            "users": ["user1", "user2"],
            "pagination": {"limit": 10, "offset": 0},
            "version": "2.0"
        }
    else:
        raise HTTPException(
            status_code=406,
            detail="Not Acceptable: Unsupported media type"
        )
Express.js Implementation:
javascript
app.get('/users', (req, res) => {
  const accept = req.get('Accept') || 'application/vnd.api.v1+json';

  if (accept.includes('vnd.api.v1+json')) {
    res.type('application/vnd.api.v1+json')
       .json({ users: ['user1', 'user2'], version: '1.0' });
  } else if (accept.includes('vnd.api.v2+json')) {
    res.type('application/vnd.api.v2+json')
       .json({
         users: ['user1', 'user2'],
         pagination: { limit: 10, offset: 0 },
         version: '2.0'
       });
  } else {
    res.status(406).json({ error: 'Not Acceptable: Unsupported media type' });
  }
});
在Accept Header中使用自定义媒体类型指定版本。
FastAPI实现:
python
from fastapi import FastAPI, Request, HTTPException

app = FastAPI()

@app.get("/users")
async def get_users(request: Request):
    """通过Accept Header处理版本控制"""
    accept = request.headers.get("accept", "application/vnd.api.v1+json")

    if "vnd.api.v1+json" in accept:
        return {"users": ["user1", "user2"], "version": "1.0"}
    elif "vnd.api.v2+json" in accept:
        return {
            "users": ["user1", "user2"],
            "pagination": {"limit": 10, "offset": 0},
            "version": "2.0"
        }
    else:
        raise HTTPException(
            status_code=406,
            detail="Not Acceptable: Unsupported media type"
        )
Express.js实现:
javascript
app.get('/users', (req, res) => {
  const accept = req.get('Accept') || 'application/vnd.api.v1+json';

  if (accept.includes('vnd.api.v1+json')) {
    res.type('application/vnd.api.v1+json')
       .json({ users: ['user1', 'user2'], version: '1.0' });
  } else if (accept.includes('vnd.api.v2+json')) {
    res.type('application/vnd.api.v2+json')
       .json({
         users: ['user1', 'user2'],
         pagination: { limit: 10, offset: 0 },
         version: '2.0'
       });
  } else {
    res.status(406).json({ error: 'Not Acceptable: Unsupported media type' });
  }
});

Strategy 4: Query Parameter Versioning

策略4:查询参数版本控制

Version as query parameter (least recommended).
GET /users?version=2.0
GET /users/123?v=2
Cons:
  • Mixes versioning with filtering
  • Harder to cache
  • Less clear separation of concerns
版本作为查询参数(最不推荐)。
GET /users?version=2.0
GET /users/123?v=2
缺点:
  • 版本控制与过滤逻辑混合
  • 缓存难度大
  • 关注点分离不清晰

Pagination Patterns

分页模式

Pattern 1: Offset-Based Pagination (Traditional)

模式1:基于偏移量的分页(传统方式)

Simple but can have performance issues with large datasets.
FastAPI Implementation:
python
from fastapi import FastAPI, Query
from typing import List
from pydantic import BaseModel

class PaginatedResponse(BaseModel):
    items: List[dict]
    total: int
    limit: int
    offset: int
    has_more: bool

@app.get("/items", response_model=PaginatedResponse)
async def list_items(
    limit: int = Query(default=10, ge=1, le=100),
    offset: int = Query(default=0, ge=0)
):
    """Offset-based pagination"""
    # Get total count
    total = count_items_in_db()

    # Get paginated items
    items = get_items_from_db(limit=limit, offset=offset)

    # Check if there are more items
    has_more = (offset + limit) < total

    return {
        "items": items,
        "total": total,
        "limit": limit,
        "offset": offset,
        "has_more": has_more
    }
Express.js Implementation:
javascript
app.get('/items', async (req, res) => {
  try {
    const limit = Math.min(parseInt(req.query.limit) || 10, 100);
    const offset = parseInt(req.query.offset) || 0;

    const total = await countItemsInDB();
    const items = await getItemsFromDB({ limit, offset });
    const hasMore = (offset + limit) < total;

    res.json({
      items,
      total,
      limit,
      offset,
      has_more: hasMore
    });
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});
简单但在大数据集下可能存在性能问题。
FastAPI实现:
python
from fastapi import FastAPI, Query
from typing import List
from pydantic import BaseModel

class PaginatedResponse(BaseModel):
    items: List[dict]
    total: int
    limit: int
    offset: int
    has_more: bool

@app.get("/items", response_model=PaginatedResponse)
async def list_items(
    limit: int = Query(default=10, ge=1, le=100),
    offset: int = Query(default=0, ge=0)
):
    """基于偏移量的分页"""
    # 获取总数
    total = count_items_in_db()

    # 获取分页后的物品
    items = get_items_from_db(limit=limit, offset=offset)

    # 检查是否还有更多物品
    has_more = (offset + limit) < total

    return {
        "items": items,
        "total": total,
        "limit": limit,
        "offset": offset,
        "has_more": has_more
    }
Express.js实现:
javascript
app.get('/items', async (req, res) => {
  try {
    const limit = Math.min(parseInt(req.query.limit) || 10, 100);
    const offset = parseInt(req.query.offset) || 0;

    const total = await countItemsInDB();
    const items = await getItemsFromDB({ limit, offset });
    const hasMore = (offset + limit) < total;

    res.json({
      items,
      total,
      limit,
      offset,
      has_more: hasMore
    });
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});

Pattern 2: Cursor-Based Pagination (Recommended for Large Datasets)

模式2:基于游标(Cursor)的分页(推荐用于大数据集)

More efficient for large datasets, prevents issues with data changes during pagination.
FastAPI Implementation:
python
from typing import Optional

class CursorPaginatedResponse(BaseModel):
    items: List[dict]
    next_cursor: Optional[str] = None
    has_more: bool

@app.get("/items", response_model=CursorPaginatedResponse)
async def list_items_cursor(
    limit: int = Query(default=10, ge=1, le=100),
    cursor: Optional[str] = None
):
    """Cursor-based pagination"""
    # Get items after cursor
    items = get_items_after_cursor(cursor=cursor, limit=limit + 1)

    # Check if there are more items
    has_more = len(items) > limit

    # Get next cursor from last item
    next_cursor = None
    if has_more:
        items = items[:limit]  # Remove extra item
        next_cursor = items[-1]["id"]  # Use last item ID as cursor

    return {
        "items": items,
        "next_cursor": next_cursor,
        "has_more": has_more
    }
Express.js Implementation:
javascript
app.get('/items', async (req, res) => {
  try {
    const limit = Math.min(parseInt(req.query.limit) || 10, 100);
    const cursor = req.query.cursor || null;

    // Get one extra item to check if there are more
    const items = await getItemsAfterCursor({ cursor, limit: limit + 1 });

    const hasMore = items.length > limit;
    let nextCursor = null;

    if (hasMore) {
      items.pop(); // Remove extra item
      nextCursor = items[items.length - 1].id; // Last item ID as cursor
    }

    res.json({
      items,
      next_cursor: nextCursor,
      has_more: hasMore
    });
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});
在大数据集下更高效,避免分页期间数据变化导致的问题。
FastAPI实现:
python
from typing import Optional

class CursorPaginatedResponse(BaseModel):
    items: List[dict]
    next_cursor: Optional[str] = None
    has_more: bool

@app.get("/items", response_model=CursorPaginatedResponse)
async def list_items_cursor(
    limit: int = Query(default=10, ge=1, le=100),
    cursor: Optional[str] = None
):
    """基于游标的分页"""
    # 获取游标之后的物品
    items = get_items_after_cursor(cursor=cursor, limit=limit + 1)

    # 检查是否还有更多物品
    has_more = len(items) > limit

    # 从最后一个物品获取下一个游标
    next_cursor = None
    if has_more:
        items = items[:limit]  # 移除多余的物品
        next_cursor = items[-1]["id"]  # 使用最后一个物品的ID作为游标

    return {
        "items": items,
        "next_cursor": next_cursor,
        "has_more": has_more
    }
Express.js实现:
javascript
app.get('/items', async (req, res) => {
  try {
    const limit = Math.min(parseInt(req.query.limit) || 10, 100);
    const cursor = req.query.cursor || null;

    // 多获取一个物品以检查是否还有更多
    const items = await getItemsAfterCursor({ cursor, limit: limit + 1 });

    const hasMore = items.length > limit;
    let nextCursor = null;

    if (hasMore) {
      items.pop(); // 移除多余的物品
      nextCursor = items[items.length - 1].id; // 最后一个物品的ID作为游标
    }

    res.json({
      items,
      next_cursor: nextCursor,
      has_more: hasMore
    });
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});

Pattern 3: Page-Based Pagination

模式3:基于页码的分页

User-friendly for UIs with page numbers.
FastAPI Implementation:
python
class PagePaginatedResponse(BaseModel):
    items: List[dict]
    page: int
    page_size: int
    total_pages: int
    total_items: int

@app.get("/items", response_model=PagePaginatedResponse)
async def list_items_pages(
    page: int = Query(default=1, ge=1),
    page_size: int = Query(default=10, ge=1, le=100)
):
    """Page-based pagination"""
    # Calculate offset
    offset = (page - 1) * page_size

    # Get total count and items
    total_items = count_items_in_db()
    items = get_items_from_db(limit=page_size, offset=offset)

    # Calculate total pages
    total_pages = (total_items + page_size - 1) // page_size

    return {
        "items": items,
        "page": page,
        "page_size": page_size,
        "total_pages": total_pages,
        "total_items": total_items
    }
Express.js Implementation:
javascript
app.get('/items', async (req, res) => {
  try {
    const page = Math.max(parseInt(req.query.page) || 1, 1);
    const pageSize = Math.min(parseInt(req.query.page_size) || 10, 100);

    const offset = (page - 1) * pageSize;

    const totalItems = await countItemsInDB();
    const items = await getItemsFromDB({ limit: pageSize, offset });
    const totalPages = Math.ceil(totalItems / pageSize);

    res.json({
      items,
      page,
      page_size: pageSize,
      total_pages: totalPages,
      total_items: totalItems
    });
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});
对带页码的UI更友好。
FastAPI实现:
python
class PagePaginatedResponse(BaseModel):
    items: List[dict]
    page: int
    page_size: int
    total_pages: int
    total_items: int

@app.get("/items", response_model=PagePaginatedResponse)
async def list_items_pages(
    page: int = Query(default=1, ge=1),
    page_size: int = Query(default=10, ge=1, le=100)
):
    """基于页码的分页"""
    # 计算偏移量
    offset = (page - 1) * page_size

    # 获取总数和物品
    total_items = count_items_in_db()
    items = get_items_from_db(limit=page_size, offset=offset)

    # 计算总页数
    total_pages = (total_items + page_size - 1) // page_size

    return {
        "items": items,
        "page": page,
        "page_size": page_size,
        "total_pages": total_pages,
        "total_items": total_items
    }
Express.js实现:
javascript
app.get('/items', async (req, res) => {
  try {
    const page = Math.max(parseInt(req.query.page) || 1, 1);
    const pageSize = Math.min(parseInt(req.query.page_size) || 10, 100);

    const offset = (page - 1) * pageSize;

    const totalItems = await countItemsInDB();
    const items = await getItemsFromDB({ limit: pageSize, offset });
    const totalPages = Math.ceil(totalItems / pageSize);

    res.json({
      items,
      page,
      page_size: pageSize,
      total_pages: totalPages,
      total_items: totalItems
    });
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});

Filtering and Sorting

过滤与排序

Advanced Filtering

高级过滤

FastAPI Implementation:
python
from enum import Enum
from typing import Optional, List

class SortOrder(str, Enum):
    asc = "asc"
    desc = "desc"

@app.get("/products")
async def list_products(
    # Filtering
    category: Optional[str] = None,
    min_price: Optional[float] = None,
    max_price: Optional[float] = None,
    in_stock: Optional[bool] = None,
    tags: Optional[List[str]] = Query(None),
    search: Optional[str] = None,
    # Sorting
    sort_by: Optional[str] = Query(default="created_at"),
    order: SortOrder = SortOrder.desc,
    # Pagination
    limit: int = Query(default=10, ge=1, le=100),
    offset: int = Query(default=0, ge=0)
):
    """Advanced filtering and sorting"""
    filters = {
        "category": category,
        "min_price": min_price,
        "max_price": max_price,
        "in_stock": in_stock,
        "tags": tags,
        "search": search
    }

    # Remove None values
    filters = {k: v for k, v in filters.items() if v is not None}

    # Query database
    products = query_products(
        filters=filters,
        sort_by=sort_by,
        order=order.value,
        limit=limit,
        offset=offset
    )

    return {
        "products": products,
        "filters": filters,
        "sort": {"by": sort_by, "order": order.value},
        "pagination": {"limit": limit, "offset": offset}
    }
Express.js Implementation:
javascript
app.get('/products', async (req, res) => {
  try {
    const {
      category,
      min_price,
      max_price,
      in_stock,
      tags,
      search,
      sort_by = 'created_at',
      order = 'desc',
      limit = 10,
      offset = 0
    } = req.query;

    // Build filters
    const filters = {};
    if (category) filters.category = category;
    if (min_price) filters.min_price = parseFloat(min_price);
    if (max_price) filters.max_price = parseFloat(max_price);
    if (in_stock !== undefined) filters.in_stock = in_stock === 'true';
    if (tags) filters.tags = Array.isArray(tags) ? tags : [tags];
    if (search) filters.search = search;

    // Query database
    const products = await queryProducts({
      filters,
      sortBy: sort_by,
      order,
      limit: Math.min(parseInt(limit), 100),
      offset: parseInt(offset)
    });

    res.json({
      products,
      filters,
      sort: { by: sort_by, order },
      pagination: { limit, offset }
    });
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});
FastAPI实现:
python
from enum import Enum
from typing import Optional, List

class SortOrder(str, Enum):
    asc = "asc"
    desc = "desc"

@app.get("/products")
async def list_products(
    # 过滤
    category: Optional[str] = None,
    min_price: Optional[float] = None,
    max_price: Optional[float] = None,
    in_stock: Optional[bool] = None,
    tags: Optional[List[str]] = Query(None),
    search: Optional[str] = None,
    # 排序
    sort_by: Optional[str] = Query(default="created_at"),
    order: SortOrder = SortOrder.desc,
    # 分页
    limit: int = Query(default=10, ge=1, le=100),
    offset: int = Query(default=0, ge=0)
):
    """高级过滤与排序"""
    filters = {
        "category": category,
        "min_price": min_price,
        "max_price": max_price,
        "in_stock": in_stock,
        "tags": tags,
        "search": search
    }

    # 移除None值
    filters = {k: v for k, v in filters.items() if v is not None}

    # 查询数据库
    products = query_products(
        filters=filters,
        sort_by=sort_by,
        order=order.value,
        limit=limit,
        offset=offset
    )

    return {
        "products": products,
        "filters": filters,
        "sort": {"by": sort_by, "order": order.value},
        "pagination": {"limit": limit, "offset": offset}
    }
Express.js实现:
javascript
app.get('/products', async (req, res) => {
  try {
    const {
      category,
      min_price,
      max_price,
      in_stock,
      tags,
      search,
      sort_by = 'created_at',
      order = 'desc',
      limit = 10,
      offset = 0
    } = req.query;

    // 构建过滤条件
    const filters = {};
    if (category) filters.category = category;
    if (min_price) filters.min_price = parseFloat(min_price);
    if (max_price) filters.max_price = parseFloat(max_price);
    if (in_stock !== undefined) filters.in_stock = in_stock === 'true';
    if (tags) filters.tags = Array.isArray(tags) ? tags : [tags];
    if (search) filters.search = search;

    // 查询数据库
    const products = await queryProducts({
      filters,
      sortBy: sort_by,
      order,
      limit: Math.min(parseInt(limit), 100),
      offset: parseInt(offset)
    });

    res.json({
      products,
      filters,
      sort: { by: sort_by, order },
      pagination: { limit, offset }
    });
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});

Error Handling Best Practices

错误处理最佳实践

Consistent Error Response Format

统一的错误响应格式

Standard Error Response Structure:
json
{
  "error": {
    "code": "RESOURCE_NOT_FOUND",
    "message": "The requested user was not found",
    "details": {
      "user_id": "123",
      "resource": "user"
    },
    "timestamp": "2024-01-15T10:30:00Z"
  }
}
标准错误响应结构:
json
{
  "error": {
    "code": "RESOURCE_NOT_FOUND",
    "message": "请求的用户不存在",
    "details": {
      "user_id": "123",
      "resource": "user"
    },
    "timestamp": "2024-01-15T10:30:00Z"
  }
}

HTTP Status Codes

HTTP状态码

Success Codes:
  • 200 OK
    : Successful GET, PUT, PATCH, DELETE with response
  • 201 Created
    : Successful POST creating a resource
  • 202 Accepted
    : Request accepted for async processing
  • 204 No Content
    : Successful DELETE with no response body
Client Error Codes:
  • 400 Bad Request
    : Invalid request syntax or validation error
  • 401 Unauthorized
    : Authentication required
  • 403 Forbidden
    : Authenticated but not authorized
  • 404 Not Found
    : Resource doesn't exist
  • 405 Method Not Allowed
    : HTTP method not supported
  • 409 Conflict
    : Request conflicts with current state
  • 422 Unprocessable Entity
    : Validation failed
  • 429 Too Many Requests
    : Rate limit exceeded
Server Error Codes:
  • 500 Internal Server Error
    : Generic server error
  • 502 Bad Gateway
    : Invalid response from upstream server
  • 503 Service Unavailable
    : Server temporarily unavailable
  • 504 Gateway Timeout
    : Upstream server timeout
成功状态码:
  • 200 OK
    :成功的GET、PUT、PATCH、DELETE请求(带响应体)
  • 201 Created
    :成功创建资源的POST请求
  • 202 Accepted
    :请求已接受,将异步处理
  • 204 No Content
    :成功的DELETE请求(无响应体)
客户端错误状态码:
  • 400 Bad Request
    :请求语法无效或验证错误
  • 401 Unauthorized
    :需要认证
  • 403 Forbidden
    :已认证但无权限
  • 404 Not Found
    :资源不存在
  • 405 Method Not Allowed
    :不支持该HTTP方法
  • 409 Conflict
    :请求与当前资源状态冲突
  • 422 Unprocessable Entity
    :验证失败
  • 429 Too Many Requests
    :超出请求频率限制
服务端错误状态码:
  • 500 Internal Server Error
    :通用服务端错误
  • 502 Bad Gateway
    :上游服务器响应无效
  • 503 Service Unavailable
    :服务暂时不可用
  • 504 Gateway Timeout
    :上游服务器超时

FastAPI Error Handling

FastAPI错误处理

python
from fastapi import FastAPI, HTTPException, status, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from datetime import datetime

app = FastAPI()
python
from fastapi import FastAPI, HTTPException, status, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from datetime import datetime

app = FastAPI()

Custom exception

自定义异常

class ResourceNotFoundError(Exception): def init(self, resource: str, resource_id: str): self.resource = resource self.resource_id = resource_id
class ResourceNotFoundError(Exception): def init(self, resource: str, resource_id: str): self.resource = resource self.resource_id = resource_id

Global exception handler

全局异常处理器

@app.exception_handler(ResourceNotFoundError) async def resource_not_found_handler(request: Request, exc: ResourceNotFoundError): return JSONResponse( status_code=404, content={ "error": { "code": "RESOURCE_NOT_FOUND", "message": f"The requested {exc.resource} was not found", "details": { "resource": exc.resource, "resource_id": exc.resource_id }, "timestamp": datetime.utcnow().isoformat() + "Z" } } )
@app.exception_handler(ResourceNotFoundError) async def resource_not_found_handler(request: Request, exc: ResourceNotFoundError): return JSONResponse( status_code=404, content={ "error": { "code": "RESOURCE_NOT_FOUND", "message": f"请求的{exc.resource}不存在", "details": { "resource": exc.resource, "resource_id": exc.resource_id }, "timestamp": datetime.utcnow().isoformat() + "Z" } } )

Validation error handler

验证错误处理器

@app.exception_handler(RequestValidationError) async def validation_exception_handler(request: Request, exc: RequestValidationError): return JSONResponse( status_code=422, content={ "error": { "code": "VALIDATION_ERROR", "message": "Request validation failed", "details": exc.errors(), "timestamp": datetime.utcnow().isoformat() + "Z" } } )
@app.exception_handler(RequestValidationError) async def validation_exception_handler(request: Request, exc: RequestValidationError): return JSONResponse( status_code=422, content={ "error": { "code": "VALIDATION_ERROR", "message": "请求验证失败", "details": exc.errors(), "timestamp": datetime.utcnow().isoformat() + "Z" } } )

Using exceptions

使用异常

@app.get("/users/{user_id}") async def get_user(user_id: int): user = get_user_from_db(user_id) if not user: raise ResourceNotFoundError("user", str(user_id)) return user
@app.get("/users/{user_id}") async def get_user(user_id: int): user = get_user_from_db(user_id) if not user: raise ResourceNotFoundError("user", str(user_id)) return user

Manual error responses

手动返回错误响应

@app.post("/users") async def create_user(email: str): if user_exists(email): raise HTTPException( status_code=409, detail={ "code": "DUPLICATE_EMAIL", "message": "A user with this email already exists", "details": {"email": email} } ) return create_user_in_db(email)
undefined
@app.post("/users") async def create_user(email: str): if user_exists(email): raise HTTPException( status_code=409, detail={ "code": "DUPLICATE_EMAIL", "message": "该邮箱已被注册", "details": {"email": email} } ) return create_user_in_db(email)
undefined

Express.js Error Handling

Express.js错误处理

javascript
const express = require('express');
const app = express();

app.use(express.json());

// Custom error class
class ResourceNotFoundError extends Error {
  constructor(resource, resourceId) {
    super(`${resource} not found`);
    this.name = 'ResourceNotFoundError';
    this.resource = resource;
    this.resourceId = resourceId;
    this.statusCode = 404;
  }
}

class ValidationError extends Error {
  constructor(message, details) {
    super(message);
    this.name = 'ValidationError';
    this.details = details;
    this.statusCode = 422;
  }
}

// Routes
app.get('/users/:id', async (req, res, next) => {
  try {
    const user = await getUserFromDB(req.params.id);
    if (!user) {
      throw new ResourceNotFoundError('user', req.params.id);
    }
    res.json(user);
  } catch (error) {
    next(error);
  }
});

app.post('/users', async (req, res, next) => {
  try {
    const { email } = req.body;

    if (!email) {
      throw new ValidationError('Validation failed', {
        field: 'email',
        message: 'Email is required'
      });
    }

    const userExists = await checkUserExists(email);
    if (userExists) {
      const error = new Error('Duplicate email');
      error.statusCode = 409;
      error.code = 'DUPLICATE_EMAIL';
      error.details = { email };
      throw error;
    }

    const newUser = await createUserInDB(email);
    res.status(201).json(newUser);
  } catch (error) {
    next(error);
  }
});

// Global error handler (must be last)
app.use((err, req, res, next) => {
  const statusCode = err.statusCode || 500;
  const code = err.code || err.name || 'INTERNAL_SERVER_ERROR';

  const errorResponse = {
    error: {
      code,
      message: err.message,
      timestamp: new Date().toISOString()
    }
  };

  // Add details if available
  if (err.details) {
    errorResponse.error.details = err.details;
  } else if (err.resource && err.resourceId) {
    errorResponse.error.details = {
      resource: err.resource,
      resource_id: err.resourceId
    };
  }

  // Log error for debugging (don't expose in production)
  if (process.env.NODE_ENV !== 'production') {
    errorResponse.error.stack = err.stack;
  }

  res.status(statusCode).json(errorResponse);
});
javascript
const express = require('express');
const app = express();

app.use(express.json());

// 自定义错误类
class ResourceNotFoundError extends Error {
  constructor(resource, resourceId) {
    super(`${resource} not found`);
    this.name = 'ResourceNotFoundError';
    this.resource = resource;
    this.resourceId = resourceId;
    this.statusCode = 404;
  }
}

class ValidationError extends Error {
  constructor(message, details) {
    super(message);
    this.name = 'ValidationError';
    this.details = details;
    this.statusCode = 422;
  }
}

// 路由
app.get('/users/:id', async (req, res, next) => {
  try {
    const user = await getUserFromDB(req.params.id);
    if (!user) {
      throw new ResourceNotFoundError('user', req.params.id);
    }
    res.json(user);
  } catch (error) {
    next(error);
  }
});

app.post('/users', async (req, res, next) => {
  try {
    const { email } = req.body;

    if (!email) {
      throw new ValidationError('Validation failed', {
        field: 'email',
        message: 'Email is required'
      });
    }

    const userExists = await checkUserExists(email);
    if (userExists) {
      const error = new Error('Duplicate email');
      error.statusCode = 409;
      error.code = 'DUPLICATE_EMAIL';
      error.details = { email };
      throw error;
    }

    const newUser = await createUserInDB(email);
    res.status(201).json(newUser);
  } catch (error) {
    next(error);
  }
});

// 全局错误处理器(必须放在最后)
app.use((err, req, res, next) => {
  const statusCode = err.statusCode || 500;
  const code = err.code || err.name || 'INTERNAL_SERVER_ERROR';

  const errorResponse = {
    error: {
      code,
      message: err.message,
      timestamp: new Date().toISOString()
    }
  };

  // 如果有详情则添加
  if (err.details) {
    errorResponse.error.details = err.details;
  } else if (err.resource && err.resourceId) {
    errorResponse.error.details = {
      resource: err.resource,
      resource_id: err.resourceId
    };
  }

  // 调试用日志(生产环境不要暴露)
  if (process.env.NODE_ENV !== 'production') {
    errorResponse.error.stack = err.stack;
  }

  res.status(statusCode).json(errorResponse);
});

HATEOAS and Hypermedia

HATEOAS与超媒体

What is HATEOAS?

什么是HATEOAS?

HATEOAS (Hypermedia as the Engine of Application State) means including links to related resources in API responses.
Benefits:
  • Self-documenting API
  • Client doesn't need to construct URLs
  • Easier API evolution
  • Better discoverability
HATEOAS(Hypermedia as the Engine of Application State,超媒体作为应用状态引擎)指在API响应中包含关联资源的链接。
优点:
  • API自文档化
  • 客户端无需构造URL
  • API演进更简单
  • 资源可发现性更好

FastAPI HATEOAS Implementation

FastAPI HATEOAS实现

python
from fastapi import FastAPI
from pydantic import BaseModel, HttpUrl
from typing import List, Optional

class Link(BaseModel):
    rel: str
    href: str
    method: str = "GET"

class UserResponse(BaseModel):
    id: int
    name: str
    email: str
    links: List[Link]

class UserListResponse(BaseModel):
    users: List[UserResponse]
    links: List[Link]

app = FastAPI()

def build_user_links(user_id: int, base_url: str = "http://api.example.com") -> List[Link]:
    """Build HATEOAS links for a user"""
    return [
        Link(rel="self", href=f"{base_url}/users/{user_id}", method="GET"),
        Link(rel="update", href=f"{base_url}/users/{user_id}", method="PUT"),
        Link(rel="delete", href=f"{base_url}/users/{user_id}", method="DELETE"),
        Link(rel="posts", href=f"{base_url}/users/{user_id}/posts", method="GET"),
        Link(rel="create_post", href=f"{base_url}/users/{user_id}/posts", method="POST")
    ]

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    """Get user with HATEOAS links"""
    user = get_user_from_db(user_id)

    return {
        "id": user.id,
        "name": user.name,
        "email": user.email,
        "links": build_user_links(user.id)
    }

@app.get("/users", response_model=UserListResponse)
async def list_users():
    """List users with HATEOAS links"""
    users = get_users_from_db()

    users_with_links = [
        {
            "id": user.id,
            "name": user.name,
            "email": user.email,
            "links": build_user_links(user.id)
        }
        for user in users
    ]

    collection_links = [
        Link(rel="self", href="http://api.example.com/users", method="GET"),
        Link(rel="create", href="http://api.example.com/users", method="POST")
    ]

    return {
        "users": users_with_links,
        "links": collection_links
    }
python
from fastapi import FastAPI
from pydantic import BaseModel, HttpUrl
from typing import List, Optional

class Link(BaseModel):
    rel: str
    href: str
    method: str = "GET"

class UserResponse(BaseModel):
    id: int
    name: str
    email: str
    links: List[Link]

class UserListResponse(BaseModel):
    users: List[UserResponse]
    links: List[Link]

app = FastAPI()

def build_user_links(user_id: int, base_url: str = "http://api.example.com") -> List[Link]:
    """为用户构建HATEOAS链接"""
    return [
        Link(rel="self", href=f"{base_url}/users/{user_id}", method="GET"),
        Link(rel="update", href=f"{base_url}/users/{user_id}", method="PUT"),
        Link(rel="delete", href=f"{base_url}/users/{user_id}", method="DELETE"),
        Link(rel="posts", href=f"{base_url}/users/{user_id}/posts", method="GET"),
        Link(rel="create_post", href=f"{base_url}/users/{user_id}/posts", method="POST")
    ]

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    """获取带HATEOAS链接的用户"""
    user = get_user_from_db(user_id)

    return {
        "id": user.id,
        "name": user.name,
        "email": user.email,
        "links": build_user_links(user.id)
    }

@app.get("/users", response_model=UserListResponse)
async def list_users():
    """列出带HATEOAS链接的用户"""
    users = get_users_from_db()

    users_with_links = [
        {
            "id": user.id,
            "name": user.name,
            "email": user.email,
            "links": build_user_links(user.id)
        }
        for user in users
    ]

    collection_links = [
        Link(rel="self", href="http://api.example.com/users", method="GET"),
        Link(rel="create", href="http://api.example.com/users", method="POST")
    ]

    return {
        "users": users_with_links,
        "links": collection_links
    }

Express.js HATEOAS Implementation

Express.js HATEOAS实现

javascript
app.get('/users/:id', async (req, res) => {
  try {
    const user = await getUserFromDB(req.params.id);
    if (!user) {
      return res.status(404).json({ error: 'User not found' });
    }

    const baseUrl = `${req.protocol}://${req.get('host')}`;

    res.json({
      id: user.id,
      name: user.name,
      email: user.email,
      _links: {
        self: { href: `${baseUrl}/users/${user.id}`, method: 'GET' },
        update: { href: `${baseUrl}/users/${user.id}`, method: 'PUT' },
        delete: { href: `${baseUrl}/users/${user.id}`, method: 'DELETE' },
        posts: { href: `${baseUrl}/users/${user.id}/posts`, method: 'GET' },
        create_post: { href: `${baseUrl}/users/${user.id}/posts`, method: 'POST' }
      }
    });
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});

app.get('/users', async (req, res) => {
  try {
    const users = await getUsersFromDB();
    const baseUrl = `${req.protocol}://${req.get('host')}`;

    const usersWithLinks = users.map(user => ({
      id: user.id,
      name: user.name,
      email: user.email,
      _links: {
        self: { href: `${baseUrl}/users/${user.id}`, method: 'GET' },
        update: { href: `${baseUrl}/users/${user.id}`, method: 'PUT' },
        delete: { href: `${baseUrl}/users/${user.id}`, method: 'DELETE' }
      }
    }));

    res.json({
      users: usersWithLinks,
      _links: {
        self: { href: `${baseUrl}/users`, method: 'GET' },
        create: { href: `${baseUrl}/users`, method: 'POST' }
      }
    });
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});
javascript
app.get('/users/:id', async (req, res) => {
  try {
    const user = await getUserFromDB(req.params.id);
    if (!user) {
      return res.status(404).json({ error: 'User not found' });
    }

    const baseUrl = `${req.protocol}://${req.get('host')}`;

    res.json({
      id: user.id,
      name: user.name,
      email: user.email,
      _links: {
        self: { href: `${baseUrl}/users/${user.id}`, method: 'GET' },
        update: { href: `${baseUrl}/users/${user.id}`, method: 'PUT' },
        delete: { href: `${baseUrl}/users/${user.id}`, method: 'DELETE' },
        posts: { href: `${baseUrl}/users/${user.id}/posts`, method: 'GET' },
        create_post: { href: `${baseUrl}/users/${user.id}/posts`, method: 'POST' }
      }
    });
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});

app.get('/users', async (req, res) => {
  try {
    const users = await getUsersFromDB();
    const baseUrl = `${req.protocol}://${req.get('host')}`;

    const usersWithLinks = users.map(user => ({
      id: user.id,
      name: user.name,
      email: user.email,
      _links: {
        self: { href: `${baseUrl}/users/${user.id}`, method: 'GET' },
        update: { href: `${baseUrl}/users/${user.id}`, method: 'PUT' },
        delete: { href: `${baseUrl}/users/${user.id}`, method: 'DELETE' }
      }
    }));

    res.json({
      users: usersWithLinks,
      _links: {
        self: { href: `${baseUrl}/users`, method: 'GET' },
        create: { href: `${baseUrl}/users`, method: 'POST' }
      }
    });
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});

Performance Optimization

性能优化

Caching with ETags

使用ETags缓存

FastAPI Implementation:
python
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
import hashlib

app = FastAPI()

def generate_etag(data: dict) -> str:
    """Generate ETag from response data"""
    content = str(data).encode('utf-8')
    return hashlib.md5(content).hexdigest()

@app.get("/users/{user_id}")
async def get_user_cached(user_id: int, request: Request):
    """Get user with ETag caching"""
    user = get_user_from_db(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Generate ETag
    etag = generate_etag(user)

    # Check If-None-Match header
    if_none_match = request.headers.get("if-none-match")
    if if_none_match == etag:
        return Response(status_code=304)  # Not Modified

    # Return with ETag header
    return JSONResponse(
        content=user,
        headers={"ETag": etag, "Cache-Control": "max-age=300"}
    )
Express.js Implementation:
javascript
const crypto = require('crypto');

function generateETag(data) {
  const content = JSON.stringify(data);
  return crypto.createHash('md5').update(content).digest('hex');
}

app.get('/users/:id', async (req, res) => {
  try {
    const user = await getUserFromDB(req.params.id);
    if (!user) {
      return res.status(404).json({ error: 'User not found' });
    }

    const etag = generateETag(user);

    // Check If-None-Match header
    if (req.get('If-None-Match') === etag) {
      return res.status(304).send(); // Not Modified
    }

    res.set('ETag', etag)
       .set('Cache-Control', 'max-age=300')
       .json(user);
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});
FastAPI实现:
python
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
import hashlib

app = FastAPI()

def generate_etag(data: dict) -> str:
    """从响应数据生成ETag"""
    content = str(data).encode('utf-8')
    return hashlib.md5(content).hexdigest()

@app.get("/users/{user_id}")
async def get_user_cached(user_id: int, request: Request):
    """带ETag缓存的用户获取接口"""
    user = get_user_from_db(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # 生成ETag
    etag = generate_etag(user)

    # 检查If-None-Match Header
    if_none_match = request.headers.get("if-none-match")
    if if_none_match == etag:
        return Response(status_code=304)  # 未修改

    # 返回带ETag Header的响应
    return JSONResponse(
        content=user,
        headers={"ETag": etag, "Cache-Control": "max-age=300"}
    )
Express.js实现:
javascript
const crypto = require('crypto');

function generateETag(data) {
  const content = JSON.stringify(data);
  return crypto.createHash('md5').update(content).digest('hex');
}

app.get('/users/:id', async (req, res) => {
  try {
    const user = await getUserFromDB(req.params.id);
    if (!user) {
      return res.status(404).json({ error: 'User not found' });
    }

    const etag = generateETag(user);

    // 检查If-None-Match Header
    if (req.get('If-None-Match') === etag) {
      return res.status(304).send(); // 未修改
    }

    res.set('ETag', etag)
       .set('Cache-Control', 'max-age=300')
       .json(user);
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});

Rate Limiting

请求频率限制

Express.js Implementation:
javascript
const rateLimit = require('express-rate-limit');

// Create rate limiter
const apiLimiter = rateLimit({
  windowMs: 15 * 60 * 1000, // 15 minutes
  max: 100, // Limit each IP to 100 requests per windowMs
  message: {
    error: {
      code: 'RATE_LIMIT_EXCEEDED',
      message: 'Too many requests, please try again later'
    }
  },
  standardHeaders: true, // Return rate limit info in headers
  legacyHeaders: false
});

// Apply to all routes
app.use('/api/', apiLimiter);

// Or create specific limiters
const createAccountLimiter = rateLimit({
  windowMs: 60 * 60 * 1000, // 1 hour
  max: 5, // 5 requests per hour
  message: 'Too many accounts created, please try again later'
});

app.post('/api/users', createAccountLimiter, async (req, res) => {
  // Create user
});
Express.js实现:
javascript
const rateLimit = require('express-rate-limit');

// 创建频率限制器
const apiLimiter = rateLimit({
  windowMs: 15 * 60 * 1000, // 15分钟
  max: 100, // 每个IP在窗口内最多100次请求
  message: {
    error: {
      code: 'RATE_LIMIT_EXCEEDED',
      message: '请求过于频繁,请稍后再试'
    }
  },
  standardHeaders: true, // 在Header中返回频率限制信息
  legacyHeaders: false
});

// 应用到所有路由
app.use('/api/', apiLimiter);

// 或创建特定的频率限制器
const createAccountLimiter = rateLimit({
  windowMs: 60 * 60 * 1000, // 1小时
  max: 5, // 每小时最多5次请求
  message: '创建账号过于频繁,请稍后再试'
});

app.post('/api/users', createAccountLimiter, async (req, res) => {
  // 创建用户
});

Compression

压缩

FastAPI Implementation:
python
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware

app = FastAPI()
FastAPI实现:
python
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware

app = FastAPI()

Add compression middleware

添加压缩中间件

app.add_middleware(GZipMiddleware, minimum_size=1000)
@app.get("/large-data") async def get_large_data(): """This response will be compressed if > 1000 bytes""" return {"data": [{"id": i, "value": f"item_{i}"} for i in range(1000)]}

**Express.js Implementation:**

```javascript
const compression = require('compression');

// Add compression middleware
app.use(compression({
  threshold: 1024, // Only compress responses > 1KB
  level: 6 // Compression level (0-9)
}));

app.get('/large-data', (req, res) => {
  const data = Array.from({ length: 1000 }, (_, i) => ({
    id: i,
    value: `item_${i}`
  }));
  res.json({ data });
});
app.add_middleware(GZipMiddleware, minimum_size=1000)
@app.get("/large-data") async def get_large_data(): """响应大小超过1000字节时会被压缩""" return {"data": [{"id": i, "value": f"item_{i}"} for i in range(1000)]}

**Express.js实现:**

```javascript
const compression = require('compression');

// 添加压缩中间件
app.use(compression({
  threshold: 1024, // 仅压缩大于1KB的响应
  level: 6 // 压缩级别(0-9)
}));

app.get('/large-data', (req, res) => {
  const data = Array.from({ length: 1000 }, (_, i) => ({
    id: i,
    value: `item_${i}`
  }));
  res.json({ data });
});

Security Best Practices

安全最佳实践

Authentication Patterns

认证模式

JWT Authentication in FastAPI:
python
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
from datetime import datetime, timedelta

app = FastAPI()
security = HTTPBearer()

SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"

def create_access_token(data: dict, expires_delta: timedelta = None):
    """Create JWT token"""
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verify JWT token"""
    token = credentials.credentials
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired"
        )
    except jwt.JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials"
        )

@app.post("/login")
async def login(email: str, password: str):
    """Login and get access token"""
    user = authenticate_user(email, password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password"
        )

    access_token = create_access_token(
        data={"sub": user.email, "user_id": user.id}
    )

    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me")
async def get_current_user(payload: dict = Depends(verify_token)):
    """Protected endpoint - requires authentication"""
    user_id = payload.get("user_id")
    user = get_user_from_db(user_id)
    return user
JWT Authentication in Express.js:
javascript
const jwt = require('jsonwebtoken');
const express = require('express');
const app = express();

const SECRET_KEY = 'your-secret-key';

function createAccessToken(data, expiresIn = '15m') {
  return jwt.sign(data, SECRET_KEY, { expiresIn });
}

function verifyToken(req, res, next) {
  const authHeader = req.get('Authorization');

  if (!authHeader || !authHeader.startsWith('Bearer ')) {
    return res.status(401).json({ error: 'Missing or invalid authorization header' });
  }

  const token = authHeader.substring(7);

  try {
    const payload = jwt.verify(token, SECRET_KEY);
    req.user = payload;
    next();
  } catch (error) {
    if (error.name === 'TokenExpiredError') {
      return res.status(401).json({ error: 'Token has expired' });
    }
    return res.status(401).json({ error: 'Invalid token' });
  }
}

app.post('/login', async (req, res) => {
  const { email, password } = req.body;

  const user = await authenticateUser(email, password);
  if (!user) {
    return res.status(401).json({ error: 'Incorrect email or password' });
  }

  const accessToken = createAccessToken({
    sub: user.email,
    user_id: user.id
  });

  res.json({ access_token: accessToken, token_type: 'bearer' });
});

app.get('/users/me', verifyToken, async (req, res) => {
  const user = await getUserFromDB(req.user.user_id);
  res.json(user);
});
FastAPI中的JWT认证:
python
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
from datetime import datetime, timedelta

app = FastAPI()
security = HTTPBearer()

SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"

def create_access_token(data: dict, expires_delta: timedelta = None):
    """创建JWT令牌"""
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """验证JWT令牌"""
    token = credentials.credentials
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="令牌已过期"
        )
    except jwt.JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="凭证验证失败"
        )

@app.post("/login")
async def login(email: str, password: str):
    """登录并获取访问令牌"""
    user = authenticate_user(email, password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="邮箱或密码错误"
        )

    access_token = create_access_token(
        data={"sub": user.email, "user_id": user.id}
    )

    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me")
async def get_current_user(payload: dict = Depends(verify_token)):
    """受保护的端点 - 需要认证"""
    user_id = payload.get("user_id")
    user = get_user_from_db(user_id)
    return user
Express.js中的JWT认证:
javascript
const jwt = require('jsonwebtoken');
const express = require('express');
const app = express();

const SECRET_KEY = 'your-secret-key';

function createAccessToken(data, expiresIn = '15m') {
  return jwt.sign(data, SECRET_KEY, { expiresIn });
}

function verifyToken(req, res, next) {
  const authHeader = req.get('Authorization');

  if (!authHeader || !authHeader.startsWith('Bearer ')) {
    return res.status(401).json({ error: 'Missing or invalid authorization header' });
  }

  const token = authHeader.substring(7);

  try {
    const payload = jwt.verify(token, SECRET_KEY);
    req.user = payload;
    next();
  } catch (error) {
    if (error.name === 'TokenExpiredError') {
      return res.status(401).json({ error: '令牌已过期' });
    }
    return res.status(401).json({ error: '无效令牌' });
  }
}

app.post('/login', async (req, res) => {
  const { email, password } = req.body;

  const user = await authenticateUser(email, password);
  if (!user) {
    return res.status(401).json({ error: '邮箱或密码错误' });
  }

  const accessToken = createAccessToken({
    sub: user.email,
    user_id: user.id
  });

  res.json({ access_token: accessToken, token_type: 'bearer' });
});

app.get('/users/me', verifyToken, async (req, res) => {
  const user = await getUserFromDB(req.user.user_id);
  res.json(user);
});

Input Validation and Sanitization

输入验证与清理

FastAPI Validation:
python
from fastapi import FastAPI
from pydantic import BaseModel, EmailStr, Field, validator
from typing import Optional

class UserCreate(BaseModel):
    email: EmailStr
    username: str = Field(..., min_length=3, max_length=50, pattern="^[a-zA-Z0-9_-]+$")
    password: str = Field(..., min_length=8)
    age: Optional[int] = Field(None, ge=0, le=150)

    @validator('password')
    def password_strength(cls, v):
        """Validate password strength"""
        if not any(char.isdigit() for char in v):
            raise ValueError('Password must contain at least one digit')
        if not any(char.isupper() for char in v):
            raise ValueError('Password must contain at least one uppercase letter')
        return v

@app.post("/users")
async def create_user(user: UserCreate):
    """Automatically validates input"""
    # Input is already validated by Pydantic
    hashed_password = hash_password(user.password)
    new_user = create_user_in_db(user.email, user.username, hashed_password)
    return new_user
FastAPI验证:
python
from fastapi import FastAPI
from pydantic import BaseModel, EmailStr, Field, validator
from typing import Optional

class UserCreate(BaseModel):
    email: EmailStr
    username: str = Field(..., min_length=3, max_length=50, pattern="^[a-zA-Z0-9_-]+$")
    password: str = Field(..., min_length=8)
    age: Optional[int] = Field(None, ge=0, le=150)

    @validator('password')
    def password_strength(cls, v):
        """验证密码强度"""
        if not any(char.isdigit() for char in v):
            raise ValueError('密码必须包含至少一个数字')
        if not any(char.isupper() for char in v):
            raise ValueError('密码必须包含至少一个大写字母')
        return v

@app.post("/users")
async def create_user(user: UserCreate):
    """自动验证输入"""
    # 输入已被Pydantic验证
    hashed_password = hash_password(user.password)
    new_user = create_user_in_db(user.email, user.username, hashed_password)
    return new_user

API Documentation

API文档

OpenAPI/Swagger with FastAPI

FastAPI中的OpenAPI/Swagger

FastAPI automatically generates OpenAPI documentation:
python
from fastapi import FastAPI
from pydantic import BaseModel, Field

app = FastAPI(
    title="My API",
    description="Comprehensive API for managing resources",
    version="1.0.0",
    docs_url="/api/docs",
    redoc_url="/api/redoc"
)

class Item(BaseModel):
    """Item model with rich documentation"""
    name: str = Field(..., description="The name of the item", example="Widget")
    price: float = Field(..., description="Price in USD", example=19.99, gt=0)
    description: str = Field(None, description="Optional item description")

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "name": "Super Widget",
                    "price": 29.99,
                    "description": "An amazing widget"
                }
            ]
        }
    }

@app.post(
    "/items",
    response_model=Item,
    status_code=201,
    summary="Create a new item",
    description="Create a new item with name, price, and optional description",
    response_description="The created item",
    tags=["items"]
)
async def create_item(item: Item):
    """
    Create a new item with all the information:

    - **name**: The item name (required)
    - **price**: The item price in USD (required, must be positive)
    - **description**: Optional description of the item
    """
    return item
FastAPI会自动生成OpenAPI文档:
python
from fastapi import FastAPI
from pydantic import BaseModel, Field

app = FastAPI(
    title="My API",
    description="管理资源的综合API",
    version="1.0.0",
    docs_url="/api/docs",
    redoc_url="/api/redoc"
)

class Item(BaseModel):
    """带有详细文档的物品模型"""
    name: str = Field(..., description="物品名称", example="Widget")
    price: float = Field(..., description="美元价格", example=19.99, gt=0)
    description: str = Field(None, description="可选的物品描述")

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "name": "Super Widget",
                    "price": 29.99,
                    "description": "An amazing widget"
                }
            ]
        }
    }

@app.post(
    "/items",
    response_model=Item,
    status_code=201,
    summary="创建新物品",
    description="使用名称、价格和可选描述创建新物品",
    response_description="已创建的物品",
    tags=["items"]
)
async def create_item(item: Item):
    """
    创建包含所有信息的新物品:

    - **name**: 物品名称(必填)
    - **price**: 物品的美元价格(必填,必须为正数)
    - **description**: 可选的物品描述
    """
    return item

Best Practices Summary

最佳实践总结

API Design Principles

API设计原则

  1. Use nouns for resources, not verbs
  2. Use plural nouns for collections
  3. Use HTTP methods correctly (GET, POST, PUT, PATCH, DELETE)
  4. Use proper HTTP status codes
  5. Version your API (URI versioning recommended)
  6. Support pagination for collections
  7. Allow filtering and sorting with query parameters
  8. Return consistent error responses
  9. Use HATEOAS for better discoverability
  10. Document your API with OpenAPI/Swagger
  1. 资源使用名词而非动词
  2. 集合资源使用复数名词
  3. 正确使用HTTP方法(GET、POST、PUT、PATCH、DELETE)
  4. 使用正确的HTTP状态码
  5. 为API添加版本控制(推荐使用URI版本控制)
  6. 集合资源支持分页
  7. 允许通过查询参数进行过滤和排序
  8. 返回统一的错误响应
  9. 使用HATEOAS提升可发现性
  10. 使用OpenAPI/Swagger编写API文档

Security Principles

安全原则

  1. Always use HTTPS in production
  2. Implement authentication (JWT, OAuth, API keys)
  3. Validate all inputs thoroughly
  4. Use rate limiting to prevent abuse
  5. Sanitize outputs to prevent XSS
  6. Implement CORS correctly
  7. Use security headers (CSP, X-Frame-Options, etc.)
  8. Log security events for monitoring
  9. Keep dependencies updated
  10. Never expose sensitive data in responses
  1. 生产环境始终使用HTTPS
  2. 实现认证机制(JWT、OAuth、API密钥)
  3. 彻底验证所有输入
  4. 使用频率限制防止滥用
  5. 清理输出以防止XSS攻击
  6. 正确实现CORS
  7. 使用安全Header(CSP、X-Frame-Options等)
  8. 记录安全事件以便监控
  9. 保持依赖库更新
  10. 响应中绝不暴露敏感数据

Performance Principles

性能原则

  1. Use caching (ETags, Cache-Control headers)
  2. Implement compression for large responses
  3. Use pagination for large datasets
  4. Optimize database queries (indexes, N+1 prevention)
  5. Use async/await for I/O operations
  6. Implement connection pooling
  7. Monitor API performance (response times, error rates)
  8. Use CDNs for static content
  9. Implement proper logging without blocking
  10. Load test your API regularly

Skill Version: 1.0.0 Last Updated: October 2025 Skill Category: API Design, Backend Development, REST Architecture Compatible With: FastAPI, Express.js, Node.js, Python, HTTP Frameworks
  1. 使用缓存(ETags、Cache-Control Headers)
  2. 对大响应进行压缩
  3. 大数据集使用分页
  4. 优化数据库查询(索引、避免N+1问题)
  5. I/O操作使用async/await
  6. 实现连接池
  7. 监控API性能(响应时间、错误率)
  8. 静态内容使用CDN
  9. 实现非阻塞的正确日志
  10. 定期对API进行负载测试

技能版本: 1.0.0 最后更新: 2025年10月 技能分类: API设计、后端开发、REST架构 兼容框架: FastAPI、Express.js、Node.js、Python、HTTP框架