multi-tenant-safety-checker
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseMulti-tenant Safety Checker
多租户安全检查工具
Ensure complete tenant isolation and prevent data leakage.
确保完整的租户隔离,防止数据泄露。
Row Level Security (RLS)
行级安全性(RLS)
PostgreSQL RLS Setup
PostgreSQL RLS配置
sql
-- Enable RLS on tables
ALTER TABLE users ENABLE ROW LEVEL SECURITY;
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;
ALTER TABLE products ENABLE ROW LEVEL SECURITY;
-- Create policy for users table
CREATE POLICY tenant_isolation_policy ON users
USING (tenant_id = current_setting('app.tenant_id')::INTEGER);
-- Create policy for orders table
CREATE POLICY tenant_isolation_policy ON orders
USING (tenant_id = current_setting('app.tenant_id')::INTEGER);
-- Create policy for products table
CREATE POLICY tenant_isolation_policy ON products
USING (tenant_id = current_setting('app.tenant_id')::INTEGER);
-- Force RLS even for table owners
ALTER TABLE users FORCE ROW LEVEL SECURITY;
ALTER TABLE orders FORCE ROW LEVEL SECURITY;
ALTER TABLE products FORCE ROW LEVEL SECURITY;sql
-- Enable RLS on tables
ALTER TABLE users ENABLE ROW LEVEL SECURITY;
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;
ALTER TABLE products ENABLE ROW LEVEL SECURITY;
-- Create policy for users table
CREATE POLICY tenant_isolation_policy ON users
USING (tenant_id = current_setting('app.tenant_id')::INTEGER);
-- Create policy for orders table
CREATE POLICY tenant_isolation_policy ON orders
USING (tenant_id = current_setting('app.tenant_id')::INTEGER);
-- Create policy for products table
CREATE POLICY tenant_isolation_policy ON products
USING (tenant_id = current_setting('app.tenant_id')::INTEGER);
-- Force RLS even for table owners
ALTER TABLE users FORCE ROW LEVEL SECURITY;
ALTER TABLE orders FORCE ROW LEVEL SECURITY;
ALTER TABLE products FORCE ROW LEVEL SECURITY;Application-Level Tenant Context
应用层租户上下文
typescript
// middleware/tenant-context.ts
import { PrismaClient } from "@prisma/client";
export class TenantContext {
constructor(private prisma: PrismaClient) {}
async setTenant(tenantId: number): Promise<void> {
await this.prisma.$executeRaw`
SET LOCAL app.tenant_id = ${tenantId}
`;
}
async withTenant<T>(
tenantId: number,
callback: () => Promise<T>
): Promise<T> {
return this.prisma.$transaction(async (tx) => {
// Set tenant context for this transaction
await tx.$executeRaw`SET LOCAL app.tenant_id = ${tenantId}`;
// Execute queries within tenant context
return callback();
});
}
}
// Usage in API route
app.get("/api/orders", async (req, res) => {
const tenantId = req.user.tenantId;
const orders = await tenantContext.withTenant(tenantId, async () => {
return prisma.order.findMany(); // Automatically filtered by RLS
});
res.json(orders);
});typescript
// middleware/tenant-context.ts
import { PrismaClient } from "@prisma/client";
export class TenantContext {
constructor(private prisma: PrismaClient) {}
async setTenant(tenantId: number): Promise<void> {
await this.prisma.$executeRaw`
SET LOCAL app.tenant_id = ${tenantId}
`;
}
async withTenant<T>(
tenantId: number,
callback: () => Promise<T>
): Promise<T> {
return this.prisma.$transaction(async (tx) => {
// Set tenant context for this transaction
await tx.$executeRaw`SET LOCAL app.tenant_id = ${tenantId}`;
// Execute queries within tenant context
return callback();
});
}
}
// Usage in API route
app.get("/api/orders", async (req, res) => {
const tenantId = req.user.tenantId;
const orders = await tenantContext.withTenant(tenantId, async () => {
return prisma.order.findMany(); // Automatically filtered by RLS
});
res.json(orders);
});Tenant Isolation Checklist
租户隔离检查清单
markdown
undefinedmarkdown
undefinedMulti-tenant Security Checklist
多租户安全检查清单
Database Level
数据库层面
- All tables have column
tenant_id - is NOT NULL on all tables
tenant_id - Foreign keys include tenant_id checks
- Row Level Security enabled on all tables
- RLS policies created for all tables
- RLS enforced even for table owners
- Composite indexes include tenant_id
- 所有表都包含字段
tenant_id - 所有表的字段均为非空
tenant_id - 外键包含tenant_id校验
- 所有表已启用行级安全性
- 所有表已创建RLS策略
- 即使是表所有者也会强制执行RLS
- 复合索引包含tenant_id
Application Level
应用层面
- Tenant context set on every request
- Tenant ID validated from JWT/session
- No raw SQL without tenant filter
- All queries include tenant_id (if no RLS)
- API endpoints validate tenant access
- File uploads scoped to tenant
- Background jobs include tenant context
- 每个请求都设置了租户上下文
- 从JWT/会话中验证租户ID
- 不使用未包含租户过滤条件的原生SQL
- 所有查询均包含tenant_id(未使用RLS时)
- API端点验证租户访问权限
- 文件上传范围限定为当前租户
- 后台任务包含租户上下文
Testing
测试层面
- Cross-tenant query tests
- RLS bypass attempt tests
- SQL injection with tenant bypass tests
- Automated regression tests
- Regular security audits
undefined- 跨租户查询测试
- RLS绕过尝试测试
- 租户绕过的SQL注入测试
- 自动化回归测试
- 定期安全审计
undefinedAutomated Security Tests
自动化安全测试
typescript
// tests/tenant-isolation.test.ts
import { PrismaClient } from "@prisma/client";
describe("Tenant Isolation", () => {
let prisma: PrismaClient;
let tenant1Id: number;
let tenant2Id: number;
beforeAll(async () => {
prisma = new PrismaClient();
// Create test tenants
const tenant1 = await prisma.tenant.create({
data: { name: "Tenant 1" },
});
const tenant2 = await prisma.tenant.create({
data: { name: "Tenant 2" },
});
tenant1Id = tenant1.id;
tenant2Id = tenant2.id;
// Create test data
await prisma.user.create({
data: {
email: "user1@tenant1.com",
tenantId: tenant1Id,
},
});
await prisma.user.create({
data: {
email: "user2@tenant2.com",
tenantId: tenant2Id,
},
});
});
it("should not access data from other tenants", async () => {
// Set tenant context to Tenant 1
await prisma.$executeRaw`SET app.tenant_id = ${tenant1Id}`;
// Query users
const users = await prisma.user.findMany();
// Should only see Tenant 1 users
expect(users.length).toBe(1);
expect(users[0].email).toBe("user1@tenant1.com");
// Should NOT see Tenant 2 users
expect(users.find((u) => u.email === "user2@tenant2.com")).toBeUndefined();
});
it("should prevent cross-tenant updates", async () => {
await prisma.$executeRaw`SET app.tenant_id = ${tenant1Id}`;
// Try to update Tenant 2 user (should fail silently with RLS)
const tenant2User = await prisma.user.findFirst({
where: { email: "user2@tenant2.com" },
});
// Should not find user from other tenant
expect(tenant2User).toBeNull();
});
it("should prevent cross-tenant deletes", async () => {
await prisma.$executeRaw`SET app.tenant_id = ${tenant1Id}`;
// Try to delete Tenant 2 user
const result = await prisma.user.deleteMany({
where: { tenantId: tenant2Id },
});
// Should delete 0 rows (RLS prevents access)
expect(result.count).toBe(0);
// Verify user still exists
await prisma.$executeRaw`SET app.tenant_id = ${tenant2Id}`;
const user = await prisma.user.findFirst({
where: { email: "user2@tenant2.com" },
});
expect(user).not.toBeNull();
});
it("should handle transaction rollback correctly", async () => {
try {
await prisma.$transaction(async (tx) => {
await tx.$executeRaw`SET LOCAL app.tenant_id = ${tenant1Id}`;
// Create user
await tx.user.create({
data: {
email: "test@tenant1.com",
tenantId: tenant1Id,
},
});
// Force error
throw new Error("Rollback test");
});
} catch (error) {
// Transaction rolled back
}
// User should not exist
await prisma.$executeRaw`SET app.tenant_id = ${tenant1Id}`;
const user = await prisma.user.findFirst({
where: { email: "test@tenant1.com" },
});
expect(user).toBeNull();
});
});typescript
// tests/tenant-isolation.test.ts
import { PrismaClient } from "@prisma/client";
describe("Tenant Isolation", () => {
let prisma: PrismaClient;
let tenant1Id: number;
let tenant2Id: number;
beforeAll(async () => {
prisma = new PrismaClient();
// Create test tenants
const tenant1 = await prisma.tenant.create({
data: { name: "Tenant 1" },
});
const tenant2 = await prisma.tenant.create({
data: { name: "Tenant 2" },
});
tenant1Id = tenant1.id;
tenant2Id = tenant2.id;
// Create test data
await prisma.user.create({
data: {
email: "user1@tenant1.com",
tenantId: tenant1Id,
},
});
await prisma.user.create({
data: {
email: "user2@tenant2.com",
tenantId: tenant2Id,
},
});
});
it("should not access data from other tenants", async () => {
// Set tenant context to Tenant 1
await prisma.$executeRaw`SET app.tenant_id = ${tenant1Id}`;
// Query users
const users = await prisma.user.findMany();
// Should only see Tenant 1 users
expect(users.length).toBe(1);
expect(users[0].email).toBe("user1@tenant1.com");
// Should NOT see Tenant 2 users
expect(users.find((u) => u.email === "user2@tenant2.com")).toBeUndefined();
});
it("should prevent cross-tenant updates", async () => {
await prisma.$executeRaw`SET app.tenant_id = ${tenant1Id}`;
// Try to update Tenant 2 user (should fail silently with RLS)
const tenant2User = await prisma.user.findFirst({
where: { email: "user2@tenant2.com" },
});
// Should not find user from other tenant
expect(tenant2User).toBeNull();
});
it("should prevent cross-tenant deletes", async () => {
await prisma.$executeRaw`SET app.tenant_id = ${tenant1Id}`;
// Try to delete Tenant 2 user
const result = await prisma.user.deleteMany({
where: { tenantId: tenant2Id },
});
// Should delete 0 rows (RLS prevents access)
expect(result.count).toBe(0);
// Verify user still exists
await prisma.$executeRaw`SET app.tenant_id = ${tenant2Id}`;
const user = await prisma.user.findFirst({
where: { email: "user2@tenant2.com" },
});
expect(user).not.toBeNull();
});
it("should handle transaction rollback correctly", async () => {
try {
await prisma.$transaction(async (tx) => {
await tx.$executeRaw`SET LOCAL app.tenant_id = ${tenant1Id}`;
// Create user
await tx.user.create({
data: {
email: "test@tenant1.com",
tenantId: tenant1Id,
},
});
// Force error
throw new Error("Rollback test");
});
} catch (error) {
// Transaction rolled back
}
// User should not exist
await prisma.$executeRaw`SET app.tenant_id = ${tenant1Id}`;
const user = await prisma.user.findFirst({
where: { email: "test@tenant1.com" },
});
expect(user).toBeNull();
});
});RLS Audit Script
RLS审计脚本
typescript
// scripts/audit-rls.ts
async function auditRLS() {
const tables = await prisma.$queryRaw<any[]>`
SELECT tablename
FROM pg_tables
WHERE schemaname = 'public'
AND tablename != '_prisma_migrations'
`;
console.log("🔍 Auditing Row Level Security...\n");
for (const { tablename } of tables) {
// Check if table has tenant_id
const columns = await prisma.$queryRaw<any[]>`
SELECT column_name
FROM information_schema.columns
WHERE table_name = ${tablename}
AND column_name = 'tenant_id'
`;
if (columns.length === 0) {
console.log(`❌ ${tablename}: Missing tenant_id column`);
continue;
}
// Check if RLS is enabled
const rlsStatus = await prisma.$queryRaw<any[]>`
SELECT relname, relrowsecurity, relforcerowsecurity
FROM pg_class
WHERE relname = ${tablename}
`;
if (!rlsStatus[0]?.relrowsecurity) {
console.log(`❌ ${tablename}: RLS not enabled`);
continue;
}
if (!rlsStatus[0]?.relforcerowsecurity) {
console.log(`⚠️ ${tablename}: RLS not forced (owners can bypass)`);
}
// Check if policy exists
const policies = await prisma.$queryRaw<any[]>`
SELECT policyname, qual
FROM pg_policies
WHERE tablename = ${tablename}
`;
if (policies.length === 0) {
console.log(`❌ ${tablename}: No RLS policies defined`);
} else {
console.log(
`✅ ${tablename}: RLS configured (${policies.length} policies)`
);
}
}
}typescript
// scripts/audit-rls.ts
async function auditRLS() {
const tables = await prisma.$queryRaw<any[]>`
SELECT tablename
FROM pg_tables
WHERE schemaname = 'public'
AND tablename != '_prisma_migrations'
`;
console.log("🔍 Auditing Row Level Security...\n");
for (const { tablename } of tables) {
// Check if table has tenant_id
const columns = await prisma.$queryRaw<any[]>`
SELECT column_name
FROM information_schema.columns
WHERE table_name = ${tablename}
AND column_name = 'tenant_id'
`;
if (columns.length === 0) {
console.log(`❌ ${tablename}: Missing tenant_id column`);
continue;
}
// Check if RLS is enabled
const rlsStatus = await prisma.$queryRaw<any[]>`
SELECT relname, relrowsecurity, relforcerowsecurity
FROM pg_class
WHERE relname = ${tablename}
`;
if (!rlsStatus[0]?.relrowsecurity) {
console.log(`❌ ${tablename}: RLS not enabled`);
continue;
}
if (!rlsStatus[0]?.relforcerowsecurity) {
console.log(`⚠️ ${tablename}: RLS not forced (owners can bypass)`);
}
// Check if policy exists
const policies = await prisma.$queryRaw<any[]>`
SELECT policyname, qual
FROM pg_policies
WHERE tablename = ${tablename}
`;
if (policies.length === 0) {
console.log(`❌ ${tablename}: No RLS policies defined`);
} else {
console.log(
`✅ ${tablename}: RLS configured (${policies.length} policies)`
);
}
}
}Composite Indexes for Performance
性能优化的复合索引
sql
-- Composite indexes with tenant_id first
CREATE INDEX idx_orders_tenant_user ON orders(tenant_id, user_id);
CREATE INDEX idx_orders_tenant_created ON orders(tenant_id, created_at DESC);
CREATE INDEX idx_products_tenant_category ON products(tenant_id, category);
-- This ensures queries filtered by tenant_id are fast
-- SELECT * FROM orders WHERE tenant_id = 1 AND user_id = 123; -- Uses indexsql
-- Composite indexes with tenant_id first
CREATE INDEX idx_orders_tenant_user ON orders(tenant_id, user_id);
CREATE INDEX idx_orders_tenant_created ON orders(tenant_id, created_at DESC);
CREATE INDEX idx_products_tenant_category ON products(tenant_id, category);
-- This ensures queries filtered by tenant_id are fast
-- SELECT * FROM orders WHERE tenant_id = 1 AND user_id = 123; -- Uses indexMiddleware for Automatic Tenant Injection
自动注入租户信息的中间件
typescript
// prisma/middleware.ts
import { Prisma } from "@prisma/client";
export function tenantMiddleware(tenantId: number) {
return async (
params: Prisma.MiddlewareParams,
next: (params: Prisma.MiddlewareParams) => Promise<any>
) => {
// Inject tenant_id into all queries
if (params.action === "findMany" || params.action === "findFirst") {
params.args.where = {
...params.args.where,
tenantId,
};
}
if (params.action === "create") {
params.args.data = {
...params.args.data,
tenantId,
};
}
if (params.action === "createMany") {
if (Array.isArray(params.args.data)) {
params.args.data = params.args.data.map((item) => ({
...item,
tenantId,
}));
}
}
return next(params);
};
}
// Usage:
const prisma = new PrismaClient();
prisma.$use(tenantMiddleware(req.user.tenantId));typescript
// prisma/middleware.ts
import { Prisma } from "@prisma/client";
export function tenantMiddleware(tenantId: number) {
return async (
params: Prisma.MiddlewareParams,
next: (params: Prisma.MiddlewareParams) => Promise<any>
) => {
// Inject tenant_id into all queries
if (params.action === "findMany" || params.action === "findFirst") {
params.args.where = {
...params.args.where,
tenantId,
};
}
if (params.action === "create") {
params.args.data = {
...params.args.data,
tenantId,
};
}
if (params.action === "createMany") {
if (Array.isArray(params.args.data)) {
params.args.data = params.args.data.map((item) => ({
...item,
tenantId,
}));
}
}
return next(params);
};
}
// Usage:
const prisma = new PrismaClient();
prisma.$use(tenantMiddleware(req.user.tenantId));Security Regression Tests
安全回归测试
typescript
// tests/security-regression.test.ts
describe("Security Regression Tests", () => {
it("should not allow SQL injection to bypass tenant", async () => {
const maliciousInput = "1 OR 1=1 --";
// This should be safely parameterized
const users = await prisma.user.findMany({
where: {
tenantId: parseInt(maliciousInput), // Will be NaN, safe
},
});
expect(users).toEqual([]);
});
it("should not expose tenant data via API error messages", async () => {
try {
await prisma.user.findUniqueOrThrow({
where: { id: 9999 }, // Non-existent
});
} catch (error) {
// Error should not leak tenant information
expect(error.message).not.toContain("tenant_id");
expect(error.message).not.toContain("tenantId");
}
});
});typescript
// tests/security-regression.test.ts
describe("Security Regression Tests", () => {
it("should not allow SQL injection to bypass tenant", async () => {
const maliciousInput = "1 OR 1=1 --";
// This should be safely parameterized
const users = await prisma.user.findMany({
where: {
tenantId: parseInt(maliciousInput), // Will be NaN, safe
},
});
expect(users).toEqual([]);
});
it("should not expose tenant data via API error messages", async () => {
try {
await prisma.user.findUniqueOrThrow({
where: { id: 9999 }, // Non-existent
});
} catch (error) {
// Error should not leak tenant information
expect(error.message).not.toContain("tenant_id");
expect(error.message).not.toContain("tenantId");
}
});
});Best Practices
最佳实践
- Always use RLS: Don't rely on application logic alone
- Force RLS: Even for table owners (FORCE ROW LEVEL SECURITY)
- Test thoroughly: Automated tests for cross-tenant access
- Audit regularly: Monthly RLS configuration audits
- Composite indexes: tenant_id first in all indexes
- Tenant validation: Verify user belongs to tenant
- Monitor: Log cross-tenant access attempts
- 始终使用RLS:不要仅依赖应用层逻辑
- 强制执行RLS:即使是表所有者也需遵守(FORCE ROW LEVEL SECURITY)
- 全面测试:针对跨租户访问的自动化测试
- 定期审计:每月进行RLS配置审计
- 复合索引:所有索引均以tenant_id作为首字段
- 租户验证:验证用户所属租户
- 监控:记录跨租户访问尝试
Output Checklist
输出检查清单
- RLS enabled on all tables
- RLS policies created
- Tenant context middleware
- Automated security tests
- RLS audit script
- Composite indexes created
- Cross-tenant access prevention tested
- Security regression test suite
- 所有表已启用RLS
- 已创建RLS策略
- 已实现租户上下文中间件
- 已完成自动化安全测试
- 已编写RLS审计脚本
- 已创建复合索引
- 已测试跨租户访问防护
- 已搭建安全回归测试套件