building-mcp-servers
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseBuilding MCP Servers
构建MCP服务器
Reference guide for Model Context Protocol server development (January 2026). Covers TypeScript, Python, and C# implementations with Streamable HTTP transport.
Model Context Protocol(模型上下文协议)服务器开发参考指南(2026年1月)。涵盖TypeScript、Python和C#三种语言的实现,以及Streamable HTTP传输的配置。
Quick Reference
快速参考
| Language | Package | Version | Transport |
|---|---|---|---|
| TypeScript | | 1.25.1 | |
| Python | | 1.25.0 | |
| C# | | 0.6.0-preview | |
| 语言 | 包 | 版本 | 传输方式 |
|---|---|---|---|
| TypeScript | | 1.25.1 | |
| Python | | 1.25.0 | |
| C# | | 0.6.0-preview | |
Transport Status
传输方式状态
| Transport | Status | Use Case |
|---|---|---|
| stdio | Supported | Local/CLI (Claude Desktop, Cursor) |
| Streamable HTTP | Recommended | Remote servers, production |
| SSE | Deprecated | Legacy only |
| 传输方式 | 状态 | 使用场景 |
|---|---|---|
| stdio | 已支持 | 本地/CLI(Claude Desktop、Cursor) |
| Streamable HTTP | 推荐使用 | 远程服务器、生产环境 |
| SSE | 已废弃 | 仅用于遗留系统 |
Streamable HTTP Transport
Streamable HTTP传输
Single endpoint replaces dual SSE endpoints. Supports stateful sessions or stateless (serverless) mode.
单端点替代了原有的双SSE端点。支持有状态会话或无状态(Serverless)模式。
Protocol Flow
协议流程
Client Server
|------ POST /mcp ---------------->| (JSON-RPC messages)
|<----- JSON or SSE response ------|
|------ GET /mcp ----------------->| (Optional: server-initiated)
|<----- SSE stream ----------------|Client Server
|------ POST /mcp ---------------->| (JSON-RPC messages)
|<----- JSON or SSE response ------|
|------ GET /mcp ----------------->| (Optional: server-initiated)
|<----- SSE stream ----------------|Required Headers
必填请求头
http
POST /mcp HTTP/1.1
Content-Type: application/json
Accept: application/json, text/event-stream
Mcp-Session-Id: <session-id> # After initializationhttp
POST /mcp HTTP/1.1
Content-Type: application/json
Accept: application/json, text/event-stream
Mcp-Session-Id: <session-id> # After initializationTypeScript Implementation
TypeScript实现
Installation
安装
bash
npm install @modelcontextprotocol/sdk zod express
npm install -D @types/expressbash
npm install @modelcontextprotocol/sdk zod express
npm install -D @types/expressBasic Server
基础服务器
typescript
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { z } from "zod";
import express from "express";
const app = express();
app.use(express.json());
const server = new McpServer({
name: "my-server",
version: "1.0.0"
});
// Tool
server.registerTool("add", {
description: "Add two numbers",
inputSchema: { a: z.number(), b: z.number() }
}, async ({ a, b }) => ({
content: [{ type: "text", text: String(a + b) }]
}));
// Resource
server.registerResource(
"config",
"config://app",
{ description: "App configuration" },
async (uri) => ({
contents: [{ uri: uri.href, text: JSON.stringify({ env: "prod" }) }]
})
);
// Prompt
server.registerPrompt("review", {
description: "Code review",
argsSchema: { code: z.string() }
}, ({ code }) => ({
messages: [{ role: "user", content: { type: "text", text: `Review:\n${code}` } }]
}));
// Transport
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => crypto.randomUUID()
});
await server.connect(transport);
app.all("/mcp", async (req, res) => {
await transport.handleRequest(req, res);
});
app.listen(3000);Note: Top-levelrequires Node.js with ES modules (awaitin package.json or"type": "module"extension)..mjs
typescript
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { z } from "zod";
import express from "express";
const app = express();
app.use(express.json());
const server = new McpServer({
name: "my-server",
version: "1.0.0"
});
// Tool
server.registerTool("add", {
description: "Add two numbers",
inputSchema: { a: z.number(), b: z.number() }
}, async ({ a, b }) => ({
content: [{ type: "text", text: String(a + b) }]
}));
// Resource
server.registerResource(
"config",
"config://app",
{ description: "App configuration" },
async (uri) => ({
contents: [{ uri: uri.href, text: JSON.stringify({ env: "prod" }) }]
})
);
// Prompt
server.registerPrompt("review", {
description: "Code review",
argsSchema: { code: z.string() }
}, ({ code }) => ({
messages: [{ role: "user", content: { type: "text", text: `Review:\n${code}` } }]
}));
// Transport
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => crypto.randomUUID()
});
await server.connect(transport);
app.all("/mcp", async (req, res) => {
await transport.handleRequest(req, res);
});
app.listen(3000);注意: 顶层需要Node.js启用ES模块(在package.json中设置await或使用"type": "module"扩展名)。.mjs
Stateless Mode (Serverless)
无状态模式(Serverless)
typescript
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined // Disables sessions
});typescript
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined // Disables sessions
});Python Implementation
Python实现
Installation
安装
bash
pip install "mcp[cli]"bash
pip install "mcp[cli]"Basic Server (FastMCP)
基础服务器(FastMCP)
python
from mcp.server.fastmcp import FastMCP, Context
from typing import List
mcp = FastMCP("my-server")python
from mcp.server.fastmcp import FastMCP, Context
from typing import List
mcp = FastMCP("my-server")Tool
Tool
@mcp.tool()
def add(a: int, b: int) -> int:
"""Add two numbers."""
return a + b
@mcp.tool()
def add(a: int, b: int) -> int:
"""Add two numbers."""
return a + b
Async tool with progress
Async tool with progress
@mcp.tool()
async def process_files(files: List[str], ctx: Context) -> str:
"""Process files with progress."""
for i, f in enumerate(files):
await ctx.report_progress(i + 1, len(files))
return f"Processed {len(files)} files"
@mcp.tool()
async def process_files(files: List[str], ctx: Context) -> str:
"""Process files with progress."""
for i, f in enumerate(files):
await ctx.report_progress(i + 1, len(files))
return f"Processed {len(files)} files"
Resource
Resource
@mcp.resource("config://app")
def get_config() -> dict:
"""App configuration."""
return {"env": "prod", "version": "1.0.0"}
@mcp.resource("config://app")
def get_config() -> dict:
"""App configuration."""
return {"env": "prod", "version": "1.0.0"}
Dynamic resource
Dynamic resource
@mcp.resource("users://{user_id}/profile")
def get_user(user_id: str) -> dict:
"""Get user profile."""
return {"id": user_id, "name": f"User {user_id}"}
@mcp.resource("users://{user_id}/profile")
def get_user(user_id: str) -> dict:
"""Get user profile."""
return {"id": user_id, "name": f"User {user_id}"}
Prompt
Prompt
@mcp.prompt()
def code_review(code: str, language: str = "python") -> str:
"""Code review prompt."""
return f"Review this {language} code:\n\n"
{language}\n{code}\nif name == "main":
# Streamable HTTP
mcp.run(transport="streamable-http", host="0.0.0.0", port=8000, path="/mcp")
# Or stdio: mcp.run()
undefined@mcp.prompt()
def code_review(code: str, language: str = "python") -> str:
"""Code review prompt."""
return f"Review this {language} code:\n\n"
{language}\n{code}\nif name == "main":
# Streamable HTTP
mcp.run(transport="streamable-http", host="0.0.0.0", port=8000, path="/mcp")
# Or stdio: mcp.run()
undefinedFastAPI Integration
FastAPI集成
python
from fastapi import FastAPI
from mcp.server.fastmcp import FastMCP
api = FastAPI()
mcp = FastMCP("api-tools")
@mcp.tool()
def query(sql: str) -> dict:
return {"result": "data"}
api.mount("/mcp", mcp.streamable_http_app())python
from fastapi import FastAPI
from mcp.server.fastmcp import FastMCP
api = FastAPI()
mcp = FastMCP("api-tools")
@mcp.tool()
def query(sql: str) -> dict:
return {"result": "data"}
api.mount("/mcp", mcp.streamable_http_app())Run: uvicorn app:api --port 8000
Run: uvicorn app:api --port 8000
undefinedundefinedC# Implementation
C#实现
Installation
安装
bash
dotnet add package ModelContextProtocol.AspNetCore --prereleasebash
dotnet add package ModelContextProtocol.AspNetCore --prereleaseBasic Server
基础服务器
csharp
using System.ComponentModel;
using Microsoft.Extensions.DependencyInjection;
using ModelContextProtocol.Server;
var builder = WebApplication.CreateBuilder(args);
builder.Services
.AddMcpServer()
.WithHttpTransport()
.WithToolsFromAssembly()
.WithPromptsFromAssembly()
.WithResourcesFromAssembly();
var app = builder.Build();
app.MapMcp(); // Maps /, /sse, /messages
app.Run();
// Tools
[McpServerToolType]
public static class MyTools
{
[McpServerTool, Description("Add two numbers")]
public static int Add(
[Description("First number")] int a,
[Description("Second number")] int b) => a + b;
[McpServerTool, Description("Get weather")]
public static async Task<string> GetWeather(
HttpClient http, // Injected from DI
[Description("City name")] string city,
CancellationToken ct)
{
var data = await http.GetStringAsync($"https://api.weather.example/{city}", ct);
return data;
}
}
// Prompts
[McpServerPromptType]
public static class MyPrompts
{
[McpServerPrompt, Description("Code review prompt")]
public static ChatMessage CodeReview(
[Description("Code to review")] string code) =>
new(ChatRole.User, $"Review this code:\n\n{code}");
}
// Resources
[McpServerResourceType]
public static class MyResources
{
[McpServerResource(Name = "config://app"), Description("App config")]
public static string Config() => """{"env": "production"}""";
[McpServerResource(UriTemplate = "docs://{topic}")]
public static string GetDoc([Description("Topic")] string topic) =>
$"Documentation for {topic}";
}csharp
using System.ComponentModel;
using Microsoft.Extensions.DependencyInjection;
using ModelContextProtocol.Server;
var builder = WebApplication.CreateBuilder(args);
builder.Services
.AddMcpServer()
.WithHttpTransport()
.WithToolsFromAssembly()
.WithPromptsFromAssembly()
.WithResourcesFromAssembly();
var app = builder.Build();
app.MapMcp(); // Maps /, /sse, /messages
app.Run();
// Tools
[McpServerToolType]
public static class MyTools
{
[McpServerTool, Description("Add two numbers")]
public static int Add(
[Description("First number")] int a,
[Description("Second number")] int b) => a + b;
[McpServerTool, Description("Get weather")]
public static async Task<string> GetWeather(
HttpClient http, // Injected from DI
[Description("City name")] string city,
CancellationToken ct)
{
var data = await http.GetStringAsync($"https://api.weather.example/{city}", ct);
return data;
}
}
// Prompts
[McpServerPromptType]
public static class MyPrompts
{
[McpServerPrompt, Description("Code review prompt")]
public static ChatMessage CodeReview(
[Description("Code to review")] string code) =>
new(ChatRole.User, $"Review this code:\n\n{code}");
}
// Resources
[McpServerResourceType]
public static class MyResources
{
[McpServerResource(Name = "config://app"), Description("App config")]
public static string Config() => """{"env": "production"}""";
[McpServerResource(UriTemplate = "docs://{topic}")]
public static string GetDoc([Description("Topic")] string topic) =>
$"Documentation for {topic}";
}Stdio Transport (Local)
Stdio传输(本地)
csharp
var builder = Host.CreateApplicationBuilder(args);
builder.Logging.AddConsole(o => o.LogToStandardErrorThreshold = LogLevel.Trace);
builder.Services
.AddMcpServer()
.WithStdioServerTransport()
.WithToolsFromAssembly();
await builder.Build().RunAsync();csharp
var builder = Host.CreateApplicationBuilder(args);
builder.Logging.AddConsole(o => o.LogToStandardErrorThreshold = LogLevel.Trace);
builder.Services
.AddMcpServer()
.WithStdioServerTransport()
.WithToolsFromAssembly();
await builder.Build().RunAsync();Authentication (OAuth 2.1)
认证(OAuth 2.1)
MCP servers are OAuth Resource Servers (not Authorization Servers). Key requirements:
- PKCE mandatory (S256 method)
- Resource Indicators (RFC 8707) required
- Bearer tokens in Authorization header
- Audience validation on every request
MCP服务器是OAuth资源服务器(而非授权服务器)。核心要求:
- 强制使用PKCE(S256方法)
- 必须使用资源指示器(RFC 8707)
- 在Authorization请求头中使用Bearer令牌
- 对每个请求都验证受众(Audience)
Discovery Endpoints
发现端点
http
GET /.well-known/oauth-protected-resource # Server metadata
GET /.well-known/oauth-authorization-server # Auth server metadatahttp
GET /.well-known/oauth-protected-resource # Server metadata
GET /.well-known/oauth-authorization-server # Auth server metadataServer Response on 401
401错误时的服务器响应
http
HTTP/1.1 401 Unauthorized
WWW-Authenticate: Bearer realm="mcp",
resource_metadata="https://mcp.example.com/.well-known/oauth-protected-resource"http
HTTP/1.1 401 Unauthorized
WWW-Authenticate: Bearer realm="mcp",
resource_metadata="https://mcp.example.com/.well-known/oauth-protected-resource"Token Validation (Python)
令牌验证(Python)
python
import jwt
from jwt import PyJWKClient
class TokenValidator:
def __init__(self, jwks_uri: str, audience: str):
self.jwks = PyJWKClient(jwks_uri)
self.audience = audience
def validate(self, token: str) -> dict:
key = self.jwks.get_signing_key_from_jwt(token)
return jwt.decode(
token, key.key,
algorithms=["RS256"],
audience=self.audience,
options={"require": ["exp", "aud", "iss"]}
)python
import jwt
from jwt import PyJWKClient
class TokenValidator:
def __init__(self, jwks_uri: str, audience: str):
self.jwks = PyJWKClient(jwks_uri)
self.audience = audience
def validate(self, token: str) -> dict:
key = self.jwks.get_signing_key_from_jwt(token)
return jwt.decode(
token, key.key,
algorithms=["RS256"],
audience=self.audience,
options={"require": ["exp", "aud", "iss"]}
)Protected Resource Metadata Response
受保护资源元数据响应
json
{
"resource": "https://mcp.example.com",
"authorization_servers": ["https://auth.example.com"],
"scopes_supported": ["read", "write", "admin"],
"bearer_methods_supported": ["header"]
}json
{
"resource": "https://mcp.example.com",
"authorization_servers": ["https://auth.example.com"],
"scopes_supported": ["read", "write", "admin"],
"bearer_methods_supported": ["header"]
}OAuth Middleware Integration (Python/Starlette)
OAuth中间件集成(Python/Starlette)
python
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
from mcp.server.fastmcp import FastMCP
class BearerAuthMiddleware(BaseHTTPMiddleware):
def __init__(self, app, validator: TokenValidator):
super().__init__(app)
self.validator = validator
async def dispatch(self, request, call_next):
auth = request.headers.get("Authorization", "")
if not auth.startswith("Bearer "):
return JSONResponse(
{"error": "unauthorized"},
status_code=401,
headers={"WWW-Authenticate": 'Bearer realm="mcp"'}
)
try:
token = auth.replace("Bearer ", "")
request.state.auth = self.validator.validate(token)
except Exception:
return JSONResponse({"error": "invalid_token"}, status_code=401)
return await call_next(request)python
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
from mcp.server.fastmcp import FastMCP
class BearerAuthMiddleware(BaseHTTPMiddleware):
def __init__(self, app, validator: TokenValidator):
super().__init__(app)
self.validator = validator
async def dispatch(self, request, call_next):
auth = request.headers.get("Authorization", "")
if not auth.startswith("Bearer "):
return JSONResponse(
{"error": "unauthorized"},
status_code=401,
headers={"WWW-Authenticate": 'Bearer realm="mcp"'}
)
try:
token = auth.replace("Bearer ", "")
request.state.auth = self.validator.validate(token)
except Exception:
return JSONResponse({"error": "invalid_token"}, status_code=401)
return await call_next(request)Setup
Setup
mcp = FastMCP("secure-server")
validator = TokenValidator(
jwks_uri="https://auth.example.com/.well-known/jwks.json",
audience="https://mcp.example.com"
)
app = Starlette(
routes=[Mount("/mcp", app=mcp.streamable_http_app())],
middleware=[Middleware(BearerAuthMiddleware, validator=validator)]
)
undefinedmcp = FastMCP("secure-server")
validator = TokenValidator(
jwks_uri="https://auth.example.com/.well-known/jwks.json",
audience="https://mcp.example.com"
)
app = Starlette(
routes=[Mount("/mcp", app=mcp.streamable_http_app())],
middleware=[Middleware(BearerAuthMiddleware, validator=validator)]
)
undefinedOAuth Middleware Integration (TypeScript/Express)
OAuth中间件集成(TypeScript/Express)
typescript
import jwt from "jsonwebtoken";
import jwksClient from "jwks-rsa";
const client = jwksClient({ jwksUri: "https://auth.example.com/.well-known/jwks.json" });
const authMiddleware = async (req, res, next) => {
const auth = req.headers.authorization;
if (!auth?.startsWith("Bearer ")) {
return res.status(401).json({ error: "unauthorized" });
}
try {
const token = auth.slice(7);
const decoded = jwt.decode(token, { complete: true });
const key = await client.getSigningKey(decoded.header.kid);
req.auth = jwt.verify(token, key.getPublicKey(), {
audience: "https://mcp.example.com",
algorithms: ["RS256"]
});
next();
} catch (err) {
res.status(401).json({ error: "invalid_token" });
}
};
app.use("/mcp", authMiddleware);typescript
import jwt from "jsonwebtoken";
import jwksClient from "jwks-rsa";
const client = jwksClient({ jwksUri: "https://auth.example.com/.well-known/jwks.json" });
const authMiddleware = async (req, res, next) => {
const auth = req.headers.authorization;
if (!auth?.startsWith("Bearer ")) {
return res.status(401).json({ error: "unauthorized" });
}
try {
const token = auth.slice(7);
const decoded = jwt.decode(token, { complete: true });
const key = await client.getSigningKey(decoded.header.kid);
req.auth = jwt.verify(token, key.getPublicKey(), {
audience: "https://mcp.example.com",
algorithms: ["RS256"]
});
next();
} catch (err) {
res.status(401).json({ error: "invalid_token" });
}
};
app.use("/mcp", authMiddleware);Authenticated Client Request
已认证的客户端请求
typescript
const transport = new StreamableHTTPClientTransport(
new URL("https://mcp.example.com/mcp"),
{
requestInit: {
headers: { Authorization: `Bearer ${accessToken}` }
}
}
);typescript
const transport = new StreamableHTTPClientTransport(
new URL("https://mcp.example.com/mcp"),
{
requestInit: {
headers: { Authorization: `Bearer ${accessToken}` }
}
}
);Security Anti-Patterns
安全反模式
| Anti-Pattern | Fix |
|---|---|
| Token passthrough to upstream APIs | Use separate tokens for upstream calls |
| Missing audience validation | Always validate |
| Tokens in URLs | Use Authorization header only |
| 反模式 | 修复方案 |
|---|---|
| 将令牌透传给上游API | 为上游调用使用独立的令牌 |
| 缺少受众验证 | 始终验证 |
| 在URL中传递令牌 | 仅使用Authorization请求头 |
Scope Enforcement in Tools
工具中的权限范围校验
Check scopes before executing sensitive operations:
TypeScript:
typescript
const authMiddleware = async (req, res, next) => {
// ... token validation ...
req.auth = { sub: decoded.sub, scopes: decoded.scope?.split(" ") || [] };
next();
};
server.registerTool("delete_user", { /* ... */ }, async ({ userId }, { meta }) => {
const scopes = meta?.auth?.scopes || [];
if (!scopes.includes("admin:write")) {
return { isError: true, content: [{ type: "text", text: "Insufficient scope" }] };
}
// ... perform deletion ...
});Python: Use HTTP middleware to validate, then check in tools:
python
undefined在执行敏感操作前检查权限范围:
TypeScript:
typescript
const authMiddleware = async (req, res, next) => {
// ... token validation ...
req.auth = { sub: decoded.sub, scopes: decoded.scope?.split(" ") || [] };
next();
};
server.registerTool("delete_user", { /* ... */ }, async ({ userId }, { meta }) => {
const scopes = meta?.auth?.scopes || [];
if (!scopes.includes("admin:write")) {
return { isError: true, content: [{ type: "text", text: "Insufficient scope" }] };
}
// ... perform deletion ...
});Python: 使用HTTP中间件进行验证,然后在工具中检查:
python
undefinedWith Starlette middleware (see OAuth Middleware Integration above)
With Starlette middleware (see OAuth Middleware Integration above)
Store validated claims in request.state, then check in tool:
Store validated claims in request.state, then check in tool:
@mcp.tool()
def delete_user(user_id: str) -> str:
"""Delete user - requires admin:write scope."""
# Scope enforcement happens at HTTP layer via middleware
# Tool assumes request already passed auth checks
return f"Deleted user {user_id}"
> **Note:** For advanced middleware with per-tool auth context, use `fastmcp` package (`pip install fastmcp`) which provides `Middleware` class and `Context.get_state()`. The official `mcp` package provides FastMCP but with simpler middleware options.@mcp.tool()
def delete_user(user_id: str) -> str:
"""Delete user - requires admin:write scope."""
# Scope enforcement happens at HTTP layer via middleware
# Tool assumes request already passed auth checks
return f"Deleted user {user_id}"
> **注意:** 如需支持带每工具认证上下文的高级中间件,请使用`fastmcp`包(`pip install fastmcp`),它提供了`Middleware`类和`Context.get_state()`方法。官方`mcp`包提供FastMCP,但中间件选项较为简单。Passing Auth Context to Tool Handlers
将认证上下文传递给工具处理器
TypeScript: Store auth on transport or use a request-scoped context:
typescript
// In middleware: attach to request
req.auth = { sub: decoded.sub, email: decoded.email };
// Pass to tool via server context or closure
const sessions = new Map();
app.all("/mcp", async (req, res) => {
const transport = new StreamableHTTPServerTransport({ /* ... */ });
sessions.set(transport.sessionId, { auth: req.auth });
// Tools access via sessions.get(sessionId)
});Python (with package): Use middleware and context state:
fastmcppython
undefinedTypeScript: 将认证信息存储在传输层或使用请求作用域上下文:
typescript
// In middleware: attach to request
req.auth = { sub: decoded.sub, email: decoded.email };
// Pass to tool via server context or closure
const sessions = new Map();
app.all("/mcp", async (req, res) => {
const transport = new StreamableHTTPServerTransport({ /* ... */ });
sessions.set(transport.sessionId, { auth: req.auth });
// Tools access via sessions.get(sessionId)
});Python(使用包): 使用中间件和上下文状态:
fastmcppython
undefinedpip install fastmcp
pip install fastmcp
from fastmcp import FastMCP, Context
from fastmcp.server.middleware import Middleware, MiddlewareContext
mcp = FastMCP("my-server")
class AuthMiddleware(Middleware):
async def on_call_tool(self, context: MiddlewareContext, call_next):
# Extract and validate token, then store in context
context.fastmcp_context.set_state("user_id", "user_123")
context.fastmcp_context.set_state("scopes", ["read", "write"])
return await call_next()
mcp.add_middleware(AuthMiddleware())
@mcp.tool
async def get_my_profile(ctx: Context) -> dict:
user_id = ctx.get_state("user_id") # Set by middleware
return {"user_id": user_id, "profile": "..."}
undefinedfrom fastmcp import FastMCP, Context
from fastmcp.server.middleware import Middleware, MiddlewareContext
mcp = FastMCP("my-server")
class AuthMiddleware(Middleware):
async def on_call_tool(self, context: MiddlewareContext, call_next):
# Extract and validate token, then store in context
context.fastmcp_context.set_state("user_id", "user_123")
context.fastmcp_context.set_state("scopes", ["read", "write"])
return await call_next()
mcp.add_middleware(AuthMiddleware())
@mcp.tool
async def get_my_profile(ctx: Context) -> dict:
user_id = ctx.get_state("user_id") # Set by middleware
return {"user_id": user_id, "profile": "..."}
undefinedToken Refresh Handling
令牌刷新处理
Access tokens are short-lived (typically 1 hour). Strategies:
- Client-side refresh: Clients refresh tokens before expiration and reconnect
- Proactive server refresh: Background task refreshes tokens expiring soon
- On-demand refresh: Return 401, client refreshes and retries
Recommended pattern: Use short-lived access tokens (5-60 min) with refresh tokens. On 401:
typescript
// Client retry logic
async function callWithRefresh(tool: string, args: object) {
try {
return await client.callTool(tool, args);
} catch (err) {
if (err.status === 401) {
accessToken = await refreshAccessToken(refreshToken);
transport.updateHeaders({ Authorization: `Bearer ${accessToken}` });
return await client.callTool(tool, args);
}
throw err;
}
}访问令牌的有效期较短(通常为1小时)。处理策略:
- 客户端侧刷新: 客户端在令牌过期前刷新令牌并重新连接
- 服务器主动刷新: 后台任务刷新即将过期的令牌
- 按需刷新: 返回401错误,客户端刷新令牌后重试
推荐模式: 使用短有效期的访问令牌(5-60分钟)搭配刷新令牌。当收到401错误时:
typescript
// Client retry logic
async function callWithRefresh(tool: string, args: object) {
try {
return await client.callTool(tool, args);
} catch (err) {
if (err.status === 401) {
accessToken = await refreshAccessToken(refreshToken);
transport.updateHeaders({ Authorization: `Bearer ${accessToken}` });
return await client.callTool(tool, args);
}
throw err;
}
}Migration: SSE to Streamable HTTP
迁移:从SSE到Streamable HTTP
Server Changes
服务器端变更
typescript
// OLD (SSE) - Two endpoints
app.get("/sse", async (req, res) => {
const transport = new SSEServerTransport("/sse/messages", res);
await server.connect(transport);
});
app.post("/sse/messages", async (req, res) => { /* ... */ });
// NEW (Streamable HTTP) - Single endpoint
app.all("/mcp", async (req, res) => {
const transport = new StreamableHTTPServerTransport();
await server.connect(transport);
await transport.handleRequest(req, res);
});typescript
// OLD (SSE) - Two endpoints
app.get("/sse", async (req, res) => {
const transport = new SSEServerTransport("/sse/messages", res);
await server.connect(transport);
});
app.post("/sse/messages", async (req, res) => { /* ... */ });
// NEW (Streamable HTTP) - Single endpoint
app.all("/mcp", async (req, res) => {
const transport = new StreamableHTTPServerTransport();
await server.connect(transport);
await transport.handleRequest(req, res);
});Client Changes
客户端变更
typescript
// OLD
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
const transport = new SSEClientTransport(new URL("http://localhost:3000/sse"));
// NEW
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
const transport = new StreamableHTTPClientTransport(new URL("http://localhost:3000/mcp"));typescript
// OLD
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
const transport = new SSEClientTransport(new URL("http://localhost:3000/sse"));
// NEW
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
const transport = new StreamableHTTPClientTransport(new URL("http://localhost:3000/mcp"));Backward-Compatible Server
向后兼容的服务器
typescript
// Support both transports during migration
app.all("/mcp", /* new Streamable HTTP handler */);
app.get("/sse", /* legacy SSE handler */);
app.post("/sse/messages", /* legacy message handler */);typescript
// Support both transports during migration
app.all("/mcp", /* new Streamable HTTP handler */);
app.get("/sse", /* legacy SSE handler */);
app.post("/sse/messages", /* legacy message handler */);Client Configuration
客户端配置
Claude Desktop / Claude Code (stdio)
Claude Desktop / Claude Code(stdio)
json
{
"mcpServers": {
"my-server": {
"command": "node",
"args": ["/path/to/server.js"],
"env": { "API_KEY": "secret" }
}
}
}json
{
"mcpServers": {
"my-server": {
"command": "node",
"args": ["/path/to/server.js"],
"env": { "API_KEY": "secret" }
}
}
}Remote Server (Streamable HTTP)
远程服务器(Streamable HTTP)
json
{
"mcpServers": {
"remote": {
"type": "streamable-http",
"url": "https://mcp.example.com/mcp"
}
}
}json
{
"mcpServers": {
"remote": {
"type": "streamable-http",
"url": "https://mcp.example.com/mcp"
}
}
}Error Handling
错误处理
Return tool errors (not protocol errors) for model self-correction:
TypeScript:
typescript
server.registerTool("query", { /* ... */ }, async ({ sql }) => {
if (sql.includes("DROP")) {
return {
isError: true,
content: [{ type: "text", text: "Destructive queries not allowed" }]
};
}
// ...
});Python: Raise exceptions in tools - they're caught and returned as errors:
python
@mcp.tool()
def query(sql: str) -> dict:
if "DROP" in sql.upper():
raise ValueError("Destructive queries not allowed")
return {"result": "data"}返回工具错误(而非协议错误)以便模型进行自我修正:
TypeScript:
typescript
server.registerTool("query", { /* ... */ }, async ({ sql }) => {
if (sql.includes("DROP")) {
return {
isError: true,
content: [{ type: "text", text: "Destructive queries not allowed" }]
};
}
// ...
});Python: 在工具中抛出异常 - 这些异常会被捕获并作为错误返回:
python
@mcp.tool()
def query(sql: str) -> dict:
if "DROP" in sql.upper():
raise ValueError("Destructive queries not allowed")
return {"result": "data"}Common Mistakes
常见错误
| Mistake | Fix |
|---|---|
| Using SSE for new servers | Use Streamable HTTP |
| Writing to stdout in stdio servers | Use stderr for logs |
| Missing session ID after init | Always include |
| Not validating OAuth audience | Validate token |
| Forwarding client tokens upstream | Use separate credentials for upstream APIs |
| 错误 | 修复方案 |
|---|---|
| 为新服务器使用SSE | 使用Streamable HTTP |
| 在stdio服务器中写入stdout | 使用stderr输出日志 |
| 初始化后缺少会话ID | 始终包含 |
| 未验证OAuth受众 | 验证令牌的 |
| 将客户端令牌转发给上游 | 为上游API使用独立的凭据 |
Official Resources
官方资源
SDKs
SDK
Documentation
文档
Testing
测试
bash
npx @modelcontextprotocol/inspector node path/to/server.jsbash
npx @modelcontextprotocol/inspector node path/to/server.js