fastmcp
Use this skill when building MCP (Model Context Protocol) servers with FastMCP in Python. FastMCP is a framework for creating servers that expose tools, resources, and prompts to LLMs like Claude. The skill covers server creation, tool/resource definitions, storage backends (memory/disk/Redis/DynamoDB), server lifespans, middleware system (8 built-in types), server composition (import/mount), OAuth Proxy, authentication patterns, icons, OpenAPI integration, client configuration, cloud deployment (FastMCP Cloud), error handling, and production patterns. It prevents 25+ common errors including storage misconfiguration, lifespan issues, middleware order errors, circular imports, module-level server issues, async/await confusion, OAuth security vulnerabilities, and cloud deployment failures. Includes templates for basic servers, storage backends, middleware, server composition, OAuth proxy, API integrations, testing, and self-contained production architectures. Keywords: FastMCP, MCP server Python, Model Context Protocol Python, fastmcp framework, mcp tools, mcp resources, mcp prompts, fastmcp storage, fastmcp memory storage, fastmcp disk storage, fastmcp redis, fastmcp dynamodb, fastmcp lifespan, fastmcp middleware, fastmcp oauth proxy, server composition mcp, fastmcp import, fastmcp mount, fastmcp cloud, fastmcp deployment, mcp authentication, fastmcp icons, openapi mcp, claude mcp server, fastmcp testing, storage misconfiguration, lifespan issues, middleware order, circular imports, module-level server, async await mcp
NPX Install
npx skill4agent add jackspace/claudeskillz fastmcpTags
Translated version includes tags in frontmatterSKILL.md Content
View Translation Comparison →FastMCP - Build MCP Servers in Python
Quick Start
Installation
pip install fastmcp
# or
uv pip install fastmcpMinimal Server
from fastmcp import FastMCP
# MUST be at module level for FastMCP Cloud
mcp = FastMCP("My Server")
@mcp.tool()
async def hello(name: str) -> str:
"""Say hello to someone."""
return f"Hello, {name}!"
if __name__ == "__main__":
mcp.run()# Local development
python server.py
# With FastMCP CLI
fastmcp dev server.py
# HTTP mode
python server.py --transport http --port 8000Core Concepts
1. Tools
@mcp.tool()
def calculate(operation: str, a: float, b: float) -> float:
"""Perform mathematical operations.
Args:
operation: add, subtract, multiply, or divide
a: First number
b: Second number
Returns:
Result of the operation
"""
operations = {
"add": lambda x, y: x + y,
"subtract": lambda x, y: x - y,
"multiply": lambda x, y: x * y,
"divide": lambda x, y: x / y if y != 0 else None
}
return operations.get(operation, lambda x, y: None)(a, b)- Clear, descriptive function names
- Comprehensive docstrings (LLMs read these!)
- Strong type hints (Pydantic validates automatically)
- Return structured data (dicts/lists)
- Handle errors gracefully
# Sync tool (for non-blocking operations)
@mcp.tool()
def sync_tool(param: str) -> dict:
return {"result": param.upper()}
# Async tool (for I/O operations, API calls)
@mcp.tool()
async def async_tool(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()2. Resources
# Static resource
@mcp.resource("data://config")
def get_config() -> dict:
"""Provide application configuration."""
return {
"version": "1.0.0",
"features": ["auth", "api", "cache"]
}
# Dynamic resource
@mcp.resource("info://status")
async def server_status() -> dict:
"""Get current server status."""
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"api_configured": bool(os.getenv("API_KEY"))
}- - Generic data
data:// - - File resources
file:// - - General resources
resource:// - - Information/metadata
info:// - - API endpoints
api:// - Custom schemes allowed
3. Resource Templates
# Single parameter
@mcp.resource("user://{user_id}/profile")
async def get_user_profile(user_id: str) -> dict:
"""Get user profile by ID."""
user = await fetch_user_from_db(user_id)
return {
"id": user_id,
"name": user.name,
"email": user.email
}
# Multiple parameters
@mcp.resource("org://{org_id}/team/{team_id}/members")
async def get_team_members(org_id: str, team_id: str) -> list:
"""Get team members with org context."""
return await db.query(
"SELECT * FROM members WHERE org_id = ? AND team_id = ?",
[org_id, team_id]
)4. Prompts
@mcp.prompt("analyze")
def analyze_prompt(topic: str) -> str:
"""Generate analysis prompt."""
return f"""
Analyze {topic} considering:
1. Current state
2. Challenges
3. Opportunities
4. Recommendations
Use available tools to gather data.
"""
@mcp.prompt("help")
def help_prompt() -> str:
"""Generate help text for server."""
return """
Welcome to My Server!
Available tools:
- search: Search for items
- process: Process data
Available resources:
- info://status: Server status
"""Context Features
1. Elicitation (User Input)
from fastmcp import Context
@mcp.tool()
async def confirm_action(action: str, context: Context) -> dict:
"""Perform action with user confirmation."""
# Request confirmation from user
confirmed = await context.request_elicitation(
prompt=f"Confirm {action}? (yes/no)",
response_type=str
)
if confirmed.lower() == "yes":
result = await perform_action(action)
return {"status": "completed", "action": action}
else:
return {"status": "cancelled", "action": action}2. Progress Tracking
@mcp.tool()
async def batch_import(file_path: str, context: Context) -> dict:
"""Import data with progress updates."""
data = await read_file(file_path)
total = len(data)
imported = []
for i, item in enumerate(data):
# Report progress
await context.report_progress(
progress=i + 1,
total=total,
message=f"Importing item {i + 1}/{total}"
)
result = await import_item(item)
imported.append(result)
return {"imported": len(imported), "total": total}3. Sampling (LLM Integration)
@mcp.tool()
async def enhance_text(text: str, context: Context) -> str:
"""Enhance text using LLM."""
response = await context.request_sampling(
messages=[{
"role": "system",
"content": "You are a professional copywriter."
}, {
"role": "user",
"content": f"Enhance this text: {text}"
}],
temperature=0.7,
max_tokens=500
)
return response["content"]Storage Backends
py-key-value-aioAvailable Backends
- Ephemeral storage (lost on restart)
- Fast, no configuration needed
- Good for development
- Persistent storage on local filesystem
- Encrypted by default with
FernetEncryptionWrapper - Platform-aware defaults (Mac/Windows use disk, Linux uses memory)
- Distributed storage for production
- Supports multi-instance deployments
- Ideal for response caching across servers
- DynamoDB (AWS)
- MongoDB
- Elasticsearch
- Memcached
- RocksDB
- Valkey
Basic Usage
from fastmcp import FastMCP
from key_value.stores import MemoryStore, DiskStore, RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
import os
# Memory storage (default)
mcp = FastMCP("My Server")
# Disk storage (persistent)
from key_value.stores import DiskStore
mcp = FastMCP(
"My Server",
storage=DiskStore(path="/app/data/storage")
)
# Redis storage (production)
from key_value.stores import RedisStore
mcp = FastMCP(
"My Server",
storage=RedisStore(
host=os.getenv("REDIS_HOST", "localhost"),
port=int(os.getenv("REDIS_PORT", "6379")),
password=os.getenv("REDIS_PASSWORD")
)
)Encrypted Storage
from cryptography.fernet import Fernet
from key_value.encryption import FernetEncryptionWrapper
from key_value.stores import DiskStore
# Generate encryption key (store in environment!)
# key = Fernet.generate_key()
# Use encrypted storage
encrypted_storage = FernetEncryptionWrapper(
key_value=DiskStore(path="/app/data/storage"),
fernet=Fernet(os.getenv("STORAGE_ENCRYPTION_KEY"))
)
mcp = FastMCP("My Server", storage=encrypted_storage)OAuth Token Storage
from fastmcp.auth import OAuthProxy
from key_value.stores import RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
# Production OAuth with encrypted Redis storage
auth = OAuthProxy(
jwt_signing_key=os.environ["JWT_SIGNING_KEY"],
client_storage=FernetEncryptionWrapper(
key_value=RedisStore(
host=os.getenv("REDIS_HOST"),
password=os.getenv("REDIS_PASSWORD")
),
fernet=Fernet(os.environ["STORAGE_ENCRYPTION_KEY"])
),
upstream_authorization_endpoint="https://provider.com/oauth/authorize",
upstream_token_endpoint="https://provider.com/oauth/token",
upstream_client_id=os.getenv("OAUTH_CLIENT_ID"),
upstream_client_secret=os.getenv("OAUTH_CLIENT_SECRET")
)
mcp = FastMCP("OAuth Server", auth=auth)Platform-Aware Defaults
- Mac/Windows: Disk storage (persistent)
- Linux: Memory storage (ephemeral)
- Override: Set parameter explicitly
storage
# Explicitly use disk storage on Linux
from key_value.stores import DiskStore
mcp = FastMCP(
"My Server",
storage=DiskStore(path="/var/lib/mcp/storage")
)Server Lifespans
Basic Pattern
from fastmcp import FastMCP
from contextlib import asynccontextmanager
from typing import AsyncIterator
from dataclasses import dataclass
@dataclass
class AppContext:
"""Shared application state."""
db: Database
api_client: httpx.AsyncClient
@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
"""
Initialize resources on startup, cleanup on shutdown.
Runs ONCE per server instance, NOT per client session.
"""
# Startup: Initialize resources
db = await Database.connect(os.getenv("DATABASE_URL"))
api_client = httpx.AsyncClient(
base_url=os.getenv("API_BASE_URL"),
headers={"Authorization": f"Bearer {os.getenv('API_KEY')}"},
timeout=30.0
)
print("Server initialized")
try:
# Yield context to tools
yield AppContext(db=db, api_client=api_client)
finally:
# Shutdown: Cleanup resources
await db.disconnect()
await api_client.aclose()
print("Server shutdown complete")
# Create server with lifespan
mcp = FastMCP("My Server", lifespan=app_lifespan)
# Access context in tools
from fastmcp import Context
@mcp.tool()
async def query_database(sql: str, context: Context) -> list:
"""Query database using shared connection."""
# Access lifespan context
app_context: AppContext = context.fastmcp_context.lifespan_context
return await app_context.db.query(sql)
@mcp.tool()
async def api_request(endpoint: str, context: Context) -> dict:
"""Make API request using shared client."""
app_context: AppContext = context.fastmcp_context.lifespan_context
response = await app_context.api_client.get(endpoint)
return response.json()ASGI Integration
from fastapi import FastAPI
from fastmcp import FastMCP
# FastMCP lifespan
@asynccontextmanager
async def mcp_lifespan(server: FastMCP):
print("MCP server starting")
yield
print("MCP server stopping")
mcp = FastMCP("My Server", lifespan=mcp_lifespan)
# FastAPI app MUST include MCP lifespan
app = FastAPI(lifespan=mcp.lifespan)
# Add routes
@app.get("/")
def root():
return {"message": "Hello World"}app = FastAPI() # MCP lifespan won't run!app = FastAPI(lifespan=mcp.lifespan)State Management
from fastmcp import Context
@mcp.tool()
async def set_config(key: str, value: str, context: Context) -> dict:
"""Store configuration value."""
context.fastmcp_context.set_state(key, value)
return {"status": "saved", "key": key}
@mcp.tool()
async def get_config(key: str, context: Context) -> dict:
"""Retrieve configuration value."""
value = context.fastmcp_context.get_state(key, default=None)
if value is None:
return {"error": f"Key '{key}' not found"}
return {"key": key, "value": value}Middleware System
Built-in Middleware (8 Types)
- TimingMiddleware - Performance monitoring
- ResponseCachingMiddleware - TTL-based caching with pluggable storage
- LoggingMiddleware - Human-readable and JSON-structured logging
- RateLimitingMiddleware - Token bucket and sliding window algorithms
- ErrorHandlingMiddleware - Consistent error management
- ToolInjectionMiddleware - Dynamic tool injection
- PromptToolMiddleware - Tool-based prompt access for limited clients
- ResourceToolMiddleware - Tool-based resource access for limited clients
Basic Usage
from fastmcp import FastMCP
from fastmcp.middleware import (
TimingMiddleware,
LoggingMiddleware,
RateLimitingMiddleware,
ResponseCachingMiddleware,
ErrorHandlingMiddleware
)
mcp = FastMCP("My Server")
# Add middleware (order matters!)
mcp.add_middleware(ErrorHandlingMiddleware())
mcp.add_middleware(TimingMiddleware())
mcp.add_middleware(LoggingMiddleware(level="INFO"))
mcp.add_middleware(RateLimitingMiddleware(
max_requests=100,
window_seconds=60,
algorithm="token_bucket"
))
mcp.add_middleware(ResponseCachingMiddleware(
ttl_seconds=300,
storage=RedisStore(host="localhost")
))Middleware Execution Order
Request Flow:
→ ErrorHandlingMiddleware (catches errors)
→ TimingMiddleware (starts timer)
→ LoggingMiddleware (logs request)
→ RateLimitingMiddleware (checks rate limit)
→ ResponseCachingMiddleware (checks cache)
→ Tool/Resource Handler
← ResponseCachingMiddleware (stores in cache)
← RateLimitingMiddleware
← LoggingMiddleware (logs response)
← TimingMiddleware (stops timer, logs duration)
← ErrorHandlingMiddleware (returns error if any)Custom Middleware
from fastmcp.middleware import BaseMiddleware
from fastmcp import Context
class AccessControlMiddleware(BaseMiddleware):
"""Check authorization before tool execution."""
def __init__(self, allowed_users: list[str]):
self.allowed_users = allowed_users
async def on_call_tool(self, tool_name: str, arguments: dict, context: Context):
"""Hook runs before tool execution."""
# Get user from context (from auth)
user = context.fastmcp_context.get_state("user_id")
if user not in self.allowed_users:
raise PermissionError(f"User '{user}' not authorized")
# Continue to tool
return await self.next(tool_name, arguments, context)
# Add to server
mcp.add_middleware(AccessControlMiddleware(
allowed_users=["alice", "bob", "charlie"]
))Hook Hierarchy
- - All messages (requests and notifications)
on_message - /
on_request- By message typeon_notification - ,
on_call_tool,on_read_resource- Operation-specificon_get_prompt - ,
on_list_tools,on_list_resources,on_list_prompts- List operationson_list_resource_templates
class ComprehensiveMiddleware(BaseMiddleware):
async def on_message(self, message: dict, context: Context):
"""Runs for ALL messages."""
print(f"Message: {message['method']}")
return await self.next(message, context)
async def on_call_tool(self, tool_name: str, arguments: dict, context: Context):
"""Runs only for tool calls."""
print(f"Tool: {tool_name}")
return await self.next(tool_name, arguments, context)
async def on_read_resource(self, uri: str, context: Context):
"""Runs only for resource reads."""
print(f"Resource: {uri}")
return await self.next(uri, context)Response Caching Middleware
from fastmcp.middleware import ResponseCachingMiddleware
from key_value.stores import RedisStore
# Cache responses for 5 minutes
cache_middleware = ResponseCachingMiddleware(
ttl_seconds=300,
storage=RedisStore(host="localhost"), # Shared across instances
cache_tools=True, # Cache tool calls
cache_resources=True, # Cache resource reads
cache_prompts=False # Don't cache prompts
)
mcp.add_middleware(cache_middleware)
# Tools/resources are automatically cached
@mcp.tool()
async def expensive_computation(data: str) -> dict:
"""This will be cached for 5 minutes."""
import time
time.sleep(5) # Expensive operation
return {"result": process(data)}Server Composition
Two Strategies
import_server()- One-time copy of components at import time
- Changes to subserver don't propagate
- Fast (no runtime delegation)
- Use for: Bundling finalized components
mount()- Live runtime link to subserver
- Changes to subserver immediately visible
- Runtime delegation (slower)
- Use for: Modular runtime composition
Import Server (Static)
from fastmcp import FastMCP
# Subserver with tools
api_server = FastMCP("API Server")
@api_server.tool()
def api_tool():
return "API result"
@api_server.resource("api://status")
def api_status():
return {"status": "ok"}
# Main server imports components
main_server = FastMCP("Main Server")
# Import all components from subserver
main_server.import_server(api_server)
# Now main_server has api_tool and api://status
# Changes to api_server won't affect main_serverMount Server (Dynamic)
from fastmcp import FastMCP
# Create servers
api_server = FastMCP("API Server")
db_server = FastMCP("DB Server")
@api_server.tool()
def fetch_data():
return "API data"
@db_server.tool()
def query_db():
return "DB result"
# Main server mounts subservers
main_server = FastMCP("Main Server")
# Mount with prefix
main_server.mount(api_server, prefix="api")
main_server.mount(db_server, prefix="db")
# Tools are namespaced:
# - api.fetch_data
# - db.query_db
# Resources are prefixed:
# - resource://api/path/to/resource
# - resource://db/path/to/resourceMounting Modes
# In-memory access, subserver runs in same process
main_server.mount(subserver, prefix="sub")# Treats subserver as separate entity with own lifecycle
main_server.mount(
subserver,
prefix="sub",
mode="proxy"
)Tag Filtering
# Tag subserver components
@api_server.tool(tags=["public"])
def public_api():
return "Public"
@api_server.tool(tags=["admin"])
def admin_api():
return "Admin only"
# Import only public tools
main_server.import_server(
api_server,
include_tags=["public"]
)
# Or exclude admin tools
main_server.import_server(
api_server,
exclude_tags=["admin"]
)
# Tag filtering is recursive with mount()
main_server.mount(
api_server,
prefix="api",
include_tags=["public"]
)Resource Prefix Formats
resource://prefix/path/to/resourceprefix+resource://path/to/resourcemain_server.mount(
subserver,
prefix="api",
resource_prefix_format="path" # or "protocol"
)OAuth Proxy & Authentication
Four Authentication Patterns
- Token Validation (/
TokenVerifier) - Validate external tokensJWTVerifier - External Identity Providers () - OAuth 2.0/OIDC with DCR
RemoteAuthProvider - OAuth Proxy () - Bridge to traditional OAuth providers
OAuthProxy - Full OAuth () - Complete authorization server
OAuthProvider
Pattern 1: Token Validation
from fastmcp import FastMCP
from fastmcp.auth import JWTVerifier
# JWT verification
auth = JWTVerifier(
issuer="https://auth.example.com",
audience="my-mcp-server",
public_key=os.getenv("JWT_PUBLIC_KEY")
)
mcp = FastMCP("Secure Server", auth=auth)
@mcp.tool()
async def secure_operation(context: Context) -> dict:
"""Only accessible with valid JWT."""
# Token validated automatically
user = context.fastmcp_context.get_state("user_id")
return {"user": user, "status": "authorized"}Pattern 2: External Identity Providers
from fastmcp.auth import RemoteAuthProvider
auth = RemoteAuthProvider(
issuer="https://auth.example.com",
# Provider must support DCR
)
mcp = FastMCP("OAuth Server", auth=auth)Pattern 3: OAuth Proxy (Recommended for Production)
from fastmcp.auth import OAuthProxy
from key_value.stores import RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
import os
auth = OAuthProxy(
# JWT signing for issued tokens
jwt_signing_key=os.environ["JWT_SIGNING_KEY"],
# Encrypted storage for upstream tokens
client_storage=FernetEncryptionWrapper(
key_value=RedisStore(
host=os.getenv("REDIS_HOST"),
password=os.getenv("REDIS_PASSWORD")
),
fernet=Fernet(os.environ["STORAGE_ENCRYPTION_KEY"])
),
# Upstream OAuth provider
upstream_authorization_endpoint="https://github.com/login/oauth/authorize",
upstream_token_endpoint="https://github.com/login/oauth/access_token",
upstream_client_id=os.getenv("GITHUB_CLIENT_ID"),
upstream_client_secret=os.getenv("GITHUB_CLIENT_SECRET"),
# Scopes
upstream_scope="read:user user:email",
# Security: Enable consent screen (prevents confused deputy attacks)
enable_consent_screen=True
)
mcp = FastMCP("GitHub Auth Server", auth=auth)OAuth Proxy Features
- Proxy issues its own JWTs (not forwarding upstream tokens)
- Upstream tokens stored encrypted
- Proxy tokens can have custom claims
- Prevents authorization bypass attacks
- Shows user what permissions are being granted
- Required for security compliance
- End-to-end validation from client to upstream
- Protects against authorization code interception
- Validate tokens with upstream provider
- Check revocation status
Pattern 4: Full OAuth Provider
from fastmcp.auth import OAuthProvider
auth = OAuthProvider(
issuer="https://my-auth-server.com",
client_storage=RedisStore(host="localhost"),
# Full OAuth 2.0 server implementation
)
mcp = FastMCP("Auth Server", auth=auth)Environment-Based Configuration
export FASTMCP_SERVER_AUTH='{"type": "oauth_proxy", "upstream_authorization_endpoint": "...", ...}'# Automatically configures from FASTMCP_SERVER_AUTH
mcp = FastMCP("Auto Auth Server")Supported OAuth Providers
- GitHub:
https://github.com/login/oauth/authorize - Google:
https://accounts.google.com/o/oauth2/v2/auth - Azure:
https://login.microsoftonline.com/{tenant}/oauth2/v2.0/authorize - AWS Cognito:
https://{domain}.auth.{region}.amazoncognito.com/oauth2/authorize - Discord:
https://discord.com/api/oauth2/authorize - Facebook:
https://www.facebook.com/v12.0/dialog/oauth - WorkOS: Enterprise identity
- AuthKit: Authentication toolkit
- Descope: Auth platform
- Scalekit: Enterprise SSO
Icons Support
Server-Level Icons
from fastmcp import FastMCP, Icon
mcp = FastMCP(
name="Weather Service",
website_url="https://weather.example.com",
icons=[
Icon(
url="https://example.com/icon-small.png",
size="small"
),
Icon(
url="https://example.com/icon-large.png",
size="large"
)
]
)Component-Level Icons
from fastmcp import Icon
@mcp.tool(icons=[
Icon(url="https://example.com/tool-icon.png")
])
async def analyze_data(data: str) -> dict:
"""Analyze data with visual icon."""
return {"result": "analyzed"}
@mcp.resource(
"user://{user_id}/profile",
icons=[Icon(url="https://example.com/user-icon.png")]
)
async def get_user(user_id: str) -> dict:
"""User profile with icon."""
return {"id": user_id, "name": "Alice"}
@mcp.prompt(
"analyze",
icons=[Icon(url="https://example.com/prompt-icon.png")]
)
def analysis_prompt(topic: str) -> str:
"""Analysis prompt with icon."""
return f"Analyze {topic}"Data URI Support
from fastmcp import Icon, Image
# Convert local file to data URI
icon = Icon.from_file("/path/to/icon.png", size="medium")
# Or use Image utility
image_data_uri = Image.to_data_uri("/path/to/icon.png")
icon = Icon(url=image_data_uri, size="medium")
# Use in server
mcp = FastMCP(
"My Server",
icons=[icon]
)Multiple Sizes
mcp = FastMCP(
"Responsive Server",
icons=[
Icon(url="icon-16.png", size="small"), # 16x16
Icon(url="icon-32.png", size="medium"), # 32x32
Icon(url="icon-64.png", size="large"), # 64x64
]
)API Integration
Pattern 1: Manual API Integration
import httpx
import os
# Create reusable client
client = httpx.AsyncClient(
base_url=os.getenv("API_BASE_URL"),
headers={"Authorization": f"Bearer {os.getenv('API_KEY')}"},
timeout=30.0
)
@mcp.tool()
async def fetch_data(endpoint: str) -> dict:
"""Fetch data from API."""
try:
response = await client.get(endpoint)
response.raise_for_status()
return {"success": True, "data": response.json()}
except httpx.HTTPStatusError as e:
return {"error": f"HTTP {e.response.status_code}"}
except Exception as e:
return {"error": str(e)}Pattern 2: OpenAPI/Swagger Auto-Generation
from fastmcp import FastMCP
from fastmcp.server.openapi import RouteMap, MCPType
import httpx
# Load OpenAPI spec
spec = httpx.get("https://api.example.com/openapi.json").json()
# Create authenticated client
client = httpx.AsyncClient(
base_url="https://api.example.com",
headers={"Authorization": f"Bearer {API_TOKEN}"},
timeout=30.0
)
# Auto-generate MCP server from OpenAPI
mcp = FastMCP.from_openapi(
openapi_spec=spec,
client=client,
name="API Server",
route_maps=[
# GET with parameters → Resource Templates
RouteMap(
methods=["GET"],
pattern=r".*\{.*\}.*",
mcp_type=MCPType.RESOURCE_TEMPLATE
),
# GET without parameters → Resources
RouteMap(
methods=["GET"],
mcp_type=MCPType.RESOURCE
),
# POST/PUT/DELETE → Tools
RouteMap(
methods=["POST", "PUT", "DELETE"],
mcp_type=MCPType.TOOL
),
]
)
# Optionally add custom tools
@mcp.tool()
async def custom_operation(data: dict) -> dict:
"""Custom tool on top of generated ones."""
return process_data(data)Pattern 3: FastAPI Conversion
from fastapi import FastAPI
from fastmcp import FastMCP
# Existing FastAPI app
app = FastAPI()
@app.get("/items/{item_id}")
def get_item(item_id: int):
return {"id": item_id, "name": "Item"}
# Convert to MCP server
mcp = FastMCP.from_fastapi(
app=app,
httpx_client_kwargs={
"headers": {"Authorization": "Bearer token"}
}
)Cloud Deployment (FastMCP Cloud)
Critical Requirements
- Module-level server object named ,
mcp, orserverapp - PyPI dependencies only in requirements.txt
- Public GitHub repository (or accessible to FastMCP Cloud)
- Environment variables for configuration
Cloud-Ready Server Pattern
# server.py
from fastmcp import FastMCP
import os
# ✅ CORRECT: Module-level server object
mcp = FastMCP(
name="production-server"
)
# ✅ Use environment variables
API_KEY = os.getenv("API_KEY")
DATABASE_URL = os.getenv("DATABASE_URL")
@mcp.tool()
async def production_tool(data: str) -> dict:
"""Production-ready tool."""
if not API_KEY:
return {"error": "API_KEY not configured"}
# Your implementation
return {"status": "success", "data": data}
# ✅ Optional: for local testing
if __name__ == "__main__":
mcp.run()Common Cloud Deployment Errors
def create_server():
mcp = FastMCP("my-server")
return mcp
if __name__ == "__main__":
server = create_server() # Too late for cloud!
server.run()def create_server() -> FastMCP:
mcp = FastMCP("my-server")
# Complex setup logic
return mcp
# Export at module level
mcp = create_server()
if __name__ == "__main__":
mcp.run()Deployment Steps
- Prepare Repository:
git init
git add .
git commit -m "Initial MCP server"
gh repo create my-mcp-server --public
git push -u origin main-
Deploy on FastMCP Cloud:
- Visit https://fastmcp.cloud
- Sign in with GitHub
- Click "Create Project"
- Select your repository
- Configure:
- Server Name: Your project name
- Entrypoint:
server.py - Environment Variables: Add any needed
-
Access Your Server:
- URL:
https://your-project.fastmcp.app/mcp - Automatic deployment on push to main
- PR preview deployments
- URL:
Client Configuration
Claude Desktop
claude_desktop_config.json{
"mcpServers": {
"my-server": {
"url": "https://your-project.fastmcp.app/mcp",
"transport": "http"
}
}
}Local Development
{
"mcpServers": {
"my-server": {
"command": "python",
"args": ["/absolute/path/to/server.py"],
"env": {
"API_KEY": "your-key",
"DATABASE_URL": "your-db-url"
}
}
}
}Claude Code CLI
{
"mcpServers": {
"my-server": {
"command": "uv",
"args": ["run", "python", "/absolute/path/to/server.py"]
}
}
}25 Common Errors (With Solutions)
Error 1: Missing Server Object
RuntimeError: No server object found at module level# ❌ WRONG
def create_server():
return FastMCP("server")
# ✅ CORRECT
mcp = FastMCP("server") # At module levelError 2: Async/Await Confusion
RuntimeError: no running event loop
TypeError: object coroutine can't be used in 'await' expression# ❌ WRONG: Sync function calling async
@mcp.tool()
def bad_tool():
result = await async_function() # Error!
# ✅ CORRECT: Async tool
@mcp.tool()
async def good_tool():
result = await async_function()
return result
# ✅ CORRECT: Sync tool with sync code
@mcp.tool()
def sync_tool():
return "Hello"Error 3: Context Not Injected
TypeError: missing 1 required positional argument: 'context'Contextfrom fastmcp import Context
# ❌ WRONG: No type hint
@mcp.tool()
async def bad_tool(context): # Missing type!
await context.report_progress(...)
# ✅ CORRECT: Proper type hint
@mcp.tool()
async def good_tool(context: Context):
await context.report_progress(0, 100, "Starting")Error 4: Resource URI Syntax
ValueError: Invalid resource URI: missing scheme# ❌ WRONG: Missing scheme
@mcp.resource("config")
def get_config(): pass
# ✅ CORRECT: Include scheme
@mcp.resource("data://config")
def get_config(): pass
# ✅ Valid schemes
@mcp.resource("file://config.json")
@mcp.resource("api://status")
@mcp.resource("info://health")Error 5: Resource Template Parameter Mismatch
TypeError: get_user() missing 1 required positional argument: 'user_id'# ❌ WRONG: Parameter name mismatch
@mcp.resource("user://{user_id}/profile")
def get_user(id: str): # Wrong name!
pass
# ✅ CORRECT: Matching names
@mcp.resource("user://{user_id}/profile")
def get_user(user_id: str): # Matches {user_id}
return {"id": user_id}Error 6: Pydantic Validation Error
ValidationError: value is not a valid integerfrom pydantic import BaseModel, Field
# ✅ Use Pydantic models for complex validation
class SearchParams(BaseModel):
query: str = Field(min_length=1, max_length=100)
limit: int = Field(default=10, ge=1, le=100)
@mcp.tool()
async def search(params: SearchParams) -> dict:
# Validation automatic
return await perform_search(params.query, params.limit)Error 7: Transport/Protocol Mismatch
ConnectionError: Server using different transport# Server using stdio (default)
mcp.run() # or mcp.run(transport="stdio")
# Client configuration must match
{
"command": "python",
"args": ["server.py"]
}
# OR for HTTP:
mcp.run(transport="http", port=8000)
# Client:
{
"url": "http://localhost:8000/mcp",
"transport": "http"
}Error 8: Import Errors (Editable Package)
ModuleNotFoundError: No module named 'my_package'# ✅ Install in editable mode
pip install -e .
# ✅ Or use absolute imports
from src.tools import my_tool
# ✅ Or add to PYTHONPATH
export PYTHONPATH="${PYTHONPATH}:/path/to/project"Error 9: Deprecation Warnings
DeprecationWarning: 'mcp.settings' is deprecated, use global Settings instead# ❌ OLD: FastMCP v1
from fastmcp import FastMCP
mcp = FastMCP()
api_key = mcp.settings.get("API_KEY")
# ✅ NEW: FastMCP v2
import os
api_key = os.getenv("API_KEY")Error 10: Port Already in Use
OSError: [Errno 48] Address already in use# ✅ Use different port
python server.py --transport http --port 8001
# ✅ Or kill process on port
lsof -ti:8000 | xargs kill -9Error 11: Schema Generation Failures
TypeError: Object of type 'ndarray' is not JSON serializable# ❌ WRONG: NumPy array
import numpy as np
@mcp.tool()
def bad_tool() -> np.ndarray: # Not JSON serializable
return np.array([1, 2, 3])
# ✅ CORRECT: Use JSON-compatible types
@mcp.tool()
def good_tool() -> list[float]:
return [1.0, 2.0, 3.0]
# ✅ Or convert to dict
@mcp.tool()
def array_tool() -> dict:
data = np.array([1, 2, 3])
return {"values": data.tolist()}Error 12: JSON Serialization
TypeError: Object of type 'datetime' is not JSON serializablefrom datetime import datetime
# ❌ WRONG: Return datetime object
@mcp.tool()
def bad_tool() -> dict:
return {"timestamp": datetime.now()} # Not serializable
# ✅ CORRECT: Convert to string
@mcp.tool()
def good_tool() -> dict:
return {"timestamp": datetime.now().isoformat()}
# ✅ Use helper function
def make_serializable(obj):
"""Convert object to JSON-serializable format."""
if isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, bytes):
return obj.decode('utf-8')
# Add more conversions as needed
return objError 13: Circular Import Errors
ImportError: cannot import name 'X' from partially initialized module# ❌ WRONG: Factory function in __init__.py
# shared/__init__.py
_client = None
def get_api_client():
from .api_client import APIClient # Circular!
return APIClient()
# shared/monitoring.py
from . import get_api_client # Creates circle
# ✅ CORRECT: Direct imports
# shared/__init__.py
from .api_client import APIClient
from .cache import CacheManager
# shared/monitoring.py
from .api_client import APIClient
client = APIClient() # Create directly
# ✅ ALTERNATIVE: Lazy import
# shared/monitoring.py
def get_client():
from .api_client import APIClient
return APIClient()Error 14: Python Version Compatibility
DeprecationWarning: datetime.utcnow() is deprecated# ❌ DEPRECATED (Python 3.12+)
from datetime import datetime
timestamp = datetime.utcnow()
# ✅ CORRECT: Future-proof
from datetime import datetime, timezone
timestamp = datetime.now(timezone.utc)Error 15: Import-Time Execution
RuntimeError: Event loop is closed# ❌ WRONG: Module-level async execution
import asyncpg
connection = asyncpg.connect('postgresql://...') # Runs at import!
# ✅ CORRECT: Lazy initialization
import asyncpg
class Database:
connection = None
@classmethod
async def connect(cls):
if cls.connection is None:
cls.connection = await asyncpg.connect('postgresql://...')
return cls.connection
# Usage: connection happens when needed, not at import
@mcp.tool()
async def get_users():
conn = await Database.connect()
return await conn.fetch("SELECT * FROM users")Error 16: Storage Backend Not Configured
RuntimeError: OAuth tokens lost on restart
ValueError: Cache not persisting across server instances# ❌ WRONG: Memory storage in production
mcp = FastMCP("Production Server") # Tokens lost on restart!
# ✅ CORRECT: Use disk or Redis storage
from key_value.stores import DiskStore, RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
# Disk storage (single instance)
mcp = FastMCP(
"Production Server",
storage=FernetEncryptionWrapper(
key_value=DiskStore(path="/var/lib/mcp/storage"),
fernet=Fernet(os.getenv("STORAGE_ENCRYPTION_KEY"))
)
)
# Redis storage (multi-instance)
mcp = FastMCP(
"Production Server",
storage=FernetEncryptionWrapper(
key_value=RedisStore(
host=os.getenv("REDIS_HOST"),
password=os.getenv("REDIS_PASSWORD")
),
fernet=Fernet(os.getenv("STORAGE_ENCRYPTION_KEY"))
)
)Error 17: Lifespan Not Passed to ASGI App
RuntimeError: Database connection never initialized
Warning: MCP lifespan hooks not runningfrom fastapi import FastAPI
from fastmcp import FastMCP
# ❌ WRONG: Lifespan not passed
mcp = FastMCP("My Server", lifespan=my_lifespan)
app = FastAPI() # MCP lifespan won't run!
# ✅ CORRECT: Pass MCP lifespan to parent app
mcp = FastMCP("My Server", lifespan=my_lifespan)
app = FastAPI(lifespan=mcp.lifespan)Error 18: Middleware Execution Order Error
RuntimeError: Rate limit not checked before caching
AttributeError: Context state not available in middleware# ❌ WRONG: Cache before rate limiting
mcp.add_middleware(ResponseCachingMiddleware())
mcp.add_middleware(RateLimitingMiddleware()) # Too late!
# ✅ CORRECT: Rate limit before cache
mcp.add_middleware(ErrorHandlingMiddleware()) # First: catch errors
mcp.add_middleware(TimingMiddleware()) # Second: time requests
mcp.add_middleware(LoggingMiddleware()) # Third: log
mcp.add_middleware(RateLimitingMiddleware()) # Fourth: check limits
mcp.add_middleware(ResponseCachingMiddleware()) # Last: cacheError 19: Circular Middleware Dependencies
RecursionError: maximum recursion depth exceeded
RuntimeError: Middleware loop detectedself.next()# ❌ WRONG: Not calling next() or calling incorrectly
class BadMiddleware(BaseMiddleware):
async def on_call_tool(self, tool_name, arguments, context):
# Forgot to call next()!
return {"error": "blocked"}
# ✅ CORRECT: Always call next() to continue chain
class GoodMiddleware(BaseMiddleware):
async def on_call_tool(self, tool_name, arguments, context):
# Do preprocessing
print(f"Before: {tool_name}")
# MUST call next() to continue
result = await self.next(tool_name, arguments, context)
# Do postprocessing
print(f"After: {tool_name}")
return resultError 20: Import vs Mount Confusion
RuntimeError: Subserver changes not reflected
ValueError: Unexpected tool namespacingimport_server()mount()# ❌ WRONG: Using import when you want dynamic updates
main_server.import_server(subserver)
# Later: changes to subserver won't appear in main_server
# ✅ CORRECT: Use mount() for dynamic composition
main_server.mount(subserver, prefix="sub")
# Changes to subserver are immediately visible
# ❌ WRONG: Using mount when you want static bundle
main_server.mount(third_party_server, prefix="vendor")
# Runtime overhead for static components
# ✅ CORRECT: Use import_server() for static bundles
main_server.import_server(third_party_server)
# One-time copy, no runtime delegationError 21: Resource Prefix Format Mismatch
ValueError: Resource not found: resource://api/users
ValueError: Unexpected resource URI format# Path format (default since v2.4.0)
main_server.mount(api_server, prefix="api")
# Resources: resource://api/users
# ❌ WRONG: Expecting protocol format
# resource://api+users (doesn't exist)
# ✅ CORRECT: Use path format
uri = "resource://api/users"
# OR explicitly set protocol format (legacy)
main_server.mount(
api_server,
prefix="api",
resource_prefix_format="protocol"
)
# Resources: api+resource://usersError 22: OAuth Proxy Without Consent Screen
SecurityWarning: Authorization bypass possible
RuntimeError: Confused deputy attack vector# ❌ WRONG: No consent screen (security risk!)
auth = OAuthProxy(
jwt_signing_key=os.getenv("JWT_KEY"),
upstream_authorization_endpoint="...",
upstream_token_endpoint="...",
# Missing: enable_consent_screen
)
# ✅ CORRECT: Enable consent screen
auth = OAuthProxy(
jwt_signing_key=os.getenv("JWT_KEY"),
upstream_authorization_endpoint="...",
upstream_token_endpoint="...",
enable_consent_screen=True # Prevents bypass attacks
)Error 23: Missing JWT Signing Key in Production
ValueError: JWT signing key required for OAuth Proxy
RuntimeError: Cannot issue tokens without signing keyjwt_signing_key# ❌ WRONG: No JWT signing key
auth = OAuthProxy(
upstream_authorization_endpoint="...",
upstream_token_endpoint="...",
# Missing: jwt_signing_key
)
# ✅ CORRECT: Provide signing key from environment
import secrets
# Generate once (in setup):
# signing_key = secrets.token_urlsafe(32)
# Store in: FASTMCP_JWT_SIGNING_KEY environment variable
auth = OAuthProxy(
jwt_signing_key=os.environ["FASTMCP_JWT_SIGNING_KEY"],
client_storage=encrypted_storage,
upstream_authorization_endpoint="...",
upstream_token_endpoint="...",
upstream_client_id=os.getenv("OAUTH_CLIENT_ID"),
upstream_client_secret=os.getenv("OAUTH_CLIENT_SECRET")
)Error 24: Icon Data URI Format Error
ValueError: Invalid data URI format
TypeError: Icon URL must be string or data URIfrom fastmcp import Icon, Image
# ❌ WRONG: Invalid data URI
icon = Icon(url="base64,iVBORw0KG...") # Missing data:image/png;
# ✅ CORRECT: Use Image utility
icon = Icon.from_file("/path/to/icon.png", size="medium")
# ✅ CORRECT: Manual data URI
import base64
with open("/path/to/icon.png", "rb") as f:
image_data = base64.b64encode(f.read()).decode()
data_uri = f"data:image/png;base64,{image_data}"
icon = Icon(url=data_uri, size="medium")Error 25: Lifespan Behavior Change (v2.13.0)
Warning: Lifespan runs per-server, not per-session
RuntimeError: Resources initialized multiple times# v2.12.0 and earlier: Lifespan ran per client session
# v2.13.0+: Lifespan runs once per server instance
# ✅ CORRECT: v2.13.0+ pattern (per-server)
@asynccontextmanager
async def app_lifespan(server: FastMCP):
"""Runs ONCE when server starts, not per client session."""
db = await Database.connect()
print("Server starting - runs once")
try:
yield {"db": db}
finally:
await db.disconnect()
print("Server stopping - runs once")
mcp = FastMCP("My Server", lifespan=app_lifespan)
# For per-session logic, use middleware instead:
class SessionMiddleware(BaseMiddleware):
async def on_message(self, message, context):
# Runs per client message
session_id = context.fastmcp_context.get_state("session_id")
if not session_id:
session_id = str(uuid.uuid4())
context.fastmcp_context.set_state("session_id", session_id)
return await self.next(message, context)Production Patterns
Pattern 1: Self-Contained Utils Module
# src/utils.py - Single file with all utilities
import os
from typing import Dict, Any
from datetime import datetime
class Config:
"""Application configuration."""
SERVER_NAME = os.getenv("SERVER_NAME", "FastMCP Server")
SERVER_VERSION = "1.0.0"
API_BASE_URL = os.getenv("API_BASE_URL")
API_KEY = os.getenv("API_KEY")
CACHE_TTL = int(os.getenv("CACHE_TTL", "300"))
def format_success(data: Any, message: str = "Success") -> Dict[str, Any]:
"""Format successful response."""
return {
"success": True,
"message": message,
"data": data,
"timestamp": datetime.now().isoformat()
}
def format_error(error: str, code: str = "ERROR") -> Dict[str, Any]:
"""Format error response."""
return {
"success": False,
"error": error,
"code": code,
"timestamp": datetime.now().isoformat()
}
# Usage in tools
from .utils import format_success, format_error, Config
@mcp.tool()
async def process_data(data: dict) -> dict:
try:
result = await process(data)
return format_success(result)
except Exception as e:
return format_error(str(e))Pattern 2: Connection Pooling
import httpx
from typing import Optional
class APIClient:
_instance: Optional[httpx.AsyncClient] = None
@classmethod
async def get_client(cls) -> httpx.AsyncClient:
if cls._instance is None:
cls._instance = httpx.AsyncClient(
base_url=os.getenv("API_BASE_URL"),
headers={"Authorization": f"Bearer {os.getenv('API_KEY')}"},
timeout=httpx.Timeout(30.0),
limits=httpx.Limits(max_keepalive_connections=5)
)
return cls._instance
@classmethod
async def cleanup(cls):
if cls._instance:
await cls._instance.aclose()
cls._instance = None
@mcp.tool()
async def api_request(endpoint: str) -> dict:
"""Make API request with managed client."""
client = await APIClient.get_client()
response = await client.get(endpoint)
return response.json()Pattern 3: Error Handling with Retry
import asyncio
from typing import Callable, TypeVar
T = TypeVar('T')
async def retry_with_backoff(
func: Callable[[], T],
max_retries: int = 3,
initial_delay: float = 1.0,
exponential_base: float = 2.0
) -> T:
"""Retry function with exponential backoff."""
delay = initial_delay
last_exception = None
for attempt in range(max_retries):
try:
return await func()
except Exception as e:
last_exception = e
if attempt < max_retries - 1:
await asyncio.sleep(delay)
delay *= exponential_base
raise last_exception
@mcp.tool()
async def resilient_api_call(endpoint: str) -> dict:
"""API call with automatic retry."""
async def make_call():
async with httpx.AsyncClient() as client:
response = await client.get(endpoint)
response.raise_for_status()
return response.json()
try:
data = await retry_with_backoff(make_call)
return {"success": True, "data": data}
except Exception as e:
return {"error": f"Failed after retries: {e}"}Pattern 4: Time-Based Caching
import time
from typing import Any, Optional
class TimeBasedCache:
def __init__(self, ttl: int = 300):
self.ttl = ttl
self.cache = {}
self.timestamps = {}
def get(self, key: str) -> Optional[Any]:
if key in self.cache:
if time.time() - self.timestamps[key] < self.ttl:
return self.cache[key]
else:
del self.cache[key]
del self.timestamps[key]
return None
def set(self, key: str, value: Any):
self.cache[key] = value
self.timestamps[key] = time.time()
cache = TimeBasedCache(ttl=300)
@mcp.tool()
async def cached_fetch(resource_id: str) -> dict:
"""Fetch with caching."""
cache_key = f"resource:{resource_id}"
cached_data = cache.get(cache_key)
if cached_data:
return {"data": cached_data, "from_cache": True}
data = await fetch_from_api(resource_id)
cache.set(cache_key, data)
return {"data": data, "from_cache": False}Testing
Unit Testing Tools
import pytest
from fastmcp import FastMCP
from fastmcp.testing import create_test_client
@pytest.fixture
def test_server():
"""Create test server instance."""
mcp = FastMCP("test-server")
@mcp.tool()
async def test_tool(param: str) -> str:
return f"Result: {param}"
return mcp
@pytest.mark.asyncio
async def test_tool_execution(test_server):
"""Test tool execution."""
async with create_test_client(test_server) as client:
result = await client.call_tool("test_tool", {"param": "test"})
assert result.data == "Result: test"Integration Testing
import asyncio
from fastmcp import Client
async def test_server():
"""Test all server functionality."""
async with Client("server.py") as client:
# Test tools
tools = await client.list_tools()
print(f"Tools: {len(tools)}")
for tool in tools:
try:
result = await client.call_tool(tool.name, {})
print(f"✓ {tool.name}: {result}")
except Exception as e:
print(f"✗ {tool.name}: {e}")
# Test resources
resources = await client.list_resources()
for resource in resources:
try:
data = await client.read_resource(resource.uri)
print(f"✓ {resource.uri}")
except Exception as e:
print(f"✗ {resource.uri}: {e}")
if __name__ == "__main__":
asyncio.run(test_server())CLI Commands
# Run with inspector (recommended)
fastmcp dev server.py
# Run normally
fastmcp run server.py
# Inspect server without running
fastmcp inspect server.py# Install to Claude Desktop
fastmcp install server.py
# Install with custom name
fastmcp install server.py --name "My Server"# Enable debug logging
FASTMCP_LOG_LEVEL=DEBUG fastmcp dev server.py
# Run with HTTP transport
fastmcp run server.py --transport http --port 8000Best Practices
1. Server Structure
from fastmcp import FastMCP
import os
def create_server() -> FastMCP:
"""Factory function for complex setup."""
mcp = FastMCP("Server Name")
# Configure server
setup_tools(mcp)
setup_resources(mcp)
return mcp
def setup_tools(mcp: FastMCP):
"""Register all tools."""
@mcp.tool()
def example_tool():
pass
def setup_resources(mcp: FastMCP):
"""Register all resources."""
@mcp.resource("data://config")
def get_config():
return {"version": "1.0.0"}
# Export at module level
mcp = create_server()
if __name__ == "__main__":
mcp.run()2. Environment Configuration
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
API_KEY = os.getenv("API_KEY", "")
BASE_URL = os.getenv("BASE_URL", "https://api.example.com")
DEBUG = os.getenv("DEBUG", "false").lower() == "true"
@classmethod
def validate(cls):
if not cls.API_KEY:
raise ValueError("API_KEY is required")
return True
# Validate on startup
Config.validate()3. Documentation
@mcp.tool()
def complex_tool(
query: str,
filters: dict = None,
limit: int = 10
) -> dict:
"""
Search with advanced filtering.
Args:
query: Search query string
filters: Optional filters dict with keys:
- category: Filter by category
- date_from: Start date (ISO format)
- date_to: End date (ISO format)
limit: Maximum results (1-100)
Returns:
Dict with 'results' list and 'total' count
Examples:
>>> complex_tool("python", {"category": "tutorial"}, 5)
{'results': [...], 'total': 5}
"""
pass4. Health Checks
@mcp.resource("health://status")
async def health_check() -> dict:
"""Comprehensive health check."""
checks = {}
# Check API connectivity
try:
async with httpx.AsyncClient() as client:
response = await client.get(f"{BASE_URL}/health", timeout=5)
checks["api"] = response.status_code == 200
except:
checks["api"] = False
# Check database
try:
checks["database"] = await check_db_connection()
except:
checks["database"] = False
all_healthy = all(checks.values())
return {
"status": "healthy" if all_healthy else "degraded",
"timestamp": datetime.now().isoformat(),
"checks": checks
}Project Structure
Simple Server
my-mcp-server/
├── server.py # Main server file
├── requirements.txt # Dependencies
├── .env # Environment variables (git-ignored)
├── .gitignore # Git ignore file
└── README.md # DocumentationProduction Server
my-mcp-server/
├── src/
│ ├── server.py # Main entry point
│ ├── utils.py # Shared utilities
│ ├── tools/ # Tool modules
│ │ ├── __init__.py
│ │ ├── api_tools.py
│ │ └── data_tools.py
│ ├── resources/ # Resource definitions
│ │ ├── __init__.py
│ │ └── static.py
│ └── prompts/ # Prompt templates
│ ├── __init__.py
│ └── templates.py
├── tests/
│ ├── test_tools.py
│ └── test_resources.py
├── requirements.txt
├── pyproject.toml
├── .env
├── .gitignore
└── README.mdReferences
- FastMCP: https://github.com/jlowin/fastmcp
- FastMCP Cloud: https://fastmcp.cloud
- MCP Protocol: https://modelcontextprotocol.io
- Context7 Docs:
/jlowin/fastmcp
- - OpenAI integration
openai-api - - Claude API
claude-api - - Deploy MCP as Worker
cloudflare-worker-base
- fastmcp >= 2.13.0
- Python >= 3.10
- httpx (recommended for async API calls)
- pydantic (for validation)
- py-key-value-aio (for storage backends)
- cryptography (for encrypted storage)
Summary
- Always export server at module level for FastMCP Cloud compatibility
- Use persistent storage backends (Disk/Redis) in production for OAuth tokens and caching
- Configure server lifespans for proper resource management (DB connections, API clients)
- Add middleware strategically - order matters! (errors → timing → logging → rate limiting → caching)
- Choose composition wisely - for static bundles,
import_server()for dynamic compositionmount() - Secure OAuth properly - Enable consent screens, encrypt token storage, use JWT signing keys
- Use async/await properly - don't block the event loop
- Handle errors gracefully with structured responses and ErrorHandlingMiddleware
- Avoid circular imports especially with factory functions
- Test locally before deploying using
fastmcp dev - Use environment variables for all configuration (never hardcode secrets)
- Document thoroughly - LLMs read your docstrings
- Follow production patterns for self-contained, maintainable code
- Leverage OpenAPI for instant API integration
- Monitor with health checks and middleware for production reliability
- Storage: Encrypted persistence for OAuth tokens and response caching
- Authentication: 4 auth patterns (Token Validation, Remote OAuth, OAuth Proxy, Full OAuth)
- Middleware: 8 built-in types for logging, rate limiting, caching, error handling
- Composition: Modular server architecture with import/mount strategies
- Security: Consent screens, PKCE, RFC 7662 token introspection, encrypted storage
- Performance: Response caching, connection pooling, timing middleware