Loading...
Loading...
Expert API architect specializing in RESTful API design, GraphQL, gRPC, and API security. Deep expertise in OpenAPI 3.1, authentication patterns (OAuth2, JWT), rate limiting, pagination, and OWASP API Security Top 10. Use when designing scalable APIs, implementing API gateways, or securing API endpoints.
npx skill4agent add martinholovsky/claude-skills-generator api-expert

# tests/test_users_api.py
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
@pytest.fixture
async def client():
    """Yield an ``AsyncClient`` bound to ``app`` via ``ASGITransport``.

    The client uses ``http://test`` as its base URL and is closed when the
    consuming test finishes.
    """
    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test",
    ) as http_client:
        yield http_client
@pytest.mark.asyncio
async def test_create_user_returns_201(client):
    """POST /v1/users answers 201, sets a Location header and omits ``password``."""
    payload = {"email": "test@example.com", "name": "Test"}
    auth_headers = {"Authorization": "Bearer token"}

    resp = await client.post("/v1/users", json=payload, headers=auth_headers)

    assert resp.status_code == 201
    assert "location" in resp.headers
    # Never expose sensitive fields
    body = resp.json()
    assert "password" not in body
@pytest.mark.asyncio
async def test_create_user_validates_email(client):
    """POST /v1/users with a malformed email answers 422 and lists ``errors``."""
    payload = {"email": "invalid", "name": "Test"}
    auth_headers = {"Authorization": "Bearer token"}

    resp = await client.post("/v1/users", json=payload, headers=auth_headers)

    assert resp.status_code == 422
    # RFC 7807 format
    body = resp.json()
    assert "errors" in body
@pytest.mark.asyncio
async def test_get_other_user_returns_403(client):
    """BOLA protection - users can't access other users' data."""
    auth_headers = {"Authorization": "Bearer user-token"}

    resp = await client.get("/v1/users/other-id", headers=auth_headers)

    assert resp.status_code == 403


# app/routers/users.py
from fastapi import APIRouter, Depends, HTTPException, Response
router = APIRouter(prefix="/v1/users", tags=["users"])
@router.post("", status_code=201, response_model=UserResponse)
async def create_user(
    user_data: UserCreate,
    response: Response,
    current_user = Depends(get_current_user),
):
    """Create a user and advertise its URL in the ``Location`` header.

    ``current_user`` is not read in the body; it is declared so that the
    ``get_current_user`` dependency runs for this route.
    """
    created = await user_service.create(user_data)
    location = f"/v1/users/{created.id}"
    response.headers["Location"] = location
    return created
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(user_id: str, current_user = Depends(get_current_user)):
if current_user.id != user_id and not current_user.is_admin:
raise HTTPException(status_code=403, detail="Forbidden") # BOLA protection
return await user_service.get(user_id)

pytest tests/ -v --cov=app --cov-report=term-missing # Run all API tests
openapi-spec-validator openapi.yaml # Validate OpenAPI spec
bandit -r app/ # Security scan

/users
/orders
/users
/user
/users/{id}/orders
/v1/users
/v2/users
Accept: application/vnd.api.v1+json
/users?version=1
429 Too Many Requests
Retry-After
X-RateLimit-*
?offset=20&limit=10
?cursor=abc123
?after_id=100
total
page
per_page
next
prev
first
last

# ✅ GOOD: Proper REST resource hierarchy
GET /v1/users # List users
POST /v1/users # Create user
GET /v1/users/{id} # Get user
PUT /v1/users/{id} # Replace user (full update)
PATCH /v1/users/{id} # Update user (partial)
DELETE /v1/users/{id} # Delete user
GET /v1/users/{id}/orders # Get user's orders
POST /v1/users/{id}/orders # Create order for user
# Query parameters for filtering/sorting/pagination
GET /v1/users?role=admin&sort=-created_at&limit=20&offset=0
# ❌ BAD: Verbs in URLs
GET /v1/getUsers
POST /v1/createUser
GET /v1/users/{id}/getOrders

// ✅ CORRECT: Use appropriate status codes
// 2xx Success
200 OK // GET, PUT, PATCH (with body)
201 Created // POST (new resource)
204 No Content // DELETE, PUT, PATCH (no body)
// 4xx Client Errors
400 Bad Request // Invalid input
401 Unauthorized // Missing/invalid authentication
403 Forbidden // Authenticated but not authorized
404 Not Found // Resource doesn't exist
409 Conflict // Duplicate resource, version conflict
422 Unprocessable Entity // Validation failed
429 Too Many Requests // Rate limit exceeded
// 5xx Server Errors
500 Internal Server Error // Unexpected server error
503 Service Unavailable // Temporary downtime
// ❌ WRONG: Always returning 200
res.status(200).json({ error: "User not found" }); // DON'T DO THIS!
// ✅ RIGHT
res.status(404).json({
type: "https://api.example.com/errors/not-found",
title: "Resource Not Found",
status: 404,
detail: "User with ID 12345 does not exist"
});

// ✅ STANDARDIZED ERROR FORMAT (RFC 7807)
{
"type": "https://api.example.com/errors/validation-failed",
"title": "Validation Failed",
"status": 422,
"detail": "The request body contains invalid fields",
"instance": "/v1/users",
"correlation_id": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
"errors": [{ "field": "email", "code": "invalid_format", "message": "Email must be valid" }]
}
// Error handler middleware - never expose stack traces
app.use((err, req, res, next) => {
  // Express recognises error handlers by their four-argument signature,
  // so `next` must stay declared even on paths that do not call it.
  if (res.headersSent) {
    // A response is already being written; delegate to Express's default handler.
    return next(err);
  }
  if (err instanceof ApiError) {
    return res.status(err.status).json({ ...err, instance: req.originalUrl });
  }
  // Log under the same ID that is returned to the client, so the opaque
  // 500 response can be matched to the real error on the server side.
  const correlationId = generateCorrelationId();
  console.error(`[${correlationId}]`, err);
  res.status(500).json({ type: "internal-error", title: "Internal Server Error", status: 500, correlation_id: correlationId });
});

// ✅ SECURE JWT - Use RS256, short expiration, validate all claims
/**
 * Express middleware that authenticates a request from its Bearer JWT.
 *
 * Responds 401 when the Authorization header is missing or does not use the
 * Bearer scheme, when `jwt.verify` rejects the token (RS256 only, fixed
 * issuer and audience), or when `tokenCache.exists(decoded.jti)` reports the
 * token as revoked. On success the decoded claims are stored on `req.user`
 * and `next()` is called.
 */
const validateJWT = async (req, res, next) => {
  const header = req.headers.authorization;
  // Require the Bearer scheme explicitly instead of slicing a fixed offset,
  // so "Basic ..." or other schemes are never passed to the verifier.
  const match = typeof header === 'string' ? /^Bearer\s+(\S+)\s*$/i.exec(header) : null;
  const token = match?.[1];
  if (!token) return res.status(401).json({ type: "unauthorized", status: 401, detail: "Bearer token required" });

  let decoded;
  try {
    decoded = jwt.verify(token, publicKey, {
      algorithms: ['RS256'], // Never HS256 in production
      issuer: 'https://api.example.com',
      audience: 'https://api.example.com'
    });
    const isRevoked = await tokenCache.exists(decoded.jti); // Check revocation
    if (isRevoked) throw new Error('Token revoked');
  } catch (error) {
    return res.status(401).json({ type: "invalid-token", status: 401, detail: "Invalid or expired token" });
  }

  // Kept outside the try block so that a failure further down the chain is
  // never reported to the client as an invalid token.
  req.user = decoded;
  next();
};
// Scope-based authorization
/**
 * Build an Express middleware that lets the request through when the
 * authenticated user holds at least one of `scopes`, and answers 403 otherwise.
 *
 * `req.user.scope` may be a space-delimited string or an array of scope
 * names. Scopes are compared as whole tokens, so a granted `write:users`
 * does not satisfy a required `write:user`. A missing user or scope claim is
 * treated as "no scopes granted".
 */
const requireScope = (...scopes) => (req, res, next) => {
  const claim = req.user?.scope;
  let granted = [];
  if (Array.isArray(claim)) {
    granted = claim;
  } else if (typeof claim === 'string') {
    // Split into tokens; String.prototype.includes would match substrings.
    granted = claim.split(/\s+/).filter(Boolean);
  }
  const grantedSet = new Set(granted);
  const hasScope = scopes.some(s => grantedSet.has(s));
  if (!hasScope) return res.status(403).json({ type: "forbidden", status: 403, detail: `Required: ${scopes.join(', ')}` });
  next();
};
app.get('/v1/users', validateJWT, requireScope('read:users'), getUsers);

# Bad: No caching
@router.get("/v1/products/{id}")
async def get_product(id: str):
return await db.products.find_one({"_id": id})
# Good: Redis cache with headers
@router.get("/v1/products/{id}")
async def get_product(id: str, response: Response):
    """Return a product, served from the Redis cache when a copy exists.

    Sets ``Cache-Control: public, max-age=300`` on every successful response
    and ``X-Cache`` to ``HIT`` or ``MISS``. A product that is found in the
    database is cached for 300 seconds.

    Raises:
        HTTPException: 404 when the database has no product with this id.
    """
    cache_key = f"product:{id}"

    # Advertise cacheability on hits as well as misses.
    response.headers["Cache-Control"] = "public, max-age=300"

    cached = await redis_cache.get(cache_key)
    if cached:
        response.headers["X-Cache"] = "HIT"
        return cached

    product = await db.products.find_one({"_id": id})
    if product is None:
        # A missing document must not be cached or returned as a 200.
        raise HTTPException(status_code=404, detail="Product not found")

    # NOTE(review): assumes redis_cache serializes `product` itself; a raw
    # Redis client only accepts bytes/str/int/float values - confirm.
    await redis_cache.setex(cache_key, 300, product)
    response.headers["X-Cache"] = "MISS"
    return product


# Bad: Offset pagination - O(n) skip
@router.get("/v1/users")
async def list_users(offset: int = 0, limit: int = 100):
return await db.users.find().skip(offset).limit(limit)
# Good: Cursor-based - O(1) performance
@router.get("/v1/users")
async def list_users(cursor: str = None, limit: int = Query(default=20, ge=1, le=100)):
    """Return one page of users ordered by ``_id``, plus a cursor for the next page.

    Args:
        cursor: ``_id`` (as a string) of the last user on the previous page;
            omit it to start from the beginning.
        limit: Page size, between 1 and 100.

    Returns:
        ``{"data": [...], "pagination": {"next_cursor": ...}}`` where
        ``next_cursor`` is ``None`` on the last page.

    Raises:
        HTTPException: 400 when ``cursor`` is not a valid ObjectId string.
    """
    if cursor and not ObjectId.is_valid(cursor):
        # Reject a malformed cursor as a client error instead of failing with a 500.
        raise HTTPException(status_code=400, detail="Invalid cursor")

    query = {"_id": {"$gt": ObjectId(cursor)}} if cursor else {}

    # Fetch one extra document purely to detect whether another page exists.
    fetch_count = limit + 1
    users = await db.users.find(query).sort("_id", 1).limit(fetch_count).to_list(length=fetch_count)

    has_next = len(users) > limit
    page = users[:limit]

    # The cursor must point at the last document actually returned. Pointing it
    # at the lookahead document would make the next page (strictly "$gt") skip it.
    next_cursor = str(page[-1]["_id"]) if has_next else None
    return {"data": page, "pagination": {"next_cursor": next_cursor}}


# Bad: No compression
app = FastAPI()
# Good: GZip middleware for responses > 500 bytes
from fastapi.middleware.gzip import GZipMiddleware
app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=500)# Bad: New connection per request
@router.get("/v1/data")
async def get_data():
client = AsyncIOMotorClient("mongodb://localhost") # Expensive!
return await client.db.collection.find_one()
# Good: Shared pool via lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create one MongoDB client for the application and close it on shutdown.

    The client is stored on ``app.state.db`` before the application starts
    serving, and is closed when the lifespan context exits.
    """
    app.state.db = AsyncIOMotorClient("mongodb://localhost", maxPoolSize=50, minPoolSize=10)
    try:
        yield
    finally:
        # Close the client even when the context exits with an exception.
        app.state.db.close()
app = FastAPI(lifespan=lifespan)
@router.get("/v1/data")
async def get_data(request: Request):
    """Fetch one document through the client stored on ``app.state.db``."""
    collection = request.app.state.db.mydb.collection
    return await collection.find_one()


# Bad: No rate limiting
@router.post("/v1/auth/login")
async def login(credentials: LoginRequest):
return await auth_service.login(credentials)
# Good: Tiered limits with Redis
from fastapi_limiter.depends import RateLimiter
@router.post("/v1/auth/login", dependencies=[Depends(RateLimiter(times=5, minutes=15))])
async def login(credentials: LoginRequest):
return await auth_service.login(credentials)
@router.get("/v1/users", dependencies=[Depends(RateLimiter(times=100, minutes=1))])
async def list_users():
return await user_service.list()

| Threat | Description | Key Mitigation |
|---|---|---|
| API1: Broken Object Level Authorization (BOLA) | Users can access objects belonging to others | Always verify user owns resource before returning data |
| API2: Broken Authentication | Weak auth allows token/credential compromise | Use RS256 JWT, short expiration, token revocation, rate limiting |
| API3: Broken Object Property Level Authorization | Exposing sensitive fields or mass assignment | Whitelist output/input fields, use DTOs, never expose passwords/keys |
| API4: Unrestricted Resource Consumption | No limits leads to DoS | Implement rate limiting, pagination limits, request timeouts |
| API5: Broken Function Level Authorization | Admin functions lack role checks | Verify roles/scopes for every privileged operation |
| API6: Unrestricted Access to Sensitive Business Flows | Business flows can be abused | Add CAPTCHA, transaction limits, step-up auth, anomaly detection |
| API7: Server Side Request Forgery (SSRF) | APIs make requests to attacker-controlled URLs | Whitelist allowed hosts, block private IPs, validate URLs |
| API8: Security Misconfiguration | Improper security settings | Set security headers, use HTTPS, configure CORS, disable debug |
| API9: Improper Inventory Management | Unknown/forgotten APIs | Use API gateway, maintain inventory, retire old versions |
| API10: Unsafe Consumption of APIs | Trust third-party APIs without validation | Validate external responses, implement timeouts, use circuit breakers |
// ✅ ALWAYS verify authorization
app.get('/users/:id/data', validateJWT, async (req, res) => {
if (req.user.sub !== req.params.id && !req.user.isAdmin) {
return res.status(403).json({ error: 'Forbidden' });
}
// Return data...
});
// ✅ ALWAYS filter sensitive fields
/**
 * Return a copy of `user` that carries only the fields safe to expose:
 * `id`, `name` and `email`.
 */
const sanitizeUser = (user) => {
  const { id, name, email } = user;
  // NEVER: password_hash, ssn, api_key, internal_notes
  return { id, name, email };
};
// ✅ ALWAYS validate input
body('email').isEmail().normalizeEmail(),
body('age').optional().isInt({ min: 0, max: 150 })
// ✅ ALWAYS implement rate limiting
const apiLimiter = rateLimit({ windowMs: 15 * 60 * 1000, max: 100 });
app.use('/api/', apiLimiter);

| Anti-Pattern | Wrong | Right |
|---|---|---|
| Verbs in URLs | | |
| Always 200 | | |
| No rate limiting | | Add |
| Exposing secrets | | |
| No validation | | Use |
pytest tests/ -v
openapi-spec-validator openapi.yaml
bandit -r app/