Loading...
Loading...
Use this skill when building MCP (Model Context Protocol) servers with FastMCP in Python. FastMCP is a framework for creating servers that expose tools, resources, and prompts to LLMs like Claude. The skill covers server creation, tool/resource definitions, storage backends (memory/disk/Redis/DynamoDB), server lifespans, middleware system (8 built-in types), server composition (import/mount), OAuth Proxy, authentication patterns, icons, OpenAPI integration, client configuration, cloud deployment (FastMCP Cloud), error handling, and production patterns. It prevents 25+ common errors including storage misconfiguration, lifespan issues, middleware order errors, circular imports, module-level server issues, async/await confusion, OAuth security vulnerabilities, and cloud deployment failures. Includes templates for basic servers, storage backends, middleware, server composition, OAuth proxy, API integrations, testing, and self-contained production architectures. Keywords: FastMCP, MCP server Python, Model Context Protocol Python, fastmcp framework, mcp tools, mcp resources, mcp prompts, fastmcp storage, fastmcp memory storage, fastmcp disk storage, fastmcp redis, fastmcp dynamodb, fastmcp lifespan, fastmcp middleware, fastmcp oauth proxy, server composition mcp, fastmcp import, fastmcp mount, fastmcp cloud, fastmcp deployment, mcp authentication, fastmcp icons, openapi mcp, claude mcp server, fastmcp testing, storage misconfiguration, lifespan issues, middleware order, circular imports, module-level server, async await mcp
npx skill4agent add jackspace/claudeskillz fastmcp
pip install fastmcp
# or
uv pip install fastmcp
from fastmcp import FastMCP
# MUST be at module level for FastMCP Cloud
mcp = FastMCP("My Server")
@mcp.tool()
async def hello(name: str) -> str:
"""Say hello to someone."""
return f"Hello, {name}!"
if __name__ == "__main__":
mcp.run()
# Local development
python server.py
# With FastMCP CLI
fastmcp dev server.py
# HTTP mode
python server.py --transport http --port 8000
@mcp.tool()
def calculate(operation: str, a: float, b: float) -> float:
    """Perform mathematical operations.

    Args:
        operation: add, subtract, multiply, or divide
        a: First number
        b: Second number

    Returns:
        Result of the operation

    Raises:
        ValueError: If ``operation`` is not one of the supported names, or
            if a division by zero is requested.
    """
    operations = {
        "add": lambda x, y: x + y,
        "subtract": lambda x, y: x - y,
        "multiply": lambda x, y: x * y,
        "divide": lambda x, y: x / y,
    }
    # Fail loudly instead of returning None: the declared return type is
    # float, so a silent None would violate the tool's own contract.
    if operation not in operations:
        raise ValueError(
            f"Unknown operation {operation!r}; "
            f"expected one of: {', '.join(operations)}"
        )
    if operation == "divide" and b == 0:
        raise ValueError("Cannot divide by zero")
    return operations[operation](a, b)


# Sync tool (for non-blocking operations)
@mcp.tool()
def sync_tool(param: str) -> dict:
return {"result": param.upper()}
# Async tool (for I/O operations, API calls)
@mcp.tool()
async def async_tool(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()
# Static resource
@mcp.resource("data://config")
def get_config() -> dict:
"""Provide application configuration."""
return {
"version": "1.0.0",
"features": ["auth", "api", "cache"]
}
# Dynamic resource
@mcp.resource("info://status")
async def server_status() -> dict:
"""Get current server status."""
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"api_configured": bool(os.getenv("API_KEY"))
}
data://, file://, resource://, info://, api://
# Single parameter
@mcp.resource("user://{user_id}/profile")
async def get_user_profile(user_id: str) -> dict:
"""Get user profile by ID."""
user = await fetch_user_from_db(user_id)
return {
"id": user_id,
"name": user.name,
"email": user.email
}
# Multiple parameters
@mcp.resource("org://{org_id}/team/{team_id}/members")
async def get_team_members(org_id: str, team_id: str) -> list:
"""Get team members with org context."""
return await db.query(
"SELECT * FROM members WHERE org_id = ? AND team_id = ?",
[org_id, team_id]
)
@mcp.prompt("analyze")
def analyze_prompt(topic: str) -> str:
"""Generate analysis prompt."""
return f"""
Analyze {topic} considering:
1. Current state
2. Challenges
3. Opportunities
4. Recommendations
Use available tools to gather data.
"""
@mcp.prompt("help")
def help_prompt() -> str:
"""Generate help text for server."""
return """
Welcome to My Server!
Available tools:
- search: Search for items
- process: Process data
Available resources:
- info://status: Server status
"""from fastmcp import Context
@mcp.tool()
async def confirm_action(action: str, context: Context) -> dict:
"""Perform action with user confirmation."""
# Request confirmation from user
confirmed = await context.request_elicitation(
prompt=f"Confirm {action}? (yes/no)",
response_type=str
)
if confirmed.lower() == "yes":
result = await perform_action(action)
return {"status": "completed", "action": action}
else:
return {"status": "cancelled", "action": action}
@mcp.tool()
async def batch_import(file_path: str, context: Context) -> dict:
"""Import data with progress updates."""
data = await read_file(file_path)
total = len(data)
imported = []
for i, item in enumerate(data):
# Report progress
await context.report_progress(
progress=i + 1,
total=total,
message=f"Importing item {i + 1}/{total}"
)
result = await import_item(item)
imported.append(result)
return {"imported": len(imported), "total": total}
@mcp.tool()
async def enhance_text(text: str, context: Context) -> str:
"""Enhance text using LLM."""
response = await context.request_sampling(
messages=[{
"role": "system",
"content": "You are a professional copywriter."
}, {
"role": "user",
"content": f"Enhance this text: {text}"
}],
temperature=0.7,
max_tokens=500
)
return response["content"]
py-key-value-aio, FernetEncryptionWrapper
from fastmcp import FastMCP
from key_value.stores import MemoryStore, DiskStore, RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
import os
# Memory storage (default)
mcp = FastMCP("My Server")
# Disk storage (persistent)
from key_value.stores import DiskStore
mcp = FastMCP(
"My Server",
storage=DiskStore(path="/app/data/storage")
)
# Redis storage (production)
from key_value.stores import RedisStore
mcp = FastMCP(
"My Server",
storage=RedisStore(
host=os.getenv("REDIS_HOST", "localhost"),
port=int(os.getenv("REDIS_PORT", "6379")),
password=os.getenv("REDIS_PASSWORD")
)
)from cryptography.fernet import Fernet
from key_value.encryption import FernetEncryptionWrapper
from key_value.stores import DiskStore
import os

# Generate encryption key (store in environment!)
# key = Fernet.generate_key()

# Use encrypted storage: the disk store is wrapped in the Fernet wrapper.
# os.environ[...] is used instead of os.getenv(...) so that an unset
# STORAGE_ENCRYPTION_KEY fails immediately with a KeyError naming the
# variable, rather than passing None into Fernet and raising a less
# obvious error from inside the cryptography library.
encrypted_storage = FernetEncryptionWrapper(
    key_value=DiskStore(path="/app/data/storage"),
    fernet=Fernet(os.environ["STORAGE_ENCRYPTION_KEY"]),
)
mcp = FastMCP("My Server", storage=encrypted_storage)from fastmcp.auth import OAuthProxy
from key_value.stores import RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
# Production OAuth with encrypted Redis storage
auth = OAuthProxy(
jwt_signing_key=os.environ["JWT_SIGNING_KEY"],
client_storage=FernetEncryptionWrapper(
key_value=RedisStore(
host=os.getenv("REDIS_HOST"),
password=os.getenv("REDIS_PASSWORD")
),
fernet=Fernet(os.environ["STORAGE_ENCRYPTION_KEY"])
),
upstream_authorization_endpoint="https://provider.com/oauth/authorize",
upstream_token_endpoint="https://provider.com/oauth/token",
upstream_client_id=os.getenv("OAUTH_CLIENT_ID"),
upstream_client_secret=os.getenv("OAUTH_CLIENT_SECRET")
)
mcp = FastMCP("OAuth Server", auth=auth)
storage
# Explicitly use disk storage on Linux
from key_value.stores import DiskStore
mcp = FastMCP(
"My Server",
storage=DiskStore(path="/var/lib/mcp/storage")
)from fastmcp import FastMCP
from contextlib import asynccontextmanager
from typing import AsyncIterator
from dataclasses import dataclass
@dataclass
class AppContext:
"""Shared application state."""
db: Database
api_client: httpx.AsyncClient
@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
    """
    Initialize resources on startup, cleanup on shutdown.
    Runs ONCE per server instance, NOT per client session.

    Args:
        server: The FastMCP server this lifespan is attached to (unused here).

    Yields:
        AppContext holding the connected database and the shared HTTP client.
    """
    # Startup: connect the database first; everything after this point is
    # guarded so the connection is always released again.
    db = await Database.connect(os.getenv("DATABASE_URL"))
    try:
        api_client = httpx.AsyncClient(
            base_url=os.getenv("API_BASE_URL"),
            headers={"Authorization": f"Bearer {os.getenv('API_KEY')}"},
            timeout=30.0,
        )
        print("Server initialized")
        try:
            # Yield context to tools
            yield AppContext(db=db, api_client=api_client)
        finally:
            # Shutdown: close the client in its own finally so that a failure
            # while disconnecting the database cannot leave it open.
            await api_client.aclose()
    finally:
        # Runs even if client construction or client shutdown raised.
        await db.disconnect()
        print("Server shutdown complete")
# Create server with lifespan
mcp = FastMCP("My Server", lifespan=app_lifespan)
# Access context in tools
from fastmcp import Context
@mcp.tool()
async def query_database(sql: str, context: Context) -> list:
"""Query database using shared connection."""
# Access lifespan context
app_context: AppContext = context.fastmcp_context.lifespan_context
return await app_context.db.query(sql)
@mcp.tool()
async def api_request(endpoint: str, context: Context) -> dict:
"""Make API request using shared client."""
app_context: AppContext = context.fastmcp_context.lifespan_context
response = await app_context.api_client.get(endpoint)
return response.json()from fastapi import FastAPI
from fastmcp import FastMCP
# FastMCP lifespan
@asynccontextmanager
async def mcp_lifespan(server: FastMCP):
print("MCP server starting")
yield
print("MCP server stopping")
mcp = FastMCP("My Server", lifespan=mcp_lifespan)
# FastAPI app MUST include MCP lifespan
app = FastAPI(lifespan=mcp.lifespan)
# Add routes
@app.get("/")
def root():
return {"message": "Hello World"}
app = FastAPI() # MCP lifespan won't run!
app = FastAPI(lifespan=mcp.lifespan)
from fastmcp import Context
@mcp.tool()
async def set_config(key: str, value: str, context: Context) -> dict:
"""Store configuration value."""
context.fastmcp_context.set_state(key, value)
return {"status": "saved", "key": key}
@mcp.tool()
async def get_config(key: str, context: Context) -> dict:
"""Retrieve configuration value."""
value = context.fastmcp_context.get_state(key, default=None)
if value is None:
return {"error": f"Key '{key}' not found"}
return {"key": key, "value": value}from fastmcp import FastMCP
from fastmcp.middleware import (
TimingMiddleware,
LoggingMiddleware,
RateLimitingMiddleware,
ResponseCachingMiddleware,
ErrorHandlingMiddleware
)
mcp = FastMCP("My Server")
# Add middleware (order matters!)
mcp.add_middleware(ErrorHandlingMiddleware())
mcp.add_middleware(TimingMiddleware())
mcp.add_middleware(LoggingMiddleware(level="INFO"))
mcp.add_middleware(RateLimitingMiddleware(
max_requests=100,
window_seconds=60,
algorithm="token_bucket"
))
mcp.add_middleware(ResponseCachingMiddleware(
ttl_seconds=300,
storage=RedisStore(host="localhost")
))
Request Flow:
→ ErrorHandlingMiddleware (catches errors)
→ TimingMiddleware (starts timer)
→ LoggingMiddleware (logs request)
→ RateLimitingMiddleware (checks rate limit)
→ ResponseCachingMiddleware (checks cache)
→ Tool/Resource Handler
← ResponseCachingMiddleware (stores in cache)
← RateLimitingMiddleware
← LoggingMiddleware (logs response)
← TimingMiddleware (stops timer, logs duration)
← ErrorHandlingMiddleware (returns error if any)from fastmcp.middleware import BaseMiddleware
from fastmcp import Context
class AccessControlMiddleware(BaseMiddleware):
"""Check authorization before tool execution."""
def __init__(self, allowed_users: list[str]):
self.allowed_users = allowed_users
async def on_call_tool(self, tool_name: str, arguments: dict, context: Context):
"""Hook runs before tool execution."""
# Get user from context (from auth)
user = context.fastmcp_context.get_state("user_id")
if user not in self.allowed_users:
raise PermissionError(f"User '{user}' not authorized")
# Continue to tool
return await self.next(tool_name, arguments, context)
# Add to server
mcp.add_middleware(AccessControlMiddleware(
allowed_users=["alice", "bob", "charlie"]
))
on_message, on_request, on_notification, on_call_tool, on_read_resource, on_get_prompt, on_list_tools, on_list_resources, on_list_prompts, on_list_resource_templates
class ComprehensiveMiddleware(BaseMiddleware):
async def on_message(self, message: dict, context: Context):
"""Runs for ALL messages."""
print(f"Message: {message['method']}")
return await self.next(message, context)
async def on_call_tool(self, tool_name: str, arguments: dict, context: Context):
"""Runs only for tool calls."""
print(f"Tool: {tool_name}")
return await self.next(tool_name, arguments, context)
async def on_read_resource(self, uri: str, context: Context):
"""Runs only for resource reads."""
print(f"Resource: {uri}")
return await self.next(uri, context)from fastmcp.middleware import ResponseCachingMiddleware
from key_value.stores import RedisStore
# Cache responses for 5 minutes
cache_middleware = ResponseCachingMiddleware(
ttl_seconds=300,
storage=RedisStore(host="localhost"), # Shared across instances
cache_tools=True, # Cache tool calls
cache_resources=True, # Cache resource reads
cache_prompts=False # Don't cache prompts
)
mcp.add_middleware(cache_middleware)
# Tools/resources are automatically cached
@mcp.tool()
async def expensive_computation(data: str) -> dict:
    """This will be cached for 5 minutes."""
    import asyncio

    # Stand-in for an expensive operation. A blocking time.sleep() inside an
    # async function would stall the event loop, and every other request with
    # it, for the full five seconds; awaiting asyncio.sleep() yields instead.
    await asyncio.sleep(5)
    return {"result": process(data)}


# import_server(), mount()
from fastmcp import FastMCP
# Subserver with tools
api_server = FastMCP("API Server")
@api_server.tool()
def api_tool():
return "API result"
@api_server.resource("api://status")
def api_status():
return {"status": "ok"}
# Main server imports components
main_server = FastMCP("Main Server")
# Import all components from subserver
main_server.import_server(api_server)
# Now main_server has api_tool and api://status
# Changes to api_server won't affect main_serverfrom fastmcp import FastMCP
# Create servers
api_server = FastMCP("API Server")
db_server = FastMCP("DB Server")
@api_server.tool()
def fetch_data():
return "API data"
@db_server.tool()
def query_db():
return "DB result"
# Main server mounts subservers
main_server = FastMCP("Main Server")
# Mount with prefix
main_server.mount(api_server, prefix="api")
main_server.mount(db_server, prefix="db")
# Tools are namespaced with the prefix and an underscore:
# - api_fetch_data
# - db_query_db
# Resources are prefixed:
# - resource://api/path/to/resource
# - resource://db/path/to/resource# In-memory access, subserver runs in same process
main_server.mount(subserver, prefix="sub")# Treats subserver as separate entity with own lifecycle
main_server.mount(
subserver,
prefix="sub",
mode="proxy"
)# Tag subserver components
@api_server.tool(tags=["public"])
def public_api():
return "Public"
@api_server.tool(tags=["admin"])
def admin_api():
return "Admin only"
# Import only public tools
main_server.import_server(
api_server,
include_tags=["public"]
)
# Or exclude admin tools
main_server.import_server(
api_server,
exclude_tags=["admin"]
)
# Tag filtering is recursive with mount()
main_server.mount(
api_server,
prefix="api",
include_tags=["public"]
)
resource://prefix/path/to/resource
prefix+resource://path/to/resource
main_server.mount(
subserver,
prefix="api",
resource_prefix_format="path" # or "protocol"
)
TokenVerifier, JWTVerifier, RemoteAuthProvider, OAuthProxy, OAuthProvider
from fastmcp import FastMCP
from fastmcp.auth import JWTVerifier
# JWT verification
auth = JWTVerifier(
issuer="https://auth.example.com",
audience="my-mcp-server",
public_key=os.getenv("JWT_PUBLIC_KEY")
)
mcp = FastMCP("Secure Server", auth=auth)
@mcp.tool()
async def secure_operation(context: Context) -> dict:
"""Only accessible with valid JWT."""
# Token validated automatically
user = context.fastmcp_context.get_state("user_id")
return {"user": user, "status": "authorized"}from fastmcp.auth import RemoteAuthProvider
auth = RemoteAuthProvider(
issuer="https://auth.example.com",
# Provider must support DCR
)
mcp = FastMCP("OAuth Server", auth=auth)from fastmcp.auth import OAuthProxy
from key_value.stores import RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
import os
auth = OAuthProxy(
# JWT signing for issued tokens
jwt_signing_key=os.environ["JWT_SIGNING_KEY"],
# Encrypted storage for upstream tokens
client_storage=FernetEncryptionWrapper(
key_value=RedisStore(
host=os.getenv("REDIS_HOST"),
password=os.getenv("REDIS_PASSWORD")
),
fernet=Fernet(os.environ["STORAGE_ENCRYPTION_KEY"])
),
# Upstream OAuth provider
upstream_authorization_endpoint="https://github.com/login/oauth/authorize",
upstream_token_endpoint="https://github.com/login/oauth/access_token",
upstream_client_id=os.getenv("GITHUB_CLIENT_ID"),
upstream_client_secret=os.getenv("GITHUB_CLIENT_SECRET"),
# Scopes
upstream_scope="read:user user:email",
# Security: Enable consent screen (prevents confused deputy attacks)
enable_consent_screen=True
)
mcp = FastMCP("GitHub Auth Server", auth=auth)from fastmcp.auth import OAuthProvider
auth = OAuthProvider(
issuer="https://my-auth-server.com",
client_storage=RedisStore(host="localhost"),
# Full OAuth 2.0 server implementation
)
mcp = FastMCP("Auth Server", auth=auth)
export FASTMCP_SERVER_AUTH='{"type": "oauth_proxy", "upstream_authorization_endpoint": "...", ...}'
# Automatically configures from FASTMCP_SERVER_AUTH
mcp = FastMCP("Auto Auth Server")
https://github.com/login/oauth/authorize
https://accounts.google.com/o/oauth2/v2/auth
https://login.microsoftonline.com/{tenant}/oauth2/v2.0/authorize
https://{domain}.auth.{region}.amazoncognito.com/oauth2/authorize
https://discord.com/api/oauth2/authorize
https://www.facebook.com/v12.0/dialog/oauth
from fastmcp import FastMCP, Icon
mcp = FastMCP(
name="Weather Service",
website_url="https://weather.example.com",
icons=[
Icon(
url="https://example.com/icon-small.png",
size="small"
),
Icon(
url="https://example.com/icon-large.png",
size="large"
)
]
)from fastmcp import Icon
@mcp.tool(icons=[
Icon(url="https://example.com/tool-icon.png")
])
async def analyze_data(data: str) -> dict:
"""Analyze data with visual icon."""
return {"result": "analyzed"}
@mcp.resource(
"user://{user_id}/profile",
icons=[Icon(url="https://example.com/user-icon.png")]
)
async def get_user(user_id: str) -> dict:
"""User profile with icon."""
return {"id": user_id, "name": "Alice"}
@mcp.prompt(
"analyze",
icons=[Icon(url="https://example.com/prompt-icon.png")]
)
def analysis_prompt(topic: str) -> str:
"""Analysis prompt with icon."""
return f"Analyze {topic}"from fastmcp import Icon, Image
# Convert local file to data URI
icon = Icon.from_file("/path/to/icon.png", size="medium")
# Or use Image utility
image_data_uri = Image.to_data_uri("/path/to/icon.png")
icon = Icon(url=image_data_uri, size="medium")
# Use in server
mcp = FastMCP(
"My Server",
icons=[icon]
)mcp = FastMCP(
"Responsive Server",
icons=[
Icon(url="icon-16.png", size="small"), # 16x16
Icon(url="icon-32.png", size="medium"), # 32x32
Icon(url="icon-64.png", size="large"), # 64x64
]
)import httpx
import os
# Create reusable client
client = httpx.AsyncClient(
base_url=os.getenv("API_BASE_URL"),
headers={"Authorization": f"Bearer {os.getenv('API_KEY')}"},
timeout=30.0
)
@mcp.tool()
async def fetch_data(endpoint: str) -> dict:
"""Fetch data from API."""
try:
response = await client.get(endpoint)
response.raise_for_status()
return {"success": True, "data": response.json()}
except httpx.HTTPStatusError as e:
return {"error": f"HTTP {e.response.status_code}"}
except Exception as e:
return {"error": str(e)}from fastmcp import FastMCP
from fastmcp.server.openapi import RouteMap, MCPType
import httpx
# Load OpenAPI spec
spec = httpx.get("https://api.example.com/openapi.json").json()
# Create authenticated client
client = httpx.AsyncClient(
base_url="https://api.example.com",
headers={"Authorization": f"Bearer {API_TOKEN}"},
timeout=30.0
)
# Auto-generate MCP server from OpenAPI
mcp = FastMCP.from_openapi(
openapi_spec=spec,
client=client,
name="API Server",
route_maps=[
# GET with parameters → Resource Templates
RouteMap(
methods=["GET"],
pattern=r".*\{.*\}.*",
mcp_type=MCPType.RESOURCE_TEMPLATE
),
# GET without parameters → Resources
RouteMap(
methods=["GET"],
mcp_type=MCPType.RESOURCE
),
# POST/PUT/DELETE → Tools
RouteMap(
methods=["POST", "PUT", "DELETE"],
mcp_type=MCPType.TOOL
),
]
)
# Optionally add custom tools
@mcp.tool()
async def custom_operation(data: dict) -> dict:
"""Custom tool on top of generated ones."""
return process_data(data)from fastapi import FastAPI
from fastmcp import FastMCP
# Existing FastAPI app
app = FastAPI()
@app.get("/items/{item_id}")
def get_item(item_id: int):
return {"id": item_id, "name": "Item"}
# Convert to MCP server
mcp = FastMCP.from_fastapi(
app=app,
httpx_client_kwargs={
"headers": {"Authorization": "Bearer token"}
}
)mcpserverapp# server.py
from fastmcp import FastMCP
import os
# ✅ CORRECT: Module-level server object
mcp = FastMCP(
name="production-server"
)
# ✅ Use environment variables
API_KEY = os.getenv("API_KEY")
DATABASE_URL = os.getenv("DATABASE_URL")
@mcp.tool()
async def production_tool(data: str) -> dict:
"""Production-ready tool."""
if not API_KEY:
return {"error": "API_KEY not configured"}
# Your implementation
return {"status": "success", "data": data}
# ✅ Optional: for local testing
if __name__ == "__main__":
mcp.run()def create_server():
mcp = FastMCP("my-server")
return mcp
if __name__ == "__main__":
server = create_server() # Too late for cloud!
server.run()def create_server() -> FastMCP:
mcp = FastMCP("my-server")
# Complex setup logic
return mcp
# Export at module level
mcp = create_server()
if __name__ == "__main__":
mcp.run()git init
git add .
git commit -m "Initial MCP server"
gh repo create my-mcp-server --public
git push -u origin main
server.py
https://your-project.fastmcp.app/mcp
claude_desktop_config.json
{
"mcpServers": {
"my-server": {
"url": "https://your-project.fastmcp.app/mcp",
"transport": "http"
}
}
}{
"mcpServers": {
"my-server": {
"command": "python",
"args": ["/absolute/path/to/server.py"],
"env": {
"API_KEY": "your-key",
"DATABASE_URL": "your-db-url"
}
}
}
}{
"mcpServers": {
"my-server": {
"command": "uv",
"args": ["run", "python", "/absolute/path/to/server.py"]
}
}
}
RuntimeError: No server object found at module level
# ❌ WRONG
def create_server():
return FastMCP("server")
# ✅ CORRECT
mcp = FastMCP("server") # At module levelRuntimeError: no running event loop
TypeError: object coroutine can't be used in 'await' expression# ❌ WRONG: Sync function calling async
@mcp.tool()
def bad_tool():
result = await async_function() # Error!
# ✅ CORRECT: Async tool
@mcp.tool()
async def good_tool():
result = await async_function()
return result
# ✅ CORRECT: Sync tool with sync code
@mcp.tool()
def sync_tool():
return "Hello"
TypeError: missing 1 required positional argument: 'context'
Context
from fastmcp import Context
# ❌ WRONG: No type hint
@mcp.tool()
async def bad_tool(context): # Missing type!
await context.report_progress(...)
# ✅ CORRECT: Proper type hint
@mcp.tool()
async def good_tool(context: Context):
await context.report_progress(0, 100, "Starting")ValueError: Invalid resource URI: missing scheme# ❌ WRONG: Missing scheme
@mcp.resource("config")
def get_config(): pass
# ✅ CORRECT: Include scheme
@mcp.resource("data://config")
def get_config(): pass
# ✅ Valid schemes
@mcp.resource("file://config.json")
@mcp.resource("api://status")
@mcp.resource("info://health")TypeError: get_user() missing 1 required positional argument: 'user_id'# ❌ WRONG: Parameter name mismatch
@mcp.resource("user://{user_id}/profile")
def get_user(id: str): # Wrong name!
pass
# ✅ CORRECT: Matching names
@mcp.resource("user://{user_id}/profile")
def get_user(user_id: str): # Matches {user_id}
return {"id": user_id}ValidationError: value is not a valid integerfrom pydantic import BaseModel, Field
# ✅ Use Pydantic models for complex validation
class SearchParams(BaseModel):
query: str = Field(min_length=1, max_length=100)
limit: int = Field(default=10, ge=1, le=100)
@mcp.tool()
async def search(params: SearchParams) -> dict:
# Validation automatic
return await perform_search(params.query, params.limit)ConnectionError: Server using different transport# Server using stdio (default)
mcp.run() # or mcp.run(transport="stdio")
# Client configuration must match
{
"command": "python",
"args": ["server.py"]
}
# OR for HTTP:
mcp.run(transport="http", port=8000)
# Client:
{
"url": "http://localhost:8000/mcp",
"transport": "http"
}ModuleNotFoundError: No module named 'my_package'# ✅ Install in editable mode
pip install -e .
# ✅ Or use absolute imports
from src.tools import my_tool
# ✅ Or add to PYTHONPATH
export PYTHONPATH="${PYTHONPATH}:/path/to/project"DeprecationWarning: 'mcp.settings' is deprecated, use global Settings instead# ❌ OLD: FastMCP v1
from fastmcp import FastMCP
mcp = FastMCP()
api_key = mcp.settings.get("API_KEY")
# ✅ NEW: FastMCP v2
import os
api_key = os.getenv("API_KEY")OSError: [Errno 48] Address already in use# ✅ Use different port
python server.py --transport http --port 8001
# ✅ Or kill process on port
lsof -ti:8000 | xargs kill -9TypeError: Object of type 'ndarray' is not JSON serializable# ❌ WRONG: NumPy array
import numpy as np
@mcp.tool()
def bad_tool() -> np.ndarray: # Not JSON serializable
return np.array([1, 2, 3])
# ✅ CORRECT: Use JSON-compatible types
@mcp.tool()
def good_tool() -> list[float]:
return [1.0, 2.0, 3.0]
# ✅ Or convert to dict
@mcp.tool()
def array_tool() -> dict:
data = np.array([1, 2, 3])
return {"values": data.tolist()}TypeError: Object of type 'datetime' is not JSON serializablefrom datetime import datetime
# ❌ WRONG: Return datetime object
@mcp.tool()
def bad_tool() -> dict:
return {"timestamp": datetime.now()} # Not serializable
# ✅ CORRECT: Convert to string
@mcp.tool()
def good_tool() -> dict:
return {"timestamp": datetime.now().isoformat()}
# ✅ Use helper function
def make_serializable(obj):
"""Convert object to JSON-serializable format."""
if isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, bytes):
return obj.decode('utf-8')
# Add more conversions as needed
return objImportError: cannot import name 'X' from partially initialized module# ❌ WRONG: Factory function in __init__.py
# shared/__init__.py
_client = None
def get_api_client():
from .api_client import APIClient # Circular!
return APIClient()
# shared/monitoring.py
from . import get_api_client # Creates circle
# ✅ CORRECT: Direct imports
# shared/__init__.py
from .api_client import APIClient
from .cache import CacheManager
# shared/monitoring.py
from .api_client import APIClient
client = APIClient() # Create directly
# ✅ ALTERNATIVE: Lazy import
# shared/monitoring.py
def get_client():
from .api_client import APIClient
return APIClient()DeprecationWarning: datetime.utcnow() is deprecated# ❌ DEPRECATED (Python 3.12+)
from datetime import datetime
timestamp = datetime.utcnow()
# ✅ CORRECT: Future-proof
from datetime import datetime, timezone
timestamp = datetime.now(timezone.utc)RuntimeError: Event loop is closed# ❌ WRONG: Module-level async execution
import asyncpg
connection = asyncpg.connect('postgresql://...') # Runs at import!
# ✅ CORRECT: Lazy initialization
import asyncpg
class Database:
    """Holder for one lazily opened asyncpg connection shared via the class."""

    # None until the first successful connect() call.
    connection = None

    @classmethod
    async def connect(cls):
        """Return the class-level connection, opening it on first use."""
        # Guard clause: reuse the connection once it has been created.
        if cls.connection is not None:
            return cls.connection
        cls.connection = await asyncpg.connect('postgresql://...')
        return cls.connection
# Usage: connection happens when needed, not at import
@mcp.tool()
async def get_users():
conn = await Database.connect()
return await conn.fetch("SELECT * FROM users")RuntimeError: OAuth tokens lost on restart
ValueError: Cache not persisting across server instances# ❌ WRONG: Memory storage in production
mcp = FastMCP("Production Server") # Tokens lost on restart!
# ✅ CORRECT: Use disk or Redis storage
from key_value.stores import DiskStore, RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
import os

# Disk storage (single instance)
# os.environ[...] fails fast with a KeyError naming the variable when the key
# is not configured; os.getenv(...) would hand None to Fernet instead and
# surface as a less obvious error from inside the cryptography library.
mcp = FastMCP(
    "Production Server",
    storage=FernetEncryptionWrapper(
        key_value=DiskStore(path="/var/lib/mcp/storage"),
        fernet=Fernet(os.environ["STORAGE_ENCRYPTION_KEY"]),
    ),
)
# Redis storage (multi-instance)
mcp = FastMCP(
"Production Server",
storage=FernetEncryptionWrapper(
key_value=RedisStore(
host=os.getenv("REDIS_HOST"),
password=os.getenv("REDIS_PASSWORD")
),
fernet=Fernet(os.getenv("STORAGE_ENCRYPTION_KEY"))
)
)RuntimeError: Database connection never initialized
Warning: MCP lifespan hooks not runningfrom fastapi import FastAPI
from fastmcp import FastMCP
# ❌ WRONG: Lifespan not passed
mcp = FastMCP("My Server", lifespan=my_lifespan)
app = FastAPI() # MCP lifespan won't run!
# ✅ CORRECT: Pass MCP lifespan to parent app
mcp = FastMCP("My Server", lifespan=my_lifespan)
app = FastAPI(lifespan=mcp.lifespan)RuntimeError: Rate limit not checked before caching
AttributeError: Context state not available in middleware# ❌ WRONG: Cache before rate limiting
mcp.add_middleware(ResponseCachingMiddleware())
mcp.add_middleware(RateLimitingMiddleware()) # Too late!
# ✅ CORRECT: Rate limit before cache
mcp.add_middleware(ErrorHandlingMiddleware()) # First: catch errors
mcp.add_middleware(TimingMiddleware()) # Second: time requests
mcp.add_middleware(LoggingMiddleware()) # Third: log
mcp.add_middleware(RateLimitingMiddleware()) # Fourth: check limits
mcp.add_middleware(ResponseCachingMiddleware()) # Last: cacheRecursionError: maximum recursion depth exceeded
RuntimeError: Middleware loop detectedself.next()# ❌ WRONG: Not calling next() or calling incorrectly
class BadMiddleware(BaseMiddleware):
async def on_call_tool(self, tool_name, arguments, context):
# Forgot to call next()!
return {"error": "blocked"}
# ✅ CORRECT: Always call next() to continue chain
class GoodMiddleware(BaseMiddleware):
async def on_call_tool(self, tool_name, arguments, context):
# Do preprocessing
print(f"Before: {tool_name}")
# MUST call next() to continue
result = await self.next(tool_name, arguments, context)
# Do postprocessing
print(f"After: {tool_name}")
return resultRuntimeError: Subserver changes not reflected
ValueError: Unexpected tool namespacing
import_server(), mount()
# ❌ WRONG: Using import when you want dynamic updates
main_server.import_server(subserver)
# Later: changes to subserver won't appear in main_server
# ✅ CORRECT: Use mount() for dynamic composition
main_server.mount(subserver, prefix="sub")
# Changes to subserver are immediately visible
# ❌ WRONG: Using mount when you want static bundle
main_server.mount(third_party_server, prefix="vendor")
# Runtime overhead for static components
# ✅ CORRECT: Use import_server() for static bundles
main_server.import_server(third_party_server)
# One-time copy, no runtime delegationValueError: Resource not found: resource://api/users
ValueError: Unexpected resource URI format# Path format (default since v2.4.0)
main_server.mount(api_server, prefix="api")
# Resources: resource://api/users
# ❌ WRONG: Expecting protocol format
# api+resource://users (not generated under the default path format)
# ✅ CORRECT: Use path format
uri = "resource://api/users"
# OR explicitly set protocol format (legacy)
main_server.mount(
api_server,
prefix="api",
resource_prefix_format="protocol"
)
# Resources: api+resource://usersSecurityWarning: Authorization bypass possible
RuntimeError: Confused deputy attack vector# ❌ WRONG: No consent screen (security risk!)
auth = OAuthProxy(
jwt_signing_key=os.getenv("JWT_KEY"),
upstream_authorization_endpoint="...",
upstream_token_endpoint="...",
# Missing: enable_consent_screen
)
# ✅ CORRECT: Enable consent screen
auth = OAuthProxy(
jwt_signing_key=os.getenv("JWT_KEY"),
upstream_authorization_endpoint="...",
upstream_token_endpoint="...",
enable_consent_screen=True # Prevents bypass attacks
)ValueError: JWT signing key required for OAuth Proxy
RuntimeError: Cannot issue tokens without signing keyjwt_signing_key# ❌ WRONG: No JWT signing key
auth = OAuthProxy(
upstream_authorization_endpoint="...",
upstream_token_endpoint="...",
# Missing: jwt_signing_key
)
# ✅ CORRECT: Provide signing key from environment
# `secrets` is only needed for the one-time key generation shown in the
# comments below; the OAuthProxy call itself does not use it.
import secrets
# Generate once (in setup):
# signing_key = secrets.token_urlsafe(32)
# Store in: FASTMCP_JWT_SIGNING_KEY environment variable
auth = OAuthProxy(
# os.environ[...] raises KeyError when the variable is missing, so a
# misconfigured deployment fails at startup instead of running unsigned.
jwt_signing_key=os.environ["FASTMCP_JWT_SIGNING_KEY"],
# encrypted_storage is not defined in this snippet; it must be created beforehand.
client_storage=encrypted_storage,
upstream_authorization_endpoint="...",
upstream_token_endpoint="...",
# os.getenv() returns None when these are unset.
upstream_client_id=os.getenv("OAUTH_CLIENT_ID"),
upstream_client_secret=os.getenv("OAUTH_CLIENT_SECRET")
)

ValueError: Invalid data URI format
TypeError: Icon URL must be string or data URI

from fastmcp import Icon, Image
# ❌ WRONG: Invalid data URI
icon = Icon(url="base64,iVBORw0KG...")  # Missing data:image/png;
# ✅ CORRECT: Build the icon from a file with Icon.from_file()
icon = Icon.from_file("/path/to/icon.png", size="medium")
# ✅ CORRECT: Manual data URI
import base64
# Read the raw bytes inside the `with` block so the file handle is closed
# before the URI is assembled.
with open("/path/to/icon.png", "rb") as f:
    image_data = base64.b64encode(f.read()).decode()
# A data URI needs the "data:<mime>;base64," prefix that the WRONG example omits.
data_uri = f"data:image/png;base64,{image_data}"
icon = Icon(url=data_uri, size="medium")

Warning: Lifespan runs per-server, not per-session
RuntimeError: Resources initialized multiple times

# v2.12.0 and earlier: Lifespan ran per client session
# v2.13.0+: Lifespan runs once per server instance
# ✅ CORRECT: v2.13.0+ pattern (per-server)
@asynccontextmanager
async def app_lifespan(server: FastMCP):
    """Open the database for the server's lifetime and close it on shutdown.

    Yields a dict with the connected database under the ``"db"`` key. The
    ``finally`` clause guarantees the disconnect runs even if the server
    exits with an error.
    """
    database = await Database.connect()
    print("Server starting - runs once")
    lifespan_state = {"db": database}
    try:
        yield lifespan_state
    finally:
        await database.disconnect()
        print("Server stopping - runs once")
mcp = FastMCP("My Server", lifespan=app_lifespan)
# For per-session logic, use middleware instead:
class SessionMiddleware(BaseMiddleware):
    """Middleware that makes sure a ``session_id`` exists in the context state."""

    async def on_message(self, message, context):
        """Assign a fresh UUID as ``session_id`` when none is stored, then continue the chain."""
        state = context.fastmcp_context
        # A missing or falsy session_id means nothing usable has been stored yet.
        if not state.get_state("session_id"):
            state.set_state("session_id", str(uuid.uuid4()))
        return await self.next(message, context)
# src/utils.py - Single file with all utilities
import os
from typing import Dict, Any
from datetime import datetime
class Config:
    """Settings read from the environment once, when the class body executes."""

    SERVER_NAME = os.environ.get("SERVER_NAME", "FastMCP Server")
    SERVER_VERSION = "1.0.0"
    # No defaults for these two: they are None when the variable is unset.
    API_BASE_URL = os.environ.get("API_BASE_URL")
    API_KEY = os.environ.get("API_KEY")
    # int() raises ValueError at import time if CACHE_TTL is not numeric.
    CACHE_TTL = int(os.environ.get("CACHE_TTL", "300"))
def format_success(data: Any, message: str = "Success") -> Dict[str, Any]:
    """Wrap *data* in the success envelope.

    Returns a dict with ``success=True``, the given ``message`` and ``data``,
    and a ``timestamp`` holding the current local time in ISO-8601 form.
    """
    stamp = datetime.now().isoformat()
    return dict(success=True, message=message, data=data, timestamp=stamp)
def format_error(error: str, code: str = "ERROR") -> Dict[str, Any]:
    """Wrap an error message in the failure envelope.

    Returns a dict with ``success=False``, the ``error`` text, the ``code``,
    and a ``timestamp`` holding the current local time in ISO-8601 form.
    """
    stamp = datetime.now().isoformat()
    return dict(success=False, error=error, code=code, timestamp=stamp)
# Usage in tools
from .utils import format_success, format_error, Config
@mcp.tool()
async def process_data(data: dict) -> dict:
    # Any failure is reported to the caller as an error envelope
    # instead of propagating as an exception.
    try:
        return format_success(await process(data))
    except Exception as exc:
        return format_error(str(exc))
import httpx
from typing import Optional
class APIClient:
    """Holder for one lazily created, shared ``httpx.AsyncClient``.

    The client is configured from the ``API_BASE_URL`` and ``API_KEY``
    environment variables the first time :meth:`get_client` is called and is
    reused until :meth:`cleanup` closes it.
    """

    _instance: Optional[httpx.AsyncClient] = None

    @classmethod
    async def get_client(cls) -> httpx.AsyncClient:
        """Return the shared client, creating it on first use."""
        if cls._instance is None:
            api_key = os.getenv("API_KEY")
            # Only send credentials that exist: formatting a missing key
            # would put the literal header "Bearer None" on every request.
            headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
            cls._instance = httpx.AsyncClient(
                # Fall back to "" (httpx's own default) rather than passing
                # None when API_BASE_URL is unset.
                base_url=os.getenv("API_BASE_URL", ""),
                headers=headers,
                timeout=httpx.Timeout(30.0),
                limits=httpx.Limits(max_keepalive_connections=5),
            )
        return cls._instance

    @classmethod
    async def cleanup(cls):
        """Close the shared client, if any, so the next call builds a new one."""
        if cls._instance is not None:
            await cls._instance.aclose()
            cls._instance = None
@mcp.tool()
async def api_request(endpoint: str) -> dict:
    """Make API request with managed client."""
    client = await APIClient.get_client()
    response = await client.get(endpoint)
    # Surface 4xx/5xx responses as errors instead of trying to parse an
    # error body as if it were a successful payload.
    response.raise_for_status()
    return response.json()
import asyncio
from typing import Awaitable, Callable, TypeVar
T = TypeVar("T")


async def retry_with_backoff(
    func: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    initial_delay: float = 1.0,
    exponential_base: float = 2.0,
) -> T:
    """Await ``func()`` until it succeeds, sleeping longer after each failure.

    Args:
        func: Zero-argument callable returning an awaitable.
        max_retries: Total number of attempts; must be at least 1.
        initial_delay: Seconds to sleep after the first failure.
        exponential_base: Factor applied to the delay after every failure.

    Returns:
        The result of the first successful attempt.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        Exception: The error of the final attempt, re-raised unchanged.
    """
    if max_retries < 1:
        # With no attempts there is no result and no error to report;
        # fail clearly instead of ending in ``raise None``.
        raise ValueError("max_retries must be at least 1")

    delay = initial_delay
    for attempt in range(1, max_retries + 1):
        try:
            return await func()
        except Exception:
            if attempt == max_retries:
                # Out of attempts: propagate with the original traceback.
                raise
            await asyncio.sleep(delay)
            delay *= exponential_base
    # Every iteration returns or raises, so this line is never reached.
    raise AssertionError("unreachable")
@mcp.tool()
async def resilient_api_call(endpoint: str) -> dict:
    """API call with automatic retry."""
    async def make_call():
        # A new client per attempt; HTTP error statuses raise so that the
        # retry helper treats them as failures.
        async with httpx.AsyncClient() as client:
            response = await client.get(endpoint)
            response.raise_for_status()
            return response.json()

    try:
        data = await retry_with_backoff(make_call)
    except Exception as e:
        # Include "success": False so callers can branch on the same key
        # that the success payload carries.
        return {"success": False, "error": f"Failed after retries: {e}"}
    return {"success": True, "data": data}
import time
from typing import Any, Optional
class TimeBasedCache:
    """In-memory key/value cache whose entries expire after ``ttl`` seconds.

    Ages are measured with ``time.monotonic()``, so system clock adjustments
    cannot expire entries early or keep them alive too long. The values in
    ``timestamps`` are therefore monotonic readings, not epoch times.
    """

    def __init__(self, ttl: int = 300):
        """Create an empty cache; ``ttl`` is the entry lifetime in seconds."""
        self.ttl = ttl
        self.cache = {}
        self.timestamps = {}

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value, or None when it is absent or expired.

        An expired entry is removed as a side effect of the lookup.
        """
        if key not in self.cache:
            return None
        if time.monotonic() - self.timestamps[key] < self.ttl:
            return self.cache[key]
        # Expired: drop both halves of the entry so the dicts stay in sync.
        del self.cache[key]
        del self.timestamps[key]
        return None

    def set(self, key: str, value: Any):
        """Store ``value`` under ``key`` and restart its expiry timer."""
        self.cache[key] = value
        self.timestamps[key] = time.monotonic()
cache = TimeBasedCache(ttl=300)
@mcp.tool()
async def cached_fetch(resource_id: str) -> dict:
    """Fetch with caching."""
    cache_key = f"resource:{resource_id}"
    cached_data = cache.get(cache_key)
    # Compare against None, not truthiness: an empty dict, list, string or
    # a 0 is a valid cached value and must not trigger a refetch every time.
    if cached_data is not None:
        return {"data": cached_data, "from_cache": True}
    data = await fetch_from_api(resource_id)
    cache.set(cache_key, data)
    return {"data": data, "from_cache": False}
import pytest
from fastmcp import FastMCP
from fastmcp.testing import create_test_client
@pytest.fixture
def test_server():
    """Build a throwaway FastMCP server exposing a single ``test_tool``."""
    server = FastMCP("test-server")

    # The function name is kept as-is because the test calls it as "test_tool".
    @server.tool()
    async def test_tool(param: str) -> str:
        return f"Result: {param}"

    return server
@pytest.mark.asyncio
async def test_tool_execution(test_server):
    """Call ``test_tool`` through a test client and check the returned data."""
    # NOTE(review): create_test_client is imported from fastmcp.testing —
    # confirm that module exists in the installed FastMCP version.
    arguments = {"param": "test"}
    async with create_test_client(test_server) as client:
        outcome = await client.call_tool("test_tool", arguments)
        assert outcome.data == "Result: test"
import asyncio
from fastmcp import Client
async def test_server():
    """Smoke-test ``server.py``: call every tool with no arguments and read every resource.

    Each outcome is printed with a ✓ or ✗ marker; a failure is reported and
    the run continues with the next item.
    """
    async with Client("server.py") as client:
        # Test tools
        available_tools = await client.list_tools()
        print(f"Tools: {len(available_tools)}")
        for tool in available_tools:
            try:
                result = await client.call_tool(tool.name, {})
                print(f"✓ {tool.name}: {result}")
            except Exception as e:
                print(f"✗ {tool.name}: {e}")
        # Test resources
        for resource in await client.list_resources():
            try:
                # Only readability matters here; the content is discarded.
                await client.read_resource(resource.uri)
                print(f"✓ {resource.uri}")
            except Exception as e:
                print(f"✗ {resource.uri}: {e}")


if __name__ == "__main__":
    asyncio.run(test_server())
# Run with inspector (recommended)
fastmcp dev server.py
# Run normally
fastmcp run server.py
# Inspect server without running
fastmcp inspect server.py

# Install to Claude Desktop
fastmcp install server.py
# Install with custom name
fastmcp install server.py --name "My Server"

# Enable debug logging
FASTMCP_LOG_LEVEL=DEBUG fastmcp dev server.py
# Run with HTTP transport
fastmcp run server.py --transport http --port 8000

from fastmcp import FastMCP
import os
def create_server() -> FastMCP:
    """Build the FastMCP instance and register its tools, then its resources."""
    server = FastMCP("Server Name")
    # Configure server
    for register in (setup_tools, setup_resources):
        register(server)
    return server
def setup_tools(mcp: FastMCP):
    """Attach every tool to the given server."""
    as_tool = mcp.tool()

    # Placeholder tool; no docstring is added here.
    @as_tool
    def example_tool():
        pass
def setup_resources(mcp: FastMCP):
    """Attach every resource to the given server."""
    as_resource = mcp.resource("data://config")

    @as_resource
    def get_config():
        return dict(version="1.0.0")
# Export at module level
mcp = create_server()
if __name__ == "__main__":
    mcp.run()

import os
from dotenv import load_dotenv
load_dotenv()
class Config:
    """Environment-backed settings with a startup validation hook."""

    API_KEY = os.environ.get("API_KEY", "")
    BASE_URL = os.environ.get("BASE_URL", "https://api.example.com")
    # Only the string "true" (in any letter case) enables debug mode.
    DEBUG = os.environ.get("DEBUG", "false").lower() == "true"

    @classmethod
    def validate(cls):
        """Return True when API_KEY is set; raise ValueError otherwise."""
        if cls.API_KEY:
            return True
        raise ValueError("API_KEY is required")
# Validate on startup
Config.validate()@mcp.tool()
def complex_tool(
query: str,
filters: dict = None,
limit: int = 10
) -> dict:
"""
Search with advanced filtering.
Args:
query: Search query string
filters: Optional filters dict with keys:
- category: Filter by category
- date_from: Start date (ISO format)
- date_to: End date (ISO format)
limit: Maximum results (1-100)
Returns:
Dict with 'results' list and 'total' count
Examples:
>>> complex_tool("python", {"category": "tutorial"}, 5)
{'results': [...], 'total': 5}
"""
pass@mcp.resource("health://status")
async def health_check() -> dict:
"""Comprehensive health check."""
checks = {}
# Check API connectivity
try:
async with httpx.AsyncClient() as client:
response = await client.get(f"{BASE_URL}/health", timeout=5)
checks["api"] = response.status_code == 200
except:
checks["api"] = False
# Check database
try:
checks["database"] = await check_db_connection()
except:
checks["database"] = False
all_healthy = all(checks.values())
return {
"status": "healthy" if all_healthy else "degraded",
"timestamp": datetime.now().isoformat(),
"checks": checks
}my-mcp-server/
├── server.py # Main server file
├── requirements.txt # Dependencies
├── .env # Environment variables (git-ignored)
├── .gitignore # Git ignore file
└── README.md # Documentation

my-mcp-server/
├── src/
│ ├── server.py # Main entry point
│ ├── utils.py # Shared utilities
│ ├── tools/ # Tool modules
│ │ ├── __init__.py
│ │ ├── api_tools.py
│ │ └── data_tools.py
│ ├── resources/ # Resource definitions
│ │ ├── __init__.py
│ │ └── static.py
│ └── prompts/ # Prompt templates
│ ├── __init__.py
│ └── templates.py
├── tests/
│ ├── test_tools.py
│ └── test_resources.py
├── requirements.txt
├── pyproject.toml
├── .env
├── .gitignore
└── README.md

/jlowin/fastmcp
openai-api
claude-api
cloudflare-worker-base
import_server()
mount()
fastmcp dev