fastapi-microservices-development

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

FastAPI Microservices Development

FastAPI 微服务开发

A comprehensive skill for building production-ready microservices using FastAPI. This skill covers REST API design patterns, asynchronous operations, dependency injection, testing strategies, and deployment best practices for scalable Python applications.
这是一份使用FastAPI构建生产级微服务的综合指南,涵盖REST API设计模式、异步操作、依赖注入、测试策略以及可扩展Python应用的部署最佳实践。

When to Use This Skill

何时使用本技能

Use this skill when:
  • Building RESTful microservices with Python
  • Developing high-performance async APIs
  • Creating production-grade web services with comprehensive validation
  • Implementing service-oriented architectures
  • Building APIs requiring advanced dependency injection
  • Developing services with complex authentication/authorization
  • Creating scalable, maintainable backend services
  • Building APIs with automatic OpenAPI documentation
  • Implementing WebSocket services alongside REST APIs
  • Deploying containerized Python services to production
在以下场景使用本技能:
  • 使用Python构建RESTful微服务
  • 开发高性能异步API
  • 创建具备全面验证能力的生产级Web服务
  • 实现面向服务的架构
  • 构建需要高级依赖注入的API
  • 开发具备复杂认证/授权机制的服务
  • 构建可扩展、可维护的后端服务
  • 构建自动生成OpenAPI文档的API
  • 在REST API之外实现WebSocket服务
  • 将容器化的Python服务部署到生产环境

Core Concepts

核心概念

FastAPI Fundamentals

FastAPI 基础

FastAPI is a modern, high-performance web framework for building APIs with Python 3.7+ based on standard Python type hints.
Key Features:
  • Fast: Very high performance, on par with NodeJS and Go (powered by Starlette and Pydantic)
  • Fast to code: Increase development speed by 200-300%
  • Fewer bugs: Reduce human-induced errors by about 40%
  • Intuitive: Great editor support with autocompletion everywhere
  • Easy: Designed to be easy to learn and use
  • Short: Minimize code duplication
  • Robust: Production-ready code with automatic interactive documentation
  • Standards-based: Based on OpenAPI and JSON Schema
FastAPI是一个现代、高性能的Web框架,用于使用Python 3.7+基于标准Python类型提示构建API。
关键特性:
  • 快速:极高的性能,与NodeJS和Go相当(基于Starlette和Pydantic)
  • 编码快速:将开发速度提升200-300%
  • 更少错误:减少约40%的人为错误
  • 直观:出色的编辑器支持,全场景自动补全
  • 易用:设计为易于学习和使用
  • 简洁:最小化代码重复
  • 健壮:具备自动交互式文档的生产级代码
  • 基于标准:基于OpenAPI和JSON Schema

Async/Await Programming

Async/Await 编程

FastAPI fully supports asynchronous request handling using Python's
async
/
await
syntax:
python
from fastapi import FastAPI

app = FastAPI()

@app.get('/burgers')
async def read_burgers():
    burgers = await get_burgers(2)
    return burgers
When to use
async def
:
  • Database queries with async drivers
  • External API calls
  • File I/O operations
  • Long-running computations that can be awaited
  • WebSocket connections
  • Background task processing
When to use regular
def
:
  • Simple CRUD operations
  • Synchronous database libraries
  • CPU-bound operations
  • Quick data transformations
FastAPI完全支持使用Python的
async
/
await
语法处理异步请求:
python
from fastapi import FastAPI

app = FastAPI()

@app.get('/burgers')
async def read_burgers():
    burgers = await get_burgers(2)
    return burgers
何时使用
async def
  • 使用异步驱动的数据库查询
  • 外部API调用
  • 文件I/O操作
  • 可等待的长时间计算
  • WebSocket连接
  • 后台任务处理
何时使用常规
def
  • 简单CRUD操作
  • 同步数据库库
  • CPU密集型操作
  • 快速数据转换

Dependency Injection System

依赖注入系统

FastAPI's dependency injection is one of its most powerful features, enabling:
  • Code reusability across endpoints
  • Shared logic implementation
  • Database connection management
  • Authentication and authorization
  • Request validation
  • Background task scheduling
Basic Dependency Pattern:
python
from typing import Annotated, Union
from fastapi import Depends, FastAPI

app = FastAPI()
FastAPI的依赖注入是其最强大的特性之一,支持:
  • 端点间的代码复用
  • 共享逻辑实现
  • 数据库连接管理
  • 认证与授权
  • 请求验证
  • 后台任务调度
基础依赖模式:
python
from typing import Annotated, Union
from fastapi import Depends, FastAPI

app = FastAPI()

Dependency function

依赖函数

async def common_parameters( q: Union[str, None] = None, skip: int = 0, limit: int = 100 ): return {"q": q, "skip": skip, "limit": limit}
async def common_parameters( q: Union[str, None] = None, skip: int = 0, limit: int = 100 ): return {"q": q, "skip": skip, "limit": limit}

Using dependency in multiple endpoints

在多个端点中使用依赖

@app.get("/items/") async def read_items(commons: Annotated[dict, Depends(common_parameters)]): return {"params": commons, "items": ["item1", "item2"]}
@app.get("/users/") async def read_users(commons: Annotated[dict, Depends(common_parameters)]): return {"params": commons, "users": ["user1", "user2"]}
undefined
@app.get("/items/") async def read_items(commons: Annotated[dict, Depends(common_parameters)]): return {"params": commons, "items": ["item1", "item2"]}
@app.get("/users/") async def read_users(commons: Annotated[dict, Depends(common_parameters)]): return {"params": commons, "users": ["user1", "user2"]}
undefined

Microservices Architecture Patterns

微服务架构模式

Service Design Principles

服务设计原则

1. Single Responsibility
  • Each microservice handles one business capability
  • Clear boundaries and minimal coupling
  • Independent deployment and scaling
2. API-First Design
  • Design APIs before implementation
  • Use OpenAPI schemas for contracts
  • Version APIs appropriately
3. Database Per Service
  • Each service owns its data
  • No direct database sharing
  • Use APIs for cross-service data access
4. Stateless Services
  • Services don't maintain client session state
  • Enables horizontal scaling
  • Use external storage for session data
1. 单一职责
  • 每个微服务负责一项业务能力
  • 清晰的边界和最小耦合
  • 独立部署和扩展
2. API优先设计
  • 在实现前设计API
  • 使用OpenAPI schema作为契约
  • 合理版本化API
3. 服务专属数据库
  • 每个服务拥有自己的数据
  • 不直接共享数据库
  • 使用API进行跨服务数据访问
4. 无状态服务
  • 服务不维护客户端会话状态
  • 支持水平扩展
  • 使用外部存储保存会话数据

Service Communication Patterns

服务通信模式

Synchronous Communication (REST APIs):
python
import httpx
from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    # Call another microservice
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(f"http://inventory-service/stock/{order_id}")
            inventory_data = response.json()
        except httpx.HTTPError:
            raise HTTPException(status_code=503, detail="Inventory service unavailable")

    return {"order_id": order_id, "inventory": inventory_data}
Event-Driven Communication:
  • Use message brokers (RabbitMQ, Kafka, Redis)
  • Publish/Subscribe patterns
  • Asynchronous processing
  • Loose coupling between services
同步通信(REST APIs):
python
import httpx
from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    # 调用另一个微服务
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(f"http://inventory-service/stock/{order_id}")
            inventory_data = response.json()
        except httpx.HTTPError:
            raise HTTPException(status_code=503, detail="Inventory service unavailable")

    return {"order_id": order_id, "inventory": inventory_data}
事件驱动通信:
  • 使用消息中间件(RabbitMQ、Kafka、Redis)
  • 发布/订阅模式
  • 异步处理
  • 服务间松耦合

Service Discovery

服务发现

Options:
  • Environment variables for simple setups
  • Consul, Eureka for dynamic discovery
  • Kubernetes DNS for K8s deployments
  • API Gateway for centralized routing
可选方案:
  • 简单环境下使用环境变量
  • 动态发现使用Consul、Eureka
  • Kubernetes部署使用K8s DNS
  • 集中式路由使用API网关

REST API Design Patterns

REST API设计模式

Resource Modeling

资源建模

RESTful Resource Design:
python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional

app = FastAPI()
RESTful资源设计:
python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional

app = FastAPI()

Resource Models

资源模型

class ItemBase(BaseModel): name: str description: Optional[str] = None price: float tax: Optional[float] = None
class ItemCreate(ItemBase): pass
class Item(ItemBase): id: int owner_id: int
class Config:
    from_attributes = True
class ItemBase(BaseModel): name: str description: Optional[str] = None price: float tax: Optional[float] = None
class ItemCreate(ItemBase): pass
class Item(ItemBase): id: int owner_id: int
class Config:
    from_attributes = True

Collection Endpoints

集合端点

@app.get("/items/", response_model=List[Item]) async def list_items(skip: int = 0, limit: int = 100): """List all items with pagination""" items = await get_items_from_db(skip=skip, limit=limit) return items
@app.post("/items/", response_model=Item, status_code=201) async def create_item(item: ItemCreate): """Create a new item""" new_item = await save_item_to_db(item) return new_item
@app.get("/items/", response_model=List[Item]) async def list_items(skip: int = 0, limit: int = 100): """分页列出所有条目""" items = await get_items_from_db(skip=skip, limit=limit) return items
@app.post("/items/", response_model=Item, status_code=201) async def create_item(item: ItemCreate): """创建新条目""" new_item = await save_item_to_db(item) return new_item

Resource Endpoints

资源端点

@app.get("/items/{item_id}", response_model=Item) async def read_item(item_id: int): """Get a specific item by ID""" item = await get_item_from_db(item_id) if item is None: raise HTTPException(status_code=404, detail="Item not found") return item
@app.put("/items/{item_id}", response_model=Item) async def update_item(item_id: int, item: ItemCreate): """Update an existing item""" updated_item = await update_item_in_db(item_id, item) if updated_item is None: raise HTTPException(status_code=404, detail="Item not found") return updated_item
@app.delete("/items/{item_id}", status_code=204) async def delete_item(item_id: int): """Delete an item""" success = await delete_item_from_db(item_id) if not success: raise HTTPException(status_code=404, detail="Item not found")
undefined
@app.get("/items/{item_id}", response_model=Item) async def read_item(item_id: int): """通过ID获取特定条目""" item = await get_item_from_db(item_id) if item is None: raise HTTPException(status_code=404, detail="Item not found") return item
@app.put("/items/{item_id}", response_model=Item) async def update_item(item_id: int, item: ItemCreate): """更新现有条目""" updated_item = await update_item_in_db(item_id, item) if updated_item is None: raise HTTPException(status_code=404, detail="Item not found") return updated_item
@app.delete("/items/{item_id}", status_code=204) async def delete_item(item_id: int): """删除条目""" success = await delete_item_from_db(item_id) if not success: raise HTTPException(status_code=404, detail="Item not found")
undefined

API Versioning

API版本化

URL Path Versioning (Recommended):
python
from fastapi import FastAPI, APIRouter

app = FastAPI()
URL路径版本化(推荐):
python
from fastapi import FastAPI, APIRouter

app = FastAPI()

V1 API Router

V1 API路由

v1_router = APIRouter(prefix="/api/v1")
@v1_router.get("/users/") async def list_users_v1(): return {"version": "v1", "users": []}
v1_router = APIRouter(prefix="/api/v1")
@v1_router.get("/users/") async def list_users_v1(): return {"version": "v1", "users": []}

V2 API Router

V2 API路由

v2_router = APIRouter(prefix="/api/v2")
@v2_router.get("/users/") async def list_users_v2(): return {"version": "v2", "users": [], "metadata": {}}
app.include_router(v1_router) app.include_router(v2_router)
undefined
v2_router = APIRouter(prefix="/api/v2")
@v2_router.get("/users/") async def list_users_v2(): return {"version": "v2", "users": [], "metadata": {}}
app.include_router(v1_router) app.include_router(v2_router)
undefined

Request/Response Validation

请求/响应验证

FastAPI uses Pydantic for automatic validation:
python
from pydantic import BaseModel, Field, EmailStr, validator
from typing import Optional
from datetime import datetime

class UserCreate(BaseModel):
    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    password: str = Field(..., min_length=8)
    age: Optional[int] = Field(None, ge=0, le=150)

    @validator('username')
    def username_alphanumeric(cls, v):
        assert v.isalnum(), 'must be alphanumeric'
        return v

    @validator('password')
    def password_strength(cls, v):
        if not any(char.isdigit() for char in v):
            raise ValueError('must contain at least one digit')
        if not any(char.isupper() for char in v):
            raise ValueError('must contain at least one uppercase letter')
        return v

class UserResponse(BaseModel):
    id: int
    username: str
    email: EmailStr
    created_at: datetime

    class Config:
        from_attributes = True

@app.post("/users/", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
    # Automatic validation of request body
    new_user = await save_user(user)
    return new_user
FastAPI使用Pydantic实现自动验证:
python
from pydantic import BaseModel, Field, EmailStr, validator
from typing import Optional
from datetime import datetime

class UserCreate(BaseModel):
    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    password: str = Field(..., min_length=8)
    age: Optional[int] = Field(None, ge=0, le=150)

    @validator('username')
    def username_alphanumeric(cls, v):
        assert v.isalnum(), 'must be alphanumeric'
        return v

    @validator('password')
    def password_strength(cls, v):
        if not any(char.isdigit() for char in v):
            raise ValueError('must contain at least one digit')
        if not any(char.isupper() for char in v):
            raise ValueError('must contain at least one uppercase letter')
        return v

class UserResponse(BaseModel):
    id: int
    username: str
    email: EmailStr
    created_at: datetime

    class Config:
        from_attributes = True

@app.post("/users/", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
    # 自动验证请求体
    new_user = await save_user(user)
    return new_user

Advanced Dependency Injection

高级依赖注入

Dependencies with Yield

带Yield的依赖

Dependencies can use
yield
for setup/teardown operations:
python
from fastapi import FastAPI, Depends

app = FastAPI()
依赖可以使用
yield
实现初始化/清理操作:
python
from fastapi import FastAPI, Depends

app = FastAPI()

Database dependency with cleanup

带清理操作的数据库依赖

async def get_db(): db = await connect_to_database() try: yield db finally: await db.close()
@app.get("/items/") async def read_items(db = Depends(get_db)): items = await db.query("SELECT * FROM items") return items

**Advanced Resource Management:**

```python
from fastapi import Depends, HTTPException

async def get_database():
    with Session() as session:
        try:
            yield session
        except HTTPException:
            session.rollback()
            raise
        finally:
            session.close()

@app.post("/users/")
async def create_user(user: UserCreate, db = Depends(get_database)):
    try:
        new_user = db.add(User(**user.dict()))
        db.commit()
        return new_user
    except Exception as e:
        # Session automatically rolled back by dependency
        raise HTTPException(status_code=500, detail=str(e))
async def get_db(): db = await connect_to_database() try: yield db finally: await db.close()
@app.get("/items/") async def read_items(db = Depends(get_db)): items = await db.query("SELECT * FROM items") return items

**高级资源管理:**

```python
from fastapi import Depends, HTTPException

async def get_database():
    with Session() as session:
        try:
            yield session
        except HTTPException:
            session.rollback()
            raise
        finally:
            session.close()

@app.post("/users/")
async def create_user(user: UserCreate, db = Depends(get_database)):
    try:
        new_user = db.add(User(**user.dict()))
        db.commit()
        return new_user
    except Exception as e:
        # 依赖自动回滚会话
        raise HTTPException(status_code=500, detail=str(e))

Sub-Dependencies

子依赖

Dependencies can depend on other dependencies:
python
from typing import Optional
from fastapi import FastAPI, Depends, Cookie

app = FastAPI()

async def query_extractor(q: Optional[str] = None):
    return q

async def query_or_cookie_extractor(
    q: str = Depends(query_extractor),
    last_query: Optional[str] = Cookie(None)
):
    if not q:
        return last_query
    return q

@app.get('/items/')
async def read_items(query: str = Depends(query_or_cookie_extractor)):
    return {'query': query}
依赖可以依赖其他依赖:
python
from typing import Optional
from fastapi import FastAPI, Depends, Cookie

app = FastAPI()

async def query_extractor(q: Optional[str] = None):
    return q

async def query_or_cookie_extractor(
    q: str = Depends(query_extractor),
    last_query: Optional[str] = Cookie(None)
):
    if not q:
        return last_query
    return q

@app.get('/items/')
async def read_items(query: str = Depends(query_or_cookie_extractor)):
    return {'query': query}

Class-Based Dependencies

基于类的依赖

Use classes for complex dependency logic:
python
from typing import Optional
from fastapi import FastAPI, Depends

app = FastAPI()

class CommonQueryParams:
    def __init__(
        self,
        q: Optional[str] = None,
        skip: int = 0,
        limit: int = 100,
    ):
        self.q = q
        self.skip = skip
        self.limit = limit

@app.get("/items/")
async def read_items(commons: CommonQueryParams = Depends(CommonQueryParams)):
    return {"q": commons.q, "skip": commons.skip, "limit": commons.limit}
使用类实现复杂依赖逻辑:
python
from typing import Optional
from fastapi import FastAPI, Depends

app = FastAPI()

class CommonQueryParams:
    def __init__(
        self,
        q: Optional[str] = None,
        skip: int = 0,
        limit: int = 100,
    ):
        self.q = q
        self.skip = skip
        self.limit = limit

@app.get("/items/")
async def read_items(commons: CommonQueryParams = Depends(CommonQueryParams)):
    return {"q": commons.q, "skip": commons.skip, "limit": commons.limit}

Shortcut syntax

简化语法

@app.get("/users/") async def read_users(commons: CommonQueryParams = Depends()): return commons
undefined
@app.get("/users/") async def read_users(commons: CommonQueryParams = Depends()): return commons
undefined

Global Dependencies

全局依赖

Apply dependencies to all routes:
python
from fastapi import FastAPI, Depends, Header, HTTPException

async def verify_token(x_token: str = Header(...)):
    if x_token != "secret-token":
        raise HTTPException(status_code=400, detail="Invalid X-Token header")
    return x_token

async def verify_key(x_key: str = Header(...)):
    if x_key != "secret-key":
        raise HTTPException(status_code=400, detail="Invalid X-Key header")
    return x_key
将依赖应用于所有路由:
python
from fastapi import FastAPI, Depends, Header, HTTPException

async def verify_token(x_token: str = Header(...)):
    if x_token != "secret-token":
        raise HTTPException(status_code=400, detail="Invalid X-Token header")
    return x_token

async def verify_key(x_key: str = Header(...)):
    if x_key != "secret-key":
        raise HTTPException(status_code=400, detail="Invalid X-Key header")
    return x_key

Apply to entire application

应用于整个应用

app = FastAPI(dependencies=[Depends(verify_token), Depends(verify_key)])
app = FastAPI(dependencies=[Depends(verify_token), Depends(verify_key)])

Apply to router

应用于路由

from fastapi import APIRouter
router = APIRouter( prefix="/items", dependencies=[Depends(verify_token)] )
@router.get("/") async def read_items(): return [{"item_id": "Foo"}]
app.include_router(router)
undefined
from fastapi import APIRouter
router = APIRouter( prefix="/items", dependencies=[Depends(verify_token)] )
@router.get("/") async def read_items(): return [{"item_id": "Foo"}]
app.include_router(router)
undefined

Reusable Dependency Aliases

可复用依赖别名

Create type aliases for common dependencies:
python
from typing import Annotated
from fastapi import Depends
为常用依赖创建类型别名:
python
from typing import Annotated
from fastapi import Depends

Define reusable dependency types

定义可复用依赖类型

async def get_current_user(): return {"username": "johndoe"}
CurrentUser = Annotated[dict, Depends(get_current_user)]
async def get_current_user(): return {"username": "johndoe"}
CurrentUser = Annotated[dict, Depends(get_current_user)]

Use across multiple endpoints

在多个端点中使用

@app.get("/items/") def read_items(user: CurrentUser): return {"user": user, "items": []}
@app.post("/items/") def create_item(user: CurrentUser, item: Item): return {"user": user, "item": item}
@app.delete("/items/{item_id}") def delete_item(user: CurrentUser, item_id: int): return {"user": user, "deleted": item_id}
undefined
@app.get("/items/") def read_items(user: CurrentUser): return {"user": user, "items": []}
@app.post("/items/") def create_item(user: CurrentUser, item: Item): return {"user": user, "item": item}
@app.delete("/items/{item_id}") def delete_item(user: CurrentUser, item_id: int): return {"user": user, "deleted": item_id}
undefined

Authentication & Authorization

认证与授权

OAuth2 with Password Flow

带密码流的OAuth2

python
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Optional
import jwt
from datetime import datetime, timedelta

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

class Token(BaseModel):
    access_token: str
    token_type: str

class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except jwt.PyJWTError:
        raise credentials_exception

    user = await get_user_from_db(username)
    if user is None:
        raise credentials_exception
    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = await authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user
python
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Optional
import jwt
from datetime import datetime, timedelta

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

class Token(BaseModel):
    access_token: str
    token_type: str

class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except jwt.PyJWTError:
        raise credentials_exception

    user = await get_user_from_db(username)
    if user is None:
        raise credentials_exception
    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = await authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

OAuth2 with Scopes

带作用域的OAuth2

python
from fastapi.security import SecurityScopes
from pydantic import ValidationError

async def get_current_user(
    security_scopes: SecurityScopes,
    token: str = Depends(oauth2_scheme)
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        token_scopes = payload.get("scopes", [])
    except (jwt.PyJWTError, ValidationError):
        raise credentials_exception

    for scope in security_scopes.scopes:
        if scope not in token_scopes:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )

    user = await get_user(username)
    if user is None:
        raise credentials_exception
    return user

@app.get("/items/", dependencies=[Security(get_current_user, scopes=["items:read"])])
async def read_items():
    return [{"item_id": "Foo"}]

@app.post("/items/", dependencies=[Security(get_current_user, scopes=["items:write"])])
async def create_item(item: Item):
    return item
python
from fastapi.security import SecurityScopes
from pydantic import ValidationError

async def get_current_user(
    security_scopes: SecurityScopes,
    token: str = Depends(oauth2_scheme)
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        token_scopes = payload.get("scopes", [])
    except (jwt.PyJWTError, ValidationError):
        raise credentials_exception

    for scope in security_scopes.scopes:
        if scope not in token_scopes:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )

    user = await get_user(username)
    if user is None:
        raise credentials_exception
    return user

@app.get("/items/", dependencies=[Security(get_current_user, scopes=["items:read"])])
async def read_items():
    return [{"item_id": "Foo"}]

@app.post("/items/", dependencies=[Security(get_current_user, scopes=["items:write"])])
async def create_item(item: Item):
    return item

Background Tasks

后台任务

Simple Background Tasks

简单后台任务

python
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def write_log(message: str):
    with open("log.txt", mode="a") as log_file:
        log_file.write(message)

@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, f"Notification sent to {email}\n")
    return {"message": "Notification sent in the background"}
python
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def write_log(message: str):
    with open("log.txt", mode="a") as log_file:
        log_file.write(message)

@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, f"Notification sent to {email}\n")
    return {"message": "Notification sent in the background"}

Background Tasks with Dependencies

带依赖的后台任务

python
from fastapi import BackgroundTasks, Depends
from typing import Annotated

def write_log(message: str):
    with open("log.txt", mode="a") as log_file:
        log_file.write(message)

async def get_query_and_log(
    query: str | None = None,
    background_tasks: BackgroundTasks = Depends()
):
    if query:
        background_tasks.add_task(write_log, f"query: {query}\n")
    return query

@app.post("/send-notification/{email}")
async def send_notification(
    email: str,
    background_tasks: BackgroundTasks,
    query: Annotated[str | None, Depends(get_query_and_log)],
):
    background_tasks.add_task(write_log, f"email: {email}, query: {query}\n")
    return {"message": "Notification sent"}
python
from fastapi import BackgroundTasks, Depends
from typing import Annotated

def write_log(message: str):
    with open("log.txt", mode="a") as log_file:
        log_file.write(message)

async def get_query_and_log(
    query: str | None = None,
    background_tasks: BackgroundTasks = Depends()
):
    if query:
        background_tasks.add_task(write_log, f"query: {query}\n")
    return query

@app.post("/send-notification/{email}")
async def send_notification(
    email: str,
    background_tasks: BackgroundTasks,
    query: Annotated[str | None, Depends(get_query_and_log)],
):
    background_tasks.add_task(write_log, f"email: {email}, query: {query}\n")
    return {"message": "Notification sent"}

WebSocket Support

WebSocket支持

Basic WebSocket

基础WebSocket

python
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message received: {data}")
    except WebSocketDisconnect:
        print("Client disconnected")
python
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message received: {data}")
    except WebSocketDisconnect:
        print("Client disconnected")

WebSocket with Dependencies

带依赖的WebSocket

python
from fastapi import WebSocket, Depends, Query, Cookie, WebSocketException, status

async def get_cookie_or_token(
    websocket: WebSocket,
    session: str | None = Cookie(None),
    token: str | None = Query(None),
):
    if session is None and token is None:
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
    return session or token

@app.websocket("/ws/{item_id}")
async def websocket_endpoint(
    websocket: WebSocket,
    item_id: str,
    q: int | None = None,
    cookie_or_token: str = Depends(get_cookie_or_token),
):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(
                f"Session: {cookie_or_token}, Item: {item_id}, Data: {data}"
            )
    except WebSocketDisconnect:
        print(f"Client {item_id} disconnected")
python
from fastapi import WebSocket, Depends, Query, Cookie, WebSocketException, status

async def get_cookie_or_token(
    websocket: WebSocket,
    session: str | None = Cookie(None),
    token: str | None = Query(None),
):
    if session is None and token is None:
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
    return session or token

@app.websocket("/ws/{item_id}")
async def websocket_endpoint(
    websocket: WebSocket,
    item_id: str,
    q: int | None = None,
    cookie_or_token: str = Depends(get_cookie_or_token),
):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(
                f"Session: {cookie_or_token}, Item: {item_id}, Data: {data}"
            )
    except WebSocketDisconnect:
        print(f"Client {item_id} disconnected")

WebSocket Connection Manager

WebSocket连接管理器

python
from typing import List

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.send_personal_message(f"You wrote: {data}", websocket)
            await manager.broadcast(f"Client #{client_id} says: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client #{client_id} left the chat")
python
from typing import List

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.send_personal_message(f"You wrote: {data}", websocket)
            await manager.broadcast(f"Client #{client_id} says: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client #{client_id} left the chat")

Database Integration

数据库集成

SQLAlchemy with Async

异步SQLAlchemy

python
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String

DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"

engine = create_async_engine(DATABASE_URL, echo=True)
async_session_maker = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)
Base = declarative_base()

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    username = Column(String, unique=True, index=True)

async def get_db() -> AsyncSession:
    async with async_session_maker() as session:
        try:
            yield session
        finally:
            await session.close()

@app.get("/users/{user_id}")
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User).filter(User.id == user_id))
    user = result.scalars().first()
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user
python
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String

DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"

engine = create_async_engine(DATABASE_URL, echo=True)
async_session_maker = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)
Base = declarative_base()

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    username = Column(String, unique=True, index=True)

async def get_db() -> AsyncSession:
    async with async_session_maker() as session:
        try:
            yield session
        finally:
            await session.close()

@app.get("/users/{user_id}")
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User).filter(User.id == user_id))
    user = result.scalars().first()
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user

MongoDB with Motor

MongoDB与Motor

python
from motor.motor_asyncio import AsyncIOMotorClient
from fastapi import Depends

MONGODB_URL = "mongodb://localhost:27017"
client = AsyncIOMotorClient(MONGODB_URL)
database = client.mydatabase

async def get_database():
    return database

@app.post("/items/")
async def create_item(item: Item, db = Depends(get_database)):
    result = await db.items.insert_one(item.dict())
    return {"id": str(result.inserted_id)}

@app.get("/items/{item_id}")
async def read_item(item_id: str, db = Depends(get_database)):
    from bson import ObjectId
    item = await db.items.find_one({"_id": ObjectId(item_id)})
    if item is None:
        raise HTTPException(status_code=404, detail="Item not found")
    item["_id"] = str(item["_id"])
    return item
python
from motor.motor_asyncio import AsyncIOMotorClient
from fastapi import Depends

MONGODB_URL = "mongodb://localhost:27017"
client = AsyncIOMotorClient(MONGODB_URL)
database = client.mydatabase

async def get_database():
    return database

@app.post("/items/")
async def create_item(item: Item, db = Depends(get_database)):
    result = await db.items.insert_one(item.dict())
    return {"id": str(result.inserted_id)}

@app.get("/items/{item_id}")
async def read_item(item_id: str, db = Depends(get_database)):
    from bson import ObjectId
    item = await db.items.find_one({"_id": ObjectId(item_id)})
    if item is None:
        raise HTTPException(status_code=404, detail="Item not found")
    item["_id"] = str(item["_id"])
    return item

Error Handling

错误处理

Custom Exception Handlers

自定义异常处理器

python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

class UnicornException(Exception):
    def __init__(self, name: str):
        self.name = name

@app.exception_handler(UnicornException)
async def unicorn_exception_handler(request: Request, exc: UnicornException):
    return JSONResponse(
        status_code=418,
        content={"message": f"Oops! {exc.name} did something wrong."},
    )

@app.get("/unicorns/{name}")
async def read_unicorn(name: str):
    if name == "yolo":
        raise UnicornException(name=name)
    return {"unicorn_name": name}
python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

class UnicornException(Exception):
    def __init__(self, name: str):
        self.name = name

@app.exception_handler(UnicornException)
async def unicorn_exception_handler(request: Request, exc: UnicornException):
    return JSONResponse(
        status_code=418,
        content={"message": f"Oops! {exc.name} did something wrong."},
    )

@app.get("/unicorns/{name}")
async def read_unicorn(name: str):
    if name == "yolo":
        raise UnicornException(name=name)
    return {"unicorn_name": name}

Override Default Exception Handlers

覆盖默认异常处理器

python
from fastapi.exceptions import RequestValidationError
from fastapi.responses import PlainTextResponse

@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    return PlainTextResponse(str(exc), status_code=400)
python
from fastapi.exceptions import RequestValidationError
from fastapi.responses import PlainTextResponse

@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    return PlainTextResponse(str(exc), status_code=400)

Testing

测试

Test Setup with TestClient

使用TestClient搭建测试环境

python
from fastapi.testclient import TestClient
from main import app

client = TestClient(app)

def test_read_main():
    response = client.get("/")
    assert response.status_code == 200
    assert response.json() == {"message": "Hello World"}

def test_create_item():
    response = client.post(
        "/items/",
        json={"name": "Foo", "price": 45.2}
    )
    assert response.status_code == 201
    assert response.json()["name"] == "Foo"

def test_read_item():
    response = client.get("/items/1")
    assert response.status_code == 200
    assert "name" in response.json()
python
from fastapi.testclient import TestClient
from main import app

client = TestClient(app)

def test_read_main():
    response = client.get("/")
    assert response.status_code == 200
    assert response.json() == {"message": "Hello World"}

def test_create_item():
    response = client.post(
        "/items/",
        json={"name": "Foo", "price": 45.2}
    )
    assert response.status_code == 201
    assert response.json()["name"] == "Foo"

def test_read_item():
    response = client.get("/items/1")
    assert response.status_code == 200
    assert "name" in response.json()

Testing with Dependencies

带依赖的测试

python
from fastapi import Depends

async def override_get_db():
    return {"test": "database"}

app.dependency_overrides[get_db] = override_get_db

def test_with_dependency():
    response = client.get("/items/")
    assert response.status_code == 200
    # Uses overridden dependency
python
from fastapi import Depends

async def override_get_db():
    return {"test": "database"}

app.dependency_overrides[get_db] = override_get_db

def test_with_dependency():
    response = client.get("/items/")
    assert response.status_code == 200
    # 使用重写后的依赖

Async Testing

异步测试

python
import pytest
from httpx import AsyncClient
from main import app

@pytest.mark.asyncio
async def test_read_items():
    async with AsyncClient(app=app, base_url="http://test") as ac:
        response = await ac.get("/items/")
    assert response.status_code == 200
    assert isinstance(response.json(), list)
python
import pytest
from httpx import AsyncClient
from main import app

@pytest.mark.asyncio
async def test_read_items():
    async with AsyncClient(app=app, base_url="http://test") as ac:
        response = await ac.get("/items/")
    assert response.status_code == 200
    assert isinstance(response.json(), list)

Production Deployment

生产环境部署

Docker Configuration

Docker配置

Dockerfile:
dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY ./app /app

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Multi-stage Build:
dockerfile
FROM python:3.11-slim as builder

WORKDIR /app
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt

FROM python:3.11-slim

WORKDIR /app
COPY --from=builder /root/.local /root/.local
COPY ./app /app

ENV PATH=/root/.local/bin:$PATH

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
docker-compose.yml:
yaml
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:password@db:5432/mydb
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis
    volumes:
      - ./app:/app
    command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload

  db:
    image: postgres:15
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=mydb
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

volumes:
  postgres_data:
Dockerfile:
dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY ./app /app

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
多阶段构建:
dockerfile
FROM python:3.11-slim as builder

WORKDIR /app
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt

FROM python:3.11-slim

WORKDIR /app
COPY --from=builder /root/.local /root/.local
COPY ./app /app

ENV PATH=/root/.local/bin:$PATH

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
docker-compose.yml:
yaml
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:password@db:5432/mydb
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis
    volumes:
      - ./app:/app
    command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload

  db:
    image: postgres:15
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=mydb
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

volumes:
  postgres_data:

Kubernetes Deployment

Kubernetes部署

deployment.yaml:
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fastapi-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: fastapi
  template:
    metadata:
      labels:
        app: fastapi
    spec:
      containers:
      - name: fastapi
        image: myregistry/fastapi-app:latest
        ports:
        - containerPort: 8000
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: db-secret
              key: url
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: fastapi-service
spec:
  selector:
    app: fastapi
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer
deployment.yaml:
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fastapi-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: fastapi
  template:
    metadata:
      labels:
        app: fastapi
    spec:
      containers:
      - name: fastapi
        image: myregistry/fastapi-app:latest
        ports:
        - containerPort: 8000
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: db-secret
              key: url
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: fastapi-service
spec:
  selector:
    app: fastapi
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer

Environment Configuration

环境配置

python
from pydantic_settings import BaseSettings
from functools import lru_cache

class Settings(BaseSettings):
    app_name: str = "FastAPI Microservice"
    database_url: str
    redis_url: str
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    class Config:
        env_file = ".env"

@lru_cache()
def get_settings():
    return Settings()

@app.get("/info")
async def info(settings: Settings = Depends(get_settings)):
    return {"app_name": settings.app_name}
python
from pydantic_settings import BaseSettings
from functools import lru_cache

class Settings(BaseSettings):
    app_name: str = "FastAPI Microservice"
    database_url: str
    redis_url: str
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    class Config:
        env_file = ".env"

@lru_cache()
def get_settings():
    return Settings()

@app.get("/info")
async def info(settings: Settings = Depends(get_settings)):
    return {"app_name": settings.app_name}

Health Checks

健康检查

python
@app.get("/health")
async def health_check():
    return {"status": "healthy"}

@app.get("/ready")
async def readiness_check(db = Depends(get_db)):
    try:
        # Check database connectivity
        await db.execute("SELECT 1")
        return {"status": "ready"}
    except Exception as e:
        raise HTTPException(status_code=503, detail="Service not ready")
python
@app.get("/health")
async def health_check():
    return {"status": "healthy"}

@app.get("/ready")
async def readiness_check(db = Depends(get_db)):
    try:
        # 检查数据库连接
        await db.execute("SELECT 1")
        return {"status": "ready"}
    except Exception as e:
        raise HTTPException(status_code=503, detail="Service not ready")

Monitoring & Logging

监控与日志

Structured Logging

结构化日志

python
import logging
import json
from datetime import datetime

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        return json.dumps(log_data)

logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

@app.middleware("http")
async def log_requests(request: Request, call_next):
    logger.info(f"Request: {request.method} {request.url}")
    response = await call_next(request)
    logger.info(f"Response: {response.status_code}")
    return response
python
import logging
import json
from datetime import datetime

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        return json.dumps(log_data)

logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

@app.middleware("http")
async def log_requests(request: Request, call_next):
    logger.info(f"Request: {request.method} {request.url}")
    response = await call_next(request)
    logger.info(f"Response: {response.status_code}")
    return response

Prometheus Metrics

Prometheus指标

python
from prometheus_client import Counter, Histogram, generate_latest
from fastapi.responses import Response
import time

REQUEST_COUNT = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

REQUEST_DURATION = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint']
)

@app.middleware("http")
async def prometheus_middleware(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time

    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()

    REQUEST_DURATION.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(duration)

    return response

@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type="text/plain")
python
from prometheus_client import Counter, Histogram, generate_latest
from fastapi.responses import Response
import time

REQUEST_COUNT = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

REQUEST_DURATION = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint']
)

@app.middleware("http")
async def prometheus_middleware(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time

    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()

    REQUEST_DURATION.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(duration)

    return response

@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type="text/plain")

Best Practices

最佳实践

1. Project Structure

1. 项目结构

fastapi-service/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── config.py
│   ├── dependencies.py
│   ├── models/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   └── item.py
│   ├── schemas/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   └── item.py
│   ├── routers/
│   │   ├── __init__.py
│   │   ├── users.py
│   │   └── items.py
│   ├── services/
│   │   ├── __init__.py
│   │   ├── user_service.py
│   │   └── item_service.py
│   └── database.py
├── tests/
│   ├── __init__.py
│   ├── test_users.py
│   └── test_items.py
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
└── .env
fastapi-service/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── config.py
│   ├── dependencies.py
│   ├── models/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   └── item.py
│   ├── schemas/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   └── item.py
│   ├── routers/
│   │   ├── __init__.py
│   │   ├── users.py
│   │   └── items.py
│   ├── services/
│   │   ├── __init__.py
│   │   ├── user_service.py
│   │   └── item_service.py
│   └── database.py
├── tests/
│   ├── __init__.py
│   ├── test_users.py
│   └── test_items.py
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
└── .env

2. Separation of Concerns

2. 关注点分离

models.py - Database models:
python
from sqlalchemy import Column, Integer, String
from .database import Base

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
schemas.py - Pydantic schemas:
python
from pydantic import BaseModel, EmailStr

class UserCreate(BaseModel):
    email: EmailStr
    password: str

class UserResponse(BaseModel):
    id: int
    email: EmailStr

    class Config:
        from_attributes = True
services.py - Business logic:
python
from sqlalchemy.orm import Session

class UserService:
    def __init__(self, db: Session):
        self.db = db

    async def create_user(self, user_data: UserCreate):
        # Business logic here
        pass
routers.py - API endpoints:
python
from fastapi import APIRouter, Depends

router = APIRouter(prefix="/users", tags=["users"])

@router.post("/", response_model=UserResponse)
async def create_user(user: UserCreate, service: UserService = Depends()):
    return await service.create_user(user)
models.py - 数据库模型:
python
from sqlalchemy import Column, Integer, String
from .database import Base

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
schemas.py - Pydantic schema:
python
from pydantic import BaseModel, EmailStr

class UserCreate(BaseModel):
    email: EmailStr
    password: str

class UserResponse(BaseModel):
    id: int
    email: EmailStr

    class Config:
        from_attributes = True
services.py - 业务逻辑:
python
from sqlalchemy.orm import Session

class UserService:
    def __init__(self, db: Session):
        self.db = db

    async def create_user(self, user_data: UserCreate):
        # 业务逻辑实现
        pass
routers.py - API端点:
python
from fastapi import APIRouter, Depends

router = APIRouter(prefix="/users", tags=["users"])

@router.post("/", response_model=UserResponse)
async def create_user(user: UserCreate, service: UserService = Depends()):
    return await service.create_user(user)

3. Security Best Practices

3. 安全最佳实践

  • Always use HTTPS in production
  • Implement rate limiting
  • Validate and sanitize all inputs
  • Use dependency injection for auth
  • Store secrets in environment variables
  • Implement CORS properly
  • Use security headers
  • Hash passwords with bcrypt/argon2
  • Implement JWT token expiration
  • Use OAuth2 scopes for authorization
  • 生产环境始终使用HTTPS
  • 实现速率限制
  • 验证和清理所有输入
  • 使用依赖注入处理认证
  • 将密钥存储在环境变量中
  • 正确实现CORS
  • 使用安全头
  • 使用bcrypt/argon2哈希密码
  • 实现JWT令牌过期机制
  • 使用OAuth2作用域进行授权

4. Performance Optimization

4. 性能优化

  • Use async/await for I/O operations
  • Implement caching (Redis)
  • Use database connection pooling
  • Paginate large responses
  • Compress responses (gzip)
  • Use CDN for static assets
  • Implement database indexes
  • Use background tasks for heavy operations
  • Monitor with APM tools
  • Load test before production
  • 对I/O操作使用async/await
  • 实现缓存(Redis)
  • 使用数据库连接池
  • 对大响应进行分页
  • 压缩响应(gzip)
  • 静态资源使用CDN
  • 实现数据库索引
  • 后台任务处理重操作
  • 使用APM工具监控
  • 上线前进行负载测试

5. API Documentation

5. API文档

FastAPI automatically generates OpenAPI documentation, but you can enhance it:
python
app = FastAPI(
    title="My Microservice API",
    description="Production-ready microservice with FastAPI",
    version="1.0.0",
    terms_of_service="http://example.com/terms/",
    contact={
        "name": "API Support",
        "url": "http://example.com/support",
        "email": "support@example.com",
    },
    license_info={
        "name": "Apache 2.0",
        "url": "https://www.apache.org/licenses/LICENSE-2.0.html",
    },
)

@app.get(
    "/items/",
    response_model=List[Item],
    summary="List all items",
    description="Retrieve a paginated list of items from the database",
    response_description="List of items with pagination metadata",
)
async def list_items(
    skip: int = Query(0, description="Number of items to skip"),
    limit: int = Query(100, description="Maximum number of items to return"),
):
    """
    List items with pagination support.

    - **skip**: Number of items to skip (for pagination)
    - **limit**: Maximum number of items to return
    """
    return await get_items(skip=skip, limit=limit)
FastAPI自动生成OpenAPI文档,你可以进一步增强:
python
app = FastAPI(
    title="My Microservice API",
    description="Production-ready microservice with FastAPI",
    version="1.0.0",
    terms_of_service="http://example.com/terms/",
    contact={
        "name": "API Support",
        "url": "http://example.com/support",
        "email": "support@example.com",
    },
    license_info={
        "name": "Apache 2.0",
        "url": "https://www.apache.org/licenses/LICENSE-2.0.html",
    },
)

@app.get(
    "/items/",
    response_model=List[Item],
    summary="List all items",
    description="Retrieve a paginated list of items from the database",
    response_description="List of items with pagination metadata",
)
async def list_items(
    skip: int = Query(0, description="Number of items to skip"),
    limit: int = Query(100, description="Maximum number of items to return"),
):
    """
    List items with pagination support.

    - **skip**: Number of items to skip (for pagination)
    - **limit**: Maximum number of items to return
    """
    return await get_items(skip=skip, limit=limit)

Common Patterns & Examples

常见模式与示例

See EXAMPLES.md for 15+ detailed, production-ready examples covering:
  • CRUD operations with async databases
  • Authentication flows
  • File upload handling
  • Caching strategies
  • Rate limiting
  • Event-driven architectures
  • Testing patterns
  • Deployment configurations
  • And more...

Skill Version: 1.0.0 Last Updated: October 2025 Skill Category: Backend Development, Microservices, Python, REST APIs Compatible With: FastAPI 0.100+, Python 3.7+, Pydantic 2.0+
查看EXAMPLES.md获取15+个详细的生产级示例,包括:
  • 异步数据库CRUD操作
  • 认证流程
  • 文件上传处理
  • 缓存策略
  • 速率限制
  • 事件驱动架构
  • 测试模式
  • 部署配置
  • 以及更多...

技能版本: 1.0.0 最后更新: 2025年10月 技能分类: 后端开发, 微服务, Python, REST APIs 兼容版本: FastAPI 0.100+, Python 3.7+, Pydantic 2.0+