fastapi-microservices-development
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseFastAPI Microservices Development
FastAPI 微服务开发
A comprehensive skill for building production-ready microservices using FastAPI. This skill covers REST API design patterns, asynchronous operations, dependency injection, testing strategies, and deployment best practices for scalable Python applications.
这是一份使用FastAPI构建生产级微服务的综合指南,涵盖REST API设计模式、异步操作、依赖注入、测试策略以及可扩展Python应用的部署最佳实践。
When to Use This Skill
何时使用本技能
Use this skill when:
- Building RESTful microservices with Python
- Developing high-performance async APIs
- Creating production-grade web services with comprehensive validation
- Implementing service-oriented architectures
- Building APIs requiring advanced dependency injection
- Developing services with complex authentication/authorization
- Creating scalable, maintainable backend services
- Building APIs with automatic OpenAPI documentation
- Implementing WebSocket services alongside REST APIs
- Deploying containerized Python services to production
在以下场景使用本技能:
- 使用Python构建RESTful微服务
- 开发高性能异步API
- 创建具备全面验证能力的生产级Web服务
- 实现面向服务的架构
- 构建需要高级依赖注入的API
- 开发具备复杂认证/授权机制的服务
- 构建可扩展、可维护的后端服务
- 构建自动生成OpenAPI文档的API
- 在REST API之外实现WebSocket服务
- 将容器化的Python服务部署到生产环境
Core Concepts
核心概念
FastAPI Fundamentals
FastAPI 基础
FastAPI is a modern, high-performance web framework for building APIs with Python 3.7+ based on standard Python type hints.
Key Features:
- Fast: Very high performance, on par with NodeJS and Go (powered by Starlette and Pydantic)
- Fast to code: Increase development speed by 200-300%
- Fewer bugs: Reduce human-induced errors by about 40%
- Intuitive: Great editor support with autocompletion everywhere
- Easy: Designed to be easy to learn and use
- Short: Minimize code duplication
- Robust: Production-ready code with automatic interactive documentation
- Standards-based: Based on OpenAPI and JSON Schema
FastAPI是一个现代、高性能的Web框架,用于使用Python 3.7+基于标准Python类型提示构建API。
关键特性:
- 快速:极高的性能,与NodeJS和Go相当(基于Starlette和Pydantic)
- 编码快速:将开发速度提升200-300%
- 更少错误:减少约40%的人为错误
- 直观:出色的编辑器支持,全场景自动补全
- 易用:设计为易于学习和使用
- 简洁:最小化代码重复
- 健壮:具备自动交互式文档的生产级代码
- 基于标准:基于OpenAPI和JSON Schema
Async/Await Programming
Async/Await 编程
FastAPI fully supports asynchronous request handling using Python's / syntax:
asyncawaitpython
from fastapi import FastAPI
app = FastAPI()
@app.get('/burgers')
async def read_burgers():
burgers = await get_burgers(2)
return burgersWhen to use :
async def- Database queries with async drivers
- External API calls
- File I/O operations
- Long-running computations that can be awaited
- WebSocket connections
- Background task processing
When to use regular :
def- Simple CRUD operations
- Synchronous database libraries
- CPU-bound operations
- Quick data transformations
FastAPI完全支持使用Python的/语法处理异步请求:
asyncawaitpython
from fastapi import FastAPI
app = FastAPI()
@app.get('/burgers')
async def read_burgers():
burgers = await get_burgers(2)
return burgers何时使用:
async def- 使用异步驱动的数据库查询
- 外部API调用
- 文件I/O操作
- 可等待的长时间计算
- WebSocket连接
- 后台任务处理
何时使用常规:
def- 简单CRUD操作
- 同步数据库库
- CPU密集型操作
- 快速数据转换
Dependency Injection System
依赖注入系统
FastAPI's dependency injection is one of its most powerful features, enabling:
- Code reusability across endpoints
- Shared logic implementation
- Database connection management
- Authentication and authorization
- Request validation
- Background task scheduling
Basic Dependency Pattern:
python
from typing import Annotated, Union
from fastapi import Depends, FastAPI
app = FastAPI()FastAPI的依赖注入是其最强大的特性之一,支持:
- 端点间的代码复用
- 共享逻辑实现
- 数据库连接管理
- 认证与授权
- 请求验证
- 后台任务调度
基础依赖模式:
python
from typing import Annotated, Union
from fastapi import Depends, FastAPI
app = FastAPI()Dependency function
依赖函数
async def common_parameters(
q: Union[str, None] = None,
skip: int = 0,
limit: int = 100
):
return {"q": q, "skip": skip, "limit": limit}
async def common_parameters(
q: Union[str, None] = None,
skip: int = 0,
limit: int = 100
):
return {"q": q, "skip": skip, "limit": limit}
Using dependency in multiple endpoints
在多个端点中使用依赖
@app.get("/items/")
async def read_items(commons: Annotated[dict, Depends(common_parameters)]):
return {"params": commons, "items": ["item1", "item2"]}
@app.get("/users/")
async def read_users(commons: Annotated[dict, Depends(common_parameters)]):
return {"params": commons, "users": ["user1", "user2"]}
undefined@app.get("/items/")
async def read_items(commons: Annotated[dict, Depends(common_parameters)]):
return {"params": commons, "items": ["item1", "item2"]}
@app.get("/users/")
async def read_users(commons: Annotated[dict, Depends(common_parameters)]):
return {"params": commons, "users": ["user1", "user2"]}
undefinedMicroservices Architecture Patterns
微服务架构模式
Service Design Principles
服务设计原则
1. Single Responsibility
- Each microservice handles one business capability
- Clear boundaries and minimal coupling
- Independent deployment and scaling
2. API-First Design
- Design APIs before implementation
- Use OpenAPI schemas for contracts
- Version APIs appropriately
3. Database Per Service
- Each service owns its data
- No direct database sharing
- Use APIs for cross-service data access
4. Stateless Services
- Services don't maintain client session state
- Enables horizontal scaling
- Use external storage for session data
1. 单一职责
- 每个微服务负责一项业务能力
- 清晰的边界和最小耦合
- 独立部署和扩展
2. API优先设计
- 在实现前设计API
- 使用OpenAPI schema作为契约
- 合理版本化API
3. 服务专属数据库
- 每个服务拥有自己的数据
- 不直接共享数据库
- 使用API进行跨服务数据访问
4. 无状态服务
- 服务不维护客户端会话状态
- 支持水平扩展
- 使用外部存储保存会话数据
Service Communication Patterns
服务通信模式
Synchronous Communication (REST APIs):
python
import httpx
from fastapi import FastAPI, HTTPException
app = FastAPI()
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
# Call another microservice
async with httpx.AsyncClient() as client:
try:
response = await client.get(f"http://inventory-service/stock/{order_id}")
inventory_data = response.json()
except httpx.HTTPError:
raise HTTPException(status_code=503, detail="Inventory service unavailable")
return {"order_id": order_id, "inventory": inventory_data}Event-Driven Communication:
- Use message brokers (RabbitMQ, Kafka, Redis)
- Publish/Subscribe patterns
- Asynchronous processing
- Loose coupling between services
同步通信(REST APIs):
python
import httpx
from fastapi import FastAPI, HTTPException
app = FastAPI()
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
# 调用另一个微服务
async with httpx.AsyncClient() as client:
try:
response = await client.get(f"http://inventory-service/stock/{order_id}")
inventory_data = response.json()
except httpx.HTTPError:
raise HTTPException(status_code=503, detail="Inventory service unavailable")
return {"order_id": order_id, "inventory": inventory_data}事件驱动通信:
- 使用消息中间件(RabbitMQ、Kafka、Redis)
- 发布/订阅模式
- 异步处理
- 服务间松耦合
Service Discovery
服务发现
Options:
- Environment variables for simple setups
- Consul, Eureka for dynamic discovery
- Kubernetes DNS for K8s deployments
- API Gateway for centralized routing
可选方案:
- 简单环境下使用环境变量
- 动态发现使用Consul、Eureka
- Kubernetes部署使用K8s DNS
- 集中式路由使用API网关
REST API Design Patterns
REST API设计模式
Resource Modeling
资源建模
RESTful Resource Design:
python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
app = FastAPI()RESTful资源设计:
python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
app = FastAPI()Resource Models
资源模型
class ItemBase(BaseModel):
name: str
description: Optional[str] = None
price: float
tax: Optional[float] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
from_attributes = Trueclass ItemBase(BaseModel):
name: str
description: Optional[str] = None
price: float
tax: Optional[float] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
from_attributes = TrueCollection Endpoints
集合端点
@app.get("/items/", response_model=List[Item])
async def list_items(skip: int = 0, limit: int = 100):
"""List all items with pagination"""
items = await get_items_from_db(skip=skip, limit=limit)
return items
@app.post("/items/", response_model=Item, status_code=201)
async def create_item(item: ItemCreate):
"""Create a new item"""
new_item = await save_item_to_db(item)
return new_item
@app.get("/items/", response_model=List[Item])
async def list_items(skip: int = 0, limit: int = 100):
"""分页列出所有条目"""
items = await get_items_from_db(skip=skip, limit=limit)
return items
@app.post("/items/", response_model=Item, status_code=201)
async def create_item(item: ItemCreate):
"""创建新条目"""
new_item = await save_item_to_db(item)
return new_item
Resource Endpoints
资源端点
@app.get("/items/{item_id}", response_model=Item)
async def read_item(item_id: int):
"""Get a specific item by ID"""
item = await get_item_from_db(item_id)
if item is None:
raise HTTPException(status_code=404, detail="Item not found")
return item
@app.put("/items/{item_id}", response_model=Item)
async def update_item(item_id: int, item: ItemCreate):
"""Update an existing item"""
updated_item = await update_item_in_db(item_id, item)
if updated_item is None:
raise HTTPException(status_code=404, detail="Item not found")
return updated_item
@app.delete("/items/{item_id}", status_code=204)
async def delete_item(item_id: int):
"""Delete an item"""
success = await delete_item_from_db(item_id)
if not success:
raise HTTPException(status_code=404, detail="Item not found")
undefined@app.get("/items/{item_id}", response_model=Item)
async def read_item(item_id: int):
"""通过ID获取特定条目"""
item = await get_item_from_db(item_id)
if item is None:
raise HTTPException(status_code=404, detail="Item not found")
return item
@app.put("/items/{item_id}", response_model=Item)
async def update_item(item_id: int, item: ItemCreate):
"""更新现有条目"""
updated_item = await update_item_in_db(item_id, item)
if updated_item is None:
raise HTTPException(status_code=404, detail="Item not found")
return updated_item
@app.delete("/items/{item_id}", status_code=204)
async def delete_item(item_id: int):
"""删除条目"""
success = await delete_item_from_db(item_id)
if not success:
raise HTTPException(status_code=404, detail="Item not found")
undefinedAPI Versioning
API版本化
URL Path Versioning (Recommended):
python
from fastapi import FastAPI, APIRouter
app = FastAPI()URL路径版本化(推荐):
python
from fastapi import FastAPI, APIRouter
app = FastAPI()V1 API Router
V1 API路由
v1_router = APIRouter(prefix="/api/v1")
@v1_router.get("/users/")
async def list_users_v1():
return {"version": "v1", "users": []}
v1_router = APIRouter(prefix="/api/v1")
@v1_router.get("/users/")
async def list_users_v1():
return {"version": "v1", "users": []}
V2 API Router
V2 API路由
v2_router = APIRouter(prefix="/api/v2")
@v2_router.get("/users/")
async def list_users_v2():
return {"version": "v2", "users": [], "metadata": {}}
app.include_router(v1_router)
app.include_router(v2_router)
undefinedv2_router = APIRouter(prefix="/api/v2")
@v2_router.get("/users/")
async def list_users_v2():
return {"version": "v2", "users": [], "metadata": {}}
app.include_router(v1_router)
app.include_router(v2_router)
undefinedRequest/Response Validation
请求/响应验证
FastAPI uses Pydantic for automatic validation:
python
from pydantic import BaseModel, Field, EmailStr, validator
from typing import Optional
from datetime import datetime
class UserCreate(BaseModel):
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
password: str = Field(..., min_length=8)
age: Optional[int] = Field(None, ge=0, le=150)
@validator('username')
def username_alphanumeric(cls, v):
assert v.isalnum(), 'must be alphanumeric'
return v
@validator('password')
def password_strength(cls, v):
if not any(char.isdigit() for char in v):
raise ValueError('must contain at least one digit')
if not any(char.isupper() for char in v):
raise ValueError('must contain at least one uppercase letter')
return v
class UserResponse(BaseModel):
id: int
username: str
email: EmailStr
created_at: datetime
class Config:
from_attributes = True
@app.post("/users/", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
# Automatic validation of request body
new_user = await save_user(user)
return new_userFastAPI使用Pydantic实现自动验证:
python
from pydantic import BaseModel, Field, EmailStr, validator
from typing import Optional
from datetime import datetime
class UserCreate(BaseModel):
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
password: str = Field(..., min_length=8)
age: Optional[int] = Field(None, ge=0, le=150)
@validator('username')
def username_alphanumeric(cls, v):
assert v.isalnum(), 'must be alphanumeric'
return v
@validator('password')
def password_strength(cls, v):
if not any(char.isdigit() for char in v):
raise ValueError('must contain at least one digit')
if not any(char.isupper() for char in v):
raise ValueError('must contain at least one uppercase letter')
return v
class UserResponse(BaseModel):
id: int
username: str
email: EmailStr
created_at: datetime
class Config:
from_attributes = True
@app.post("/users/", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
# 自动验证请求体
new_user = await save_user(user)
return new_userAdvanced Dependency Injection
高级依赖注入
Dependencies with Yield
带Yield的依赖
Dependencies can use for setup/teardown operations:
yieldpython
from fastapi import FastAPI, Depends
app = FastAPI()依赖可以使用实现初始化/清理操作:
yieldpython
from fastapi import FastAPI, Depends
app = FastAPI()Database dependency with cleanup
带清理操作的数据库依赖
async def get_db():
db = await connect_to_database()
try:
yield db
finally:
await db.close()
@app.get("/items/")
async def read_items(db = Depends(get_db)):
items = await db.query("SELECT * FROM items")
return items
**Advanced Resource Management:**
```python
from fastapi import Depends, HTTPException
async def get_database():
with Session() as session:
try:
yield session
except HTTPException:
session.rollback()
raise
finally:
session.close()
@app.post("/users/")
async def create_user(user: UserCreate, db = Depends(get_database)):
try:
new_user = db.add(User(**user.dict()))
db.commit()
return new_user
except Exception as e:
# Session automatically rolled back by dependency
raise HTTPException(status_code=500, detail=str(e))async def get_db():
db = await connect_to_database()
try:
yield db
finally:
await db.close()
@app.get("/items/")
async def read_items(db = Depends(get_db)):
items = await db.query("SELECT * FROM items")
return items
**高级资源管理:**
```python
from fastapi import Depends, HTTPException
async def get_database():
with Session() as session:
try:
yield session
except HTTPException:
session.rollback()
raise
finally:
session.close()
@app.post("/users/")
async def create_user(user: UserCreate, db = Depends(get_database)):
try:
new_user = db.add(User(**user.dict()))
db.commit()
return new_user
except Exception as e:
# 依赖自动回滚会话
raise HTTPException(status_code=500, detail=str(e))Sub-Dependencies
子依赖
Dependencies can depend on other dependencies:
python
from typing import Optional
from fastapi import FastAPI, Depends, Cookie
app = FastAPI()
async def query_extractor(q: Optional[str] = None):
return q
async def query_or_cookie_extractor(
q: str = Depends(query_extractor),
last_query: Optional[str] = Cookie(None)
):
if not q:
return last_query
return q
@app.get('/items/')
async def read_items(query: str = Depends(query_or_cookie_extractor)):
return {'query': query}依赖可以依赖其他依赖:
python
from typing import Optional
from fastapi import FastAPI, Depends, Cookie
app = FastAPI()
async def query_extractor(q: Optional[str] = None):
return q
async def query_or_cookie_extractor(
q: str = Depends(query_extractor),
last_query: Optional[str] = Cookie(None)
):
if not q:
return last_query
return q
@app.get('/items/')
async def read_items(query: str = Depends(query_or_cookie_extractor)):
return {'query': query}Class-Based Dependencies
基于类的依赖
Use classes for complex dependency logic:
python
from typing import Optional
from fastapi import FastAPI, Depends
app = FastAPI()
class CommonQueryParams:
def __init__(
self,
q: Optional[str] = None,
skip: int = 0,
limit: int = 100,
):
self.q = q
self.skip = skip
self.limit = limit
@app.get("/items/")
async def read_items(commons: CommonQueryParams = Depends(CommonQueryParams)):
return {"q": commons.q, "skip": commons.skip, "limit": commons.limit}使用类实现复杂依赖逻辑:
python
from typing import Optional
from fastapi import FastAPI, Depends
app = FastAPI()
class CommonQueryParams:
def __init__(
self,
q: Optional[str] = None,
skip: int = 0,
limit: int = 100,
):
self.q = q
self.skip = skip
self.limit = limit
@app.get("/items/")
async def read_items(commons: CommonQueryParams = Depends(CommonQueryParams)):
return {"q": commons.q, "skip": commons.skip, "limit": commons.limit}Shortcut syntax
简化语法
@app.get("/users/")
async def read_users(commons: CommonQueryParams = Depends()):
return commons
undefined@app.get("/users/")
async def read_users(commons: CommonQueryParams = Depends()):
return commons
undefinedGlobal Dependencies
全局依赖
Apply dependencies to all routes:
python
from fastapi import FastAPI, Depends, Header, HTTPException
async def verify_token(x_token: str = Header(...)):
if x_token != "secret-token":
raise HTTPException(status_code=400, detail="Invalid X-Token header")
return x_token
async def verify_key(x_key: str = Header(...)):
if x_key != "secret-key":
raise HTTPException(status_code=400, detail="Invalid X-Key header")
return x_key将依赖应用于所有路由:
python
from fastapi import FastAPI, Depends, Header, HTTPException
async def verify_token(x_token: str = Header(...)):
if x_token != "secret-token":
raise HTTPException(status_code=400, detail="Invalid X-Token header")
return x_token
async def verify_key(x_key: str = Header(...)):
if x_key != "secret-key":
raise HTTPException(status_code=400, detail="Invalid X-Key header")
return x_keyApply to entire application
应用于整个应用
app = FastAPI(dependencies=[Depends(verify_token), Depends(verify_key)])
app = FastAPI(dependencies=[Depends(verify_token), Depends(verify_key)])
Apply to router
应用于路由
from fastapi import APIRouter
router = APIRouter(
prefix="/items",
dependencies=[Depends(verify_token)]
)
@router.get("/")
async def read_items():
return [{"item_id": "Foo"}]
app.include_router(router)
undefinedfrom fastapi import APIRouter
router = APIRouter(
prefix="/items",
dependencies=[Depends(verify_token)]
)
@router.get("/")
async def read_items():
return [{"item_id": "Foo"}]
app.include_router(router)
undefinedReusable Dependency Aliases
可复用依赖别名
Create type aliases for common dependencies:
python
from typing import Annotated
from fastapi import Depends为常用依赖创建类型别名:
python
from typing import Annotated
from fastapi import DependsDefine reusable dependency types
定义可复用依赖类型
async def get_current_user():
return {"username": "johndoe"}
CurrentUser = Annotated[dict, Depends(get_current_user)]
async def get_current_user():
return {"username": "johndoe"}
CurrentUser = Annotated[dict, Depends(get_current_user)]
Use across multiple endpoints
在多个端点中使用
@app.get("/items/")
def read_items(user: CurrentUser):
return {"user": user, "items": []}
@app.post("/items/")
def create_item(user: CurrentUser, item: Item):
return {"user": user, "item": item}
@app.delete("/items/{item_id}")
def delete_item(user: CurrentUser, item_id: int):
return {"user": user, "deleted": item_id}
undefined@app.get("/items/")
def read_items(user: CurrentUser):
return {"user": user, "items": []}
@app.post("/items/")
def create_item(user: CurrentUser, item: Item):
return {"user": user, "item": item}
@app.delete("/items/{item_id}")
def delete_item(user: CurrentUser, item_id: int):
return {"user": user, "deleted": item_id}
undefinedAuthentication & Authorization
认证与授权
OAuth2 with Password Flow
带密码流的OAuth2
python
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Optional
import jwt
from datetime import datetime, timedelta
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
class Token(BaseModel):
access_token: str
token_type: str
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except jwt.PyJWTError:
raise credentials_exception
user = await get_user_from_db(username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = await authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_userpython
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Optional
import jwt
from datetime import datetime, timedelta
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
class Token(BaseModel):
access_token: str
token_type: str
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except jwt.PyJWTError:
raise credentials_exception
user = await get_user_from_db(username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = await authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_userOAuth2 with Scopes
带作用域的OAuth2
python
from fastapi.security import SecurityScopes
from pydantic import ValidationError
async def get_current_user(
security_scopes: SecurityScopes,
token: str = Depends(oauth2_scheme)
):
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
token_scopes = payload.get("scopes", [])
except (jwt.PyJWTError, ValidationError):
raise credentials_exception
for scope in security_scopes.scopes:
if scope not in token_scopes:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
user = await get_user(username)
if user is None:
raise credentials_exception
return user
@app.get("/items/", dependencies=[Security(get_current_user, scopes=["items:read"])])
async def read_items():
return [{"item_id": "Foo"}]
@app.post("/items/", dependencies=[Security(get_current_user, scopes=["items:write"])])
async def create_item(item: Item):
return itempython
from fastapi.security import SecurityScopes
from pydantic import ValidationError
async def get_current_user(
security_scopes: SecurityScopes,
token: str = Depends(oauth2_scheme)
):
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
token_scopes = payload.get("scopes", [])
except (jwt.PyJWTError, ValidationError):
raise credentials_exception
for scope in security_scopes.scopes:
if scope not in token_scopes:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
user = await get_user(username)
if user is None:
raise credentials_exception
return user
@app.get("/items/", dependencies=[Security(get_current_user, scopes=["items:read"])])
async def read_items():
return [{"item_id": "Foo"}]
@app.post("/items/", dependencies=[Security(get_current_user, scopes=["items:write"])])
async def create_item(item: Item):
return itemBackground Tasks
后台任务
Simple Background Tasks
简单后台任务
python
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def write_log(message: str):
with open("log.txt", mode="a") as log_file:
log_file.write(message)
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_log, f"Notification sent to {email}\n")
return {"message": "Notification sent in the background"}python
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def write_log(message: str):
with open("log.txt", mode="a") as log_file:
log_file.write(message)
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_log, f"Notification sent to {email}\n")
return {"message": "Notification sent in the background"}Background Tasks with Dependencies
带依赖的后台任务
python
from fastapi import BackgroundTasks, Depends
from typing import Annotated
def write_log(message: str):
with open("log.txt", mode="a") as log_file:
log_file.write(message)
async def get_query_and_log(
query: str | None = None,
background_tasks: BackgroundTasks = Depends()
):
if query:
background_tasks.add_task(write_log, f"query: {query}\n")
return query
@app.post("/send-notification/{email}")
async def send_notification(
email: str,
background_tasks: BackgroundTasks,
query: Annotated[str | None, Depends(get_query_and_log)],
):
background_tasks.add_task(write_log, f"email: {email}, query: {query}\n")
return {"message": "Notification sent"}python
from fastapi import BackgroundTasks, Depends
from typing import Annotated
def write_log(message: str):
with open("log.txt", mode="a") as log_file:
log_file.write(message)
async def get_query_and_log(
query: str | None = None,
background_tasks: BackgroundTasks = Depends()
):
if query:
background_tasks.add_task(write_log, f"query: {query}\n")
return query
@app.post("/send-notification/{email}")
async def send_notification(
email: str,
background_tasks: BackgroundTasks,
query: Annotated[str | None, Depends(get_query_and_log)],
):
background_tasks.add_task(write_log, f"email: {email}, query: {query}\n")
return {"message": "Notification sent"}WebSocket Support
WebSocket支持
Basic WebSocket
基础WebSocket
python
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message received: {data}")
except WebSocketDisconnect:
print("Client disconnected")python
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message received: {data}")
except WebSocketDisconnect:
print("Client disconnected")WebSocket with Dependencies
带依赖的WebSocket
python
from fastapi import WebSocket, Depends, Query, Cookie, WebSocketException, status
async def get_cookie_or_token(
websocket: WebSocket,
session: str | None = Cookie(None),
token: str | None = Query(None),
):
if session is None and token is None:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
return session or token
@app.websocket("/ws/{item_id}")
async def websocket_endpoint(
websocket: WebSocket,
item_id: str,
q: int | None = None,
cookie_or_token: str = Depends(get_cookie_or_token),
):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(
f"Session: {cookie_or_token}, Item: {item_id}, Data: {data}"
)
except WebSocketDisconnect:
print(f"Client {item_id} disconnected")python
from fastapi import WebSocket, Depends, Query, Cookie, WebSocketException, status
async def get_cookie_or_token(
websocket: WebSocket,
session: str | None = Cookie(None),
token: str | None = Query(None),
):
if session is None and token is None:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
return session or token
@app.websocket("/ws/{item_id}")
async def websocket_endpoint(
websocket: WebSocket,
item_id: str,
q: int | None = None,
cookie_or_token: str = Depends(get_cookie_or_token),
):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(
f"Session: {cookie_or_token}, Item: {item_id}, Data: {data}"
)
except WebSocketDisconnect:
print(f"Client {item_id} disconnected")WebSocket Connection Manager
WebSocket连接管理器
python
from typing import List
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.send_personal_message(f"You wrote: {data}", websocket)
await manager.broadcast(f"Client #{client_id} says: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client #{client_id} left the chat")python
from typing import List
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.send_personal_message(f"You wrote: {data}", websocket)
await manager.broadcast(f"Client #{client_id} says: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client #{client_id} left the chat")Database Integration
数据库集成
SQLAlchemy with Async
异步SQLAlchemy
python
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session_maker = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True)
username = Column(String, unique=True, index=True)
async def get_db() -> AsyncSession:
async with async_session_maker() as session:
try:
yield session
finally:
await session.close()
@app.get("/users/{user_id}")
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).filter(User.id == user_id))
user = result.scalars().first()
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return userpython
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session_maker = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True)
username = Column(String, unique=True, index=True)
async def get_db() -> AsyncSession:
async with async_session_maker() as session:
try:
yield session
finally:
await session.close()
@app.get("/users/{user_id}")
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).filter(User.id == user_id))
user = result.scalars().first()
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return userMongoDB with Motor
MongoDB与Motor
python
from motor.motor_asyncio import AsyncIOMotorClient
from fastapi import Depends
MONGODB_URL = "mongodb://localhost:27017"
client = AsyncIOMotorClient(MONGODB_URL)
database = client.mydatabase
async def get_database():
return database
@app.post("/items/")
async def create_item(item: Item, db = Depends(get_database)):
result = await db.items.insert_one(item.dict())
return {"id": str(result.inserted_id)}
@app.get("/items/{item_id}")
async def read_item(item_id: str, db = Depends(get_database)):
from bson import ObjectId
item = await db.items.find_one({"_id": ObjectId(item_id)})
if item is None:
raise HTTPException(status_code=404, detail="Item not found")
item["_id"] = str(item["_id"])
return itempython
from motor.motor_asyncio import AsyncIOMotorClient
from fastapi import Depends
MONGODB_URL = "mongodb://localhost:27017"
client = AsyncIOMotorClient(MONGODB_URL)
database = client.mydatabase
async def get_database():
return database
@app.post("/items/")
async def create_item(item: Item, db = Depends(get_database)):
result = await db.items.insert_one(item.dict())
return {"id": str(result.inserted_id)}
@app.get("/items/{item_id}")
async def read_item(item_id: str, db = Depends(get_database)):
from bson import ObjectId
item = await db.items.find_one({"_id": ObjectId(item_id)})
if item is None:
raise HTTPException(status_code=404, detail="Item not found")
item["_id"] = str(item["_id"])
return itemError Handling
错误处理
Custom Exception Handlers
自定义异常处理器
python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
class UnicornException(Exception):
def __init__(self, name: str):
self.name = name
@app.exception_handler(UnicornException)
async def unicorn_exception_handler(request: Request, exc: UnicornException):
return JSONResponse(
status_code=418,
content={"message": f"Oops! {exc.name} did something wrong."},
)
@app.get("/unicorns/{name}")
async def read_unicorn(name: str):
if name == "yolo":
raise UnicornException(name=name)
return {"unicorn_name": name}python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
class UnicornException(Exception):
def __init__(self, name: str):
self.name = name
@app.exception_handler(UnicornException)
async def unicorn_exception_handler(request: Request, exc: UnicornException):
return JSONResponse(
status_code=418,
content={"message": f"Oops! {exc.name} did something wrong."},
)
@app.get("/unicorns/{name}")
async def read_unicorn(name: str):
if name == "yolo":
raise UnicornException(name=name)
return {"unicorn_name": name}Override Default Exception Handlers
覆盖默认异常处理器
python
from fastapi.exceptions import RequestValidationError
from fastapi.responses import PlainTextResponse
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return PlainTextResponse(str(exc), status_code=400)python
from fastapi.exceptions import RequestValidationError
from fastapi.responses import PlainTextResponse
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return PlainTextResponse(str(exc), status_code=400)Testing
测试
Test Setup with TestClient
使用TestClient搭建测试环境
python
from fastapi.testclient import TestClient
from main import app
client = TestClient(app)
def test_read_main():
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"message": "Hello World"}
def test_create_item():
response = client.post(
"/items/",
json={"name": "Foo", "price": 45.2}
)
assert response.status_code == 201
assert response.json()["name"] == "Foo"
def test_read_item():
response = client.get("/items/1")
assert response.status_code == 200
assert "name" in response.json()python
from fastapi.testclient import TestClient
from main import app
client = TestClient(app)
def test_read_main():
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"message": "Hello World"}
def test_create_item():
response = client.post(
"/items/",
json={"name": "Foo", "price": 45.2}
)
assert response.status_code == 201
assert response.json()["name"] == "Foo"
def test_read_item():
response = client.get("/items/1")
assert response.status_code == 200
assert "name" in response.json()Testing with Dependencies
带依赖的测试
python
from fastapi import Depends
async def override_get_db():
return {"test": "database"}
app.dependency_overrides[get_db] = override_get_db
def test_with_dependency():
response = client.get("/items/")
assert response.status_code == 200
# Uses overridden dependencypython
from fastapi import Depends
async def override_get_db():
return {"test": "database"}
app.dependency_overrides[get_db] = override_get_db
def test_with_dependency():
response = client.get("/items/")
assert response.status_code == 200
# 使用重写后的依赖Async Testing
异步测试
python
import pytest
from httpx import AsyncClient
from main import app
@pytest.mark.asyncio
async def test_read_items():
async with AsyncClient(app=app, base_url="http://test") as ac:
response = await ac.get("/items/")
assert response.status_code == 200
assert isinstance(response.json(), list)python
import pytest
from httpx import AsyncClient
from main import app
@pytest.mark.asyncio
async def test_read_items():
async with AsyncClient(app=app, base_url="http://test") as ac:
response = await ac.get("/items/")
assert response.status_code == 200
assert isinstance(response.json(), list)Production Deployment
生产环境部署
Docker Configuration
Docker配置
Dockerfile:
dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY ./app /app
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]Multi-stage Build:
dockerfile
FROM python:3.11-slim as builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt
FROM python:3.11-slim
WORKDIR /app
COPY /root/.local /root/.local
COPY ./app /app
ENV PATH=/root/.local/bin:$PATH
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]docker-compose.yml:
yaml
version: '3.8'
services:
api:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://user:password@db:5432/mydb
- REDIS_URL=redis://redis:6379
depends_on:
- db
- redis
volumes:
- ./app:/app
command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
db:
image: postgres:15
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=password
- POSTGRES_DB=mydb
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
postgres_data:Dockerfile:
dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY ./app /app
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]多阶段构建:
dockerfile
FROM python:3.11-slim as builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt
FROM python:3.11-slim
WORKDIR /app
COPY /root/.local /root/.local
COPY ./app /app
ENV PATH=/root/.local/bin:$PATH
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]docker-compose.yml:
yaml
version: '3.8'
services:
api:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://user:password@db:5432/mydb
- REDIS_URL=redis://redis:6379
depends_on:
- db
- redis
volumes:
- ./app:/app
command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
db:
image: postgres:15
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=password
- POSTGRES_DB=mydb
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
postgres_data:Kubernetes Deployment
Kubernetes部署
deployment.yaml:
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: fastapi-service
spec:
replicas: 3
selector:
matchLabels:
app: fastapi
template:
metadata:
labels:
app: fastapi
spec:
containers:
- name: fastapi
image: myregistry/fastapi-app:latest
ports:
- containerPort: 8000
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: db-secret
key: url
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: fastapi-service
spec:
selector:
app: fastapi
ports:
- protocol: TCP
port: 80
targetPort: 8000
type: LoadBalancerdeployment.yaml:
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: fastapi-service
spec:
replicas: 3
selector:
matchLabels:
app: fastapi
template:
metadata:
labels:
app: fastapi
spec:
containers:
- name: fastapi
image: myregistry/fastapi-app:latest
ports:
- containerPort: 8000
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: db-secret
key: url
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: fastapi-service
spec:
selector:
app: fastapi
ports:
- protocol: TCP
port: 80
targetPort: 8000
type: LoadBalancerEnvironment Configuration
环境配置
python
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
app_name: str = "FastAPI Microservice"
database_url: str
redis_url: str
secret_key: str
algorithm: str = "HS256"
access_token_expire_minutes: int = 30
class Config:
env_file = ".env"
@lru_cache()
def get_settings():
return Settings()
@app.get("/info")
async def info(settings: Settings = Depends(get_settings)):
return {"app_name": settings.app_name}python
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
app_name: str = "FastAPI Microservice"
database_url: str
redis_url: str
secret_key: str
algorithm: str = "HS256"
access_token_expire_minutes: int = 30
class Config:
env_file = ".env"
@lru_cache()
def get_settings():
return Settings()
@app.get("/info")
async def info(settings: Settings = Depends(get_settings)):
return {"app_name": settings.app_name}Health Checks
健康检查
python
@app.get("/health")
async def health_check():
return {"status": "healthy"}
@app.get("/ready")
async def readiness_check(db = Depends(get_db)):
try:
# Check database connectivity
await db.execute("SELECT 1")
return {"status": "ready"}
except Exception as e:
raise HTTPException(status_code=503, detail="Service not ready")python
@app.get("/health")
async def health_check():
return {"status": "healthy"}
@app.get("/ready")
async def readiness_check(db = Depends(get_db)):
try:
# 检查数据库连接
await db.execute("SELECT 1")
return {"status": "ready"}
except Exception as e:
raise HTTPException(status_code=503, detail="Service not ready")Monitoring & Logging
监控与日志
Structured Logging
结构化日志
python
import logging
import json
from datetime import datetime
class JSONFormatter(logging.Formatter):
def format(self, record):
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
}
return json.dumps(log_data)
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
@app.middleware("http")
async def log_requests(request: Request, call_next):
logger.info(f"Request: {request.method} {request.url}")
response = await call_next(request)
logger.info(f"Response: {response.status_code}")
return responsepython
import logging
import json
from datetime import datetime
class JSONFormatter(logging.Formatter):
def format(self, record):
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
}
return json.dumps(log_data)
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
@app.middleware("http")
async def log_requests(request: Request, call_next):
logger.info(f"Request: {request.method} {request.url}")
response = await call_next(request)
logger.info(f"Response: {response.status_code}")
return responsePrometheus Metrics
Prometheus指标
python
from prometheus_client import Counter, Histogram, generate_latest
from fastapi.responses import Response
import time
REQUEST_COUNT = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status']
)
REQUEST_DURATION = Histogram(
'http_request_duration_seconds',
'HTTP request duration',
['method', 'endpoint']
)
@app.middleware("http")
async def prometheus_middleware(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
REQUEST_DURATION.labels(
method=request.method,
endpoint=request.url.path
).observe(duration)
return response
@app.get("/metrics")
async def metrics():
return Response(generate_latest(), media_type="text/plain")python
from prometheus_client import Counter, Histogram, generate_latest
from fastapi.responses import Response
import time
REQUEST_COUNT = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status']
)
REQUEST_DURATION = Histogram(
'http_request_duration_seconds',
'HTTP request duration',
['method', 'endpoint']
)
@app.middleware("http")
async def prometheus_middleware(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
REQUEST_DURATION.labels(
method=request.method,
endpoint=request.url.path
).observe(duration)
return response
@app.get("/metrics")
async def metrics():
return Response(generate_latest(), media_type="text/plain")Best Practices
最佳实践
1. Project Structure
1. 项目结构
fastapi-service/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── config.py
│ ├── dependencies.py
│ ├── models/
│ │ ├── __init__.py
│ │ ├── user.py
│ │ └── item.py
│ ├── schemas/
│ │ ├── __init__.py
│ │ ├── user.py
│ │ └── item.py
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── users.py
│ │ └── items.py
│ ├── services/
│ │ ├── __init__.py
│ │ ├── user_service.py
│ │ └── item_service.py
│ └── database.py
├── tests/
│ ├── __init__.py
│ ├── test_users.py
│ └── test_items.py
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
└── .envfastapi-service/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── config.py
│ ├── dependencies.py
│ ├── models/
│ │ ├── __init__.py
│ │ ├── user.py
│ │ └── item.py
│ ├── schemas/
│ │ ├── __init__.py
│ │ ├── user.py
│ │ └── item.py
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── users.py
│ │ └── items.py
│ ├── services/
│ │ ├── __init__.py
│ │ ├── user_service.py
│ │ └── item_service.py
│ └── database.py
├── tests/
│ ├── __init__.py
│ ├── test_users.py
│ └── test_items.py
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
└── .env2. Separation of Concerns
2. 关注点分离
models.py - Database models:
python
from sqlalchemy import Column, Integer, String
from .database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True)schemas.py - Pydantic schemas:
python
from pydantic import BaseModel, EmailStr
class UserCreate(BaseModel):
email: EmailStr
password: str
class UserResponse(BaseModel):
id: int
email: EmailStr
class Config:
from_attributes = Trueservices.py - Business logic:
python
from sqlalchemy.orm import Session
class UserService:
def __init__(self, db: Session):
self.db = db
async def create_user(self, user_data: UserCreate):
# Business logic here
passrouters.py - API endpoints:
python
from fastapi import APIRouter, Depends
router = APIRouter(prefix="/users", tags=["users"])
@router.post("/", response_model=UserResponse)
async def create_user(user: UserCreate, service: UserService = Depends()):
return await service.create_user(user)models.py - 数据库模型:
python
from sqlalchemy import Column, Integer, String
from .database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True)schemas.py - Pydantic schema:
python
from pydantic import BaseModel, EmailStr
class UserCreate(BaseModel):
email: EmailStr
password: str
class UserResponse(BaseModel):
id: int
email: EmailStr
class Config:
from_attributes = Trueservices.py - 业务逻辑:
python
from sqlalchemy.orm import Session
class UserService:
def __init__(self, db: Session):
self.db = db
async def create_user(self, user_data: UserCreate):
# 业务逻辑实现
passrouters.py - API端点:
python
from fastapi import APIRouter, Depends
router = APIRouter(prefix="/users", tags=["users"])
@router.post("/", response_model=UserResponse)
async def create_user(user: UserCreate, service: UserService = Depends()):
return await service.create_user(user)3. Security Best Practices
3. 安全最佳实践
- Always use HTTPS in production
- Implement rate limiting
- Validate and sanitize all inputs
- Use dependency injection for auth
- Store secrets in environment variables
- Implement CORS properly
- Use security headers
- Hash passwords with bcrypt/argon2
- Implement JWT token expiration
- Use OAuth2 scopes for authorization
- 生产环境始终使用HTTPS
- 实现速率限制
- 验证和清理所有输入
- 使用依赖注入处理认证
- 将密钥存储在环境变量中
- 正确实现CORS
- 使用安全头
- 使用bcrypt/argon2哈希密码
- 实现JWT令牌过期机制
- 使用OAuth2作用域进行授权
4. Performance Optimization
4. 性能优化
- Use async/await for I/O operations
- Implement caching (Redis)
- Use database connection pooling
- Paginate large responses
- Compress responses (gzip)
- Use CDN for static assets
- Implement database indexes
- Use background tasks for heavy operations
- Monitor with APM tools
- Load test before production
- 对I/O操作使用async/await
- 实现缓存(Redis)
- 使用数据库连接池
- 对大响应进行分页
- 压缩响应(gzip)
- 静态资源使用CDN
- 实现数据库索引
- 后台任务处理重操作
- 使用APM工具监控
- 上线前进行负载测试
5. API Documentation
5. API文档
FastAPI automatically generates OpenAPI documentation, but you can enhance it:
python
app = FastAPI(
title="My Microservice API",
description="Production-ready microservice with FastAPI",
version="1.0.0",
terms_of_service="http://example.com/terms/",
contact={
"name": "API Support",
"url": "http://example.com/support",
"email": "support@example.com",
},
license_info={
"name": "Apache 2.0",
"url": "https://www.apache.org/licenses/LICENSE-2.0.html",
},
)
@app.get(
"/items/",
response_model=List[Item],
summary="List all items",
description="Retrieve a paginated list of items from the database",
response_description="List of items with pagination metadata",
)
async def list_items(
skip: int = Query(0, description="Number of items to skip"),
limit: int = Query(100, description="Maximum number of items to return"),
):
"""
List items with pagination support.
- **skip**: Number of items to skip (for pagination)
- **limit**: Maximum number of items to return
"""
return await get_items(skip=skip, limit=limit)FastAPI自动生成OpenAPI文档,你可以进一步增强:
python
app = FastAPI(
title="My Microservice API",
description="Production-ready microservice with FastAPI",
version="1.0.0",
terms_of_service="http://example.com/terms/",
contact={
"name": "API Support",
"url": "http://example.com/support",
"email": "support@example.com",
},
license_info={
"name": "Apache 2.0",
"url": "https://www.apache.org/licenses/LICENSE-2.0.html",
},
)
@app.get(
"/items/",
response_model=List[Item],
summary="List all items",
description="Retrieve a paginated list of items from the database",
response_description="List of items with pagination metadata",
)
async def list_items(
skip: int = Query(0, description="Number of items to skip"),
limit: int = Query(100, description="Maximum number of items to return"),
):
"""
List items with pagination support.
- **skip**: Number of items to skip (for pagination)
- **limit**: Maximum number of items to return
"""
return await get_items(skip=skip, limit=limit)Common Patterns & Examples
常见模式与示例
See EXAMPLES.md for 15+ detailed, production-ready examples covering:
- CRUD operations with async databases
- Authentication flows
- File upload handling
- Caching strategies
- Rate limiting
- Event-driven architectures
- Testing patterns
- Deployment configurations
- And more...
Skill Version: 1.0.0
Last Updated: October 2025
Skill Category: Backend Development, Microservices, Python, REST APIs
Compatible With: FastAPI 0.100+, Python 3.7+, Pydantic 2.0+
查看EXAMPLES.md获取15+个详细的生产级示例,包括:
- 异步数据库CRUD操作
- 认证流程
- 文件上传处理
- 缓存策略
- 速率限制
- 事件驱动架构
- 测试模式
- 部署配置
- 以及更多...
技能版本: 1.0.0
最后更新: 2025年10月
技能分类: 后端开发, 微服务, Python, REST APIs
兼容版本: FastAPI 0.100+, Python 3.7+, Pydantic 2.0+