agent-governance
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseAgent Governance Patterns
Agent治理模式
Patterns for adding safety, trust, and policy enforcement to AI agent systems.
为AI Agent系统添加安全、信任与策略执行的模式。
Overview
概述
Governance patterns ensure AI agents operate within defined boundaries — controlling which tools they can call, what content they can process, how much they can do, and maintaining accountability through audit trails.
User Request → Intent Classification → Policy Check → Tool Execution → Audit Log
↓ ↓ ↓
Threat Detection Allow/Deny Trust Update治理模式确保AI Agent在既定边界内运行——控制它们可调用的工具、可处理的内容、可执行的操作量,并通过审计追踪保持可追责性。
User Request → Intent Classification → Policy Check → Tool Execution → Audit Log
↓ ↓ ↓
Threat Detection Allow/Deny Trust UpdateWhen to Use
适用场景
- Agents with tool access: Any agent that calls external tools (APIs, databases, shell commands)
- Multi-agent systems: Agents delegating to other agents need trust boundaries
- Production deployments: Compliance, audit, and safety requirements
- Sensitive operations: Financial transactions, data access, infrastructure management
- 具备工具访问权限的Agent:任何调用外部工具(APIs、数据库、Shell命令)的Agent
- 多Agent系统:委托其他Agent执行任务的Agent需要信任边界
- 生产环境部署:合规、审计与安全要求
- 敏感操作:金融交易、数据访问、基础设施管理
Pattern 1: Governance Policy
模式1:治理策略
Define what an agent is allowed to do as a composable, serializable policy object.
python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import re
class PolicyAction(Enum):
ALLOW = "allow"
DENY = "deny"
REVIEW = "review" # flag for human review
@dataclass
class GovernancePolicy:
"""Declarative policy controlling agent behavior."""
name: str
allowed_tools: list[str] = field(default_factory=list) # allowlist
blocked_tools: list[str] = field(default_factory=list) # blocklist
blocked_patterns: list[str] = field(default_factory=list) # content filters
max_calls_per_request: int = 100 # rate limit
require_human_approval: list[str] = field(default_factory=list) # tools needing approval
def check_tool(self, tool_name: str) -> PolicyAction:
"""Check if a tool is allowed by this policy."""
if tool_name in self.blocked_tools:
return PolicyAction.DENY
if tool_name in self.require_human_approval:
return PolicyAction.REVIEW
if self.allowed_tools and tool_name not in self.allowed_tools:
return PolicyAction.DENY
return PolicyAction.ALLOW
def check_content(self, content: str) -> Optional[str]:
"""Check content against blocked patterns. Returns matched pattern or None."""
for pattern in self.blocked_patterns:
if re.search(pattern, content, re.IGNORECASE):
return pattern
return None将Agent的允许操作定义为可组合、可序列化的策略对象。
python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import re
class PolicyAction(Enum):
ALLOW = "allow"
DENY = "deny"
REVIEW = "review" # flag for human review
@dataclass
class GovernancePolicy:
"""Declarative policy controlling agent behavior."""
name: str
allowed_tools: list[str] = field(default_factory=list) # allowlist
blocked_tools: list[str] = field(default_factory=list) # blocklist
blocked_patterns: list[str] = field(default_factory=list) # content filters
max_calls_per_request: int = 100 # rate limit
require_human_approval: list[str] = field(default_factory=list) # tools needing approval
def check_tool(self, tool_name: str) -> PolicyAction:
"""Check if a tool is allowed by this policy."""
if tool_name in self.blocked_tools:
return PolicyAction.DENY
if tool_name in self.require_human_approval:
return PolicyAction.REVIEW
if self.allowed_tools and tool_name not in self.allowed_tools:
return PolicyAction.DENY
return PolicyAction.ALLOW
def check_content(self, content: str) -> Optional[str]:
"""Check content against blocked patterns. Returns matched pattern or None."""
for pattern in self.blocked_patterns:
if re.search(pattern, content, re.IGNORECASE):
return pattern
return NonePolicy Composition
策略组合
Combine multiple policies (e.g., org-wide + team + agent-specific):
python
def compose_policies(*policies: GovernancePolicy) -> GovernancePolicy:
"""Merge policies with most-restrictive-wins semantics."""
combined = GovernancePolicy(name="composed")
for policy in policies:
combined.blocked_tools.extend(policy.blocked_tools)
combined.blocked_patterns.extend(policy.blocked_patterns)
combined.require_human_approval.extend(policy.require_human_approval)
combined.max_calls_per_request = min(
combined.max_calls_per_request,
policy.max_calls_per_request
)
if policy.allowed_tools:
if combined.allowed_tools:
combined.allowed_tools = [
t for t in combined.allowed_tools if t in policy.allowed_tools
]
else:
combined.allowed_tools = list(policy.allowed_tools)
return combined合并多个策略(如:组织级 + 团队级 + Agent专属):
python
def compose_policies(*policies: GovernancePolicy) -> GovernancePolicy:
"""Merge policies with most-restrictive-wins semantics."""
combined = GovernancePolicy(name="composed")
for policy in policies:
combined.blocked_tools.extend(policy.blocked_tools)
combined.blocked_patterns.extend(policy.blocked_patterns)
combined.require_human_approval.extend(policy.require_human_approval)
combined.max_calls_per_request = min(
combined.max_calls_per_request,
policy.max_calls_per_request
)
if policy.allowed_tools:
if combined.allowed_tools:
combined.allowed_tools = [
t for t in combined.allowed_tools if t in policy.allowed_tools
]
else:
combined.allowed_tools = list(policy.allowed_tools)
return combinedUsage: layer policies from broad to specific
Usage: layer policies from broad to specific
org_policy = GovernancePolicy(
name="org-wide",
blocked_tools=["shell_exec", "delete_database"],
blocked_patterns=[r"(?i)(api[_-]?key|secret|password)\s*[:=]"],
max_calls_per_request=50
)
team_policy = GovernancePolicy(
name="data-team",
allowed_tools=["query_db", "read_file", "write_report"],
require_human_approval=["write_report"]
)
agent_policy = compose_policies(org_policy, team_policy)
undefinedorg_policy = GovernancePolicy(
name="org-wide",
blocked_tools=["shell_exec", "delete_database"],
blocked_patterns=[r"(?i)(api[_-]?key|secret|password)\s*[:=]"],
max_calls_per_request=50
)
team_policy = GovernancePolicy(
name="data-team",
allowed_tools=["query_db", "read_file", "write_report"],
require_human_approval=["write_report"]
)
agent_policy = compose_policies(org_policy, team_policy)
undefinedPolicy as YAML
策略的YAML存储
Store policies as configuration, not code:
yaml
undefined将策略存储为配置而非代码:
yaml
undefinedgovernance-policy.yaml
governance-policy.yaml
name: production-agent
allowed_tools:
- search_documents
- query_database
- send_email blocked_tools:
- shell_exec
- delete_record blocked_patterns:
- "(?i)(api[_-]?key|secret|password)\s*[:=]"
- "(?i)(drop|truncate|delete from)\s+\w+" max_calls_per_request: 25 require_human_approval:
- send_email
```python
import yaml
def load_policy(path: str) -> GovernancePolicy:
with open(path) as f:
data = yaml.safe_load(f)
return GovernancePolicy(**data)name: production-agent
allowed_tools:
- search_documents
- query_database
- send_email blocked_tools:
- shell_exec
- delete_record blocked_patterns:
- "(?i)(api[_-]?key|secret|password)\s*[:=]"
- "(?i)(drop|truncate|delete from)\s+\w+" max_calls_per_request: 25 require_human_approval:
- send_email
```python
import yaml
def load_policy(path: str) -> GovernancePolicy:
with open(path) as f:
data = yaml.safe_load(f)
return GovernancePolicy(**data)Pattern 2: Semantic Intent Classification
模式2:语义意图分类
Detect dangerous intent in prompts before they reach the agent, using pattern-based signals.
python
from dataclasses import dataclass
@dataclass
class IntentSignal:
category: str # e.g., "data_exfiltration", "privilege_escalation"
confidence: float # 0.0 to 1.0
evidence: str # what triggered the detection在提示词到达Agent之前检测危险意图,使用基于模式的信号。
python
from dataclasses import dataclass
@dataclass
class IntentSignal:
category: str # e.g., "data_exfiltration", "privilege_escalation"
confidence: float # 0.0 to 1.0
evidence: str # what triggered the detectionWeighted signal patterns for threat detection
Weighted signal patterns for threat detection
THREAT_SIGNALS = [
# Data exfiltration
(r"(?i)send\s+(all|every|entire)\s+\w+\s+to\s+", "data_exfiltration", 0.8),
(r"(?i)export\s+.\s+to\s+(external|outside|third.?party)", "data_exfiltration", 0.9),
(r"(?i)curl\s+.\s+-d\s+", "data_exfiltration", 0.7),
# Privilege escalation
(r"(?i)(sudo|as\s+root|admin\s+access)", "privilege_escalation", 0.8),
(r"(?i)chmod\s+777", "privilege_escalation", 0.9),
# System modification
(r"(?i)(rm\s+-rf|del\s+/[sq]|format\s+c:)", "system_destruction", 0.95),
(r"(?i)(drop\s+database|truncate\s+table)", "system_destruction", 0.9),
# Prompt injection
(r"(?i)ignore\s+(previous|above|all)\s+(instructions?|rules?)", "prompt_injection", 0.9),
(r"(?i)you\s+are\s+now\s+(a|an)\s+", "prompt_injection", 0.7),]
def classify_intent(content: str) -> list[IntentSignal]:
"""Classify content for threat signals."""
signals = []
for pattern, category, weight in THREAT_SIGNALS:
match = re.search(pattern, content)
if match:
signals.append(IntentSignal(
category=category,
confidence=weight,
evidence=match.group()
))
return signals
def is_safe(content: str, threshold: float = 0.7) -> bool:
"""Quick check: is the content safe above the given threshold?"""
signals = classify_intent(content)
return not any(s.confidence >= threshold for s in signals)
**Key insight**: Intent classification happens *before* tool execution, acting as a pre-flight safety check. This is fundamentally different from output guardrails which only check *after* generation.
---THREAT_SIGNALS = [
# Data exfiltration
(r"(?i)send\s+(all|every|entire)\s+\w+\s+to\s+", "data_exfiltration", 0.8),
(r"(?i)export\s+.\s+to\s+(external|outside|third.?party)", "data_exfiltration", 0.9),
(r"(?i)curl\s+.\s+-d\s+", "data_exfiltration", 0.7),
# Privilege escalation
(r"(?i)(sudo|as\s+root|admin\s+access)", "privilege_escalation", 0.8),
(r"(?i)chmod\s+777", "privilege_escalation", 0.9),
# System modification
(r"(?i)(rm\s+-rf|del\s+/[sq]|format\s+c:)", "system_destruction", 0.95),
(r"(?i)(drop\s+database|truncate\s+table)", "system_destruction", 0.9),
# Prompt injection
(r"(?i)ignore\s+(previous|above|all)\s+(instructions?|rules?)", "prompt_injection", 0.9),
(r"(?i)you\s+are\s+now\s+(a|an)\s+", "prompt_injection", 0.7),]
def classify_intent(content: str) -> list[IntentSignal]:
"""Classify content for threat signals."""
signals = []
for pattern, category, weight in THREAT_SIGNALS:
match = re.search(pattern, content)
if match:
signals.append(IntentSignal(
category=category,
confidence=weight,
evidence=match.group()
))
return signals
def is_safe(content: str, threshold: float = 0.7) -> bool:
"""Quick check: is the content safe above the given threshold?"""
signals = classify_intent(content)
return not any(s.confidence >= threshold for s in signals)
**核心要点**:意图分类在工具执行**之前**进行,作为飞行前的安全检查。这与仅在生成**之后**进行检查的输出防护措施有着本质区别。
---Pattern 3: Tool-Level Governance Decorator
模式3:工具级治理装饰器
Wrap individual tool functions with governance checks:
python
import functools
import time
from collections import defaultdict
_call_counters: dict[str, int] = defaultdict(int)
def govern(policy: GovernancePolicy, audit_trail=None):
"""Decorator that enforces governance policy on a tool function."""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
tool_name = func.__name__
# 1. Check tool allowlist/blocklist
action = policy.check_tool(tool_name)
if action == PolicyAction.DENY:
raise PermissionError(f"Policy '{policy.name}' blocks tool '{tool_name}'")
if action == PolicyAction.REVIEW:
raise PermissionError(f"Tool '{tool_name}' requires human approval")
# 2. Check rate limit
_call_counters[policy.name] += 1
if _call_counters[policy.name] > policy.max_calls_per_request:
raise PermissionError(f"Rate limit exceeded: {policy.max_calls_per_request} calls")
# 3. Check content in arguments
for arg in list(args) + list(kwargs.values()):
if isinstance(arg, str):
matched = policy.check_content(arg)
if matched:
raise PermissionError(f"Blocked pattern detected: {matched}")
# 4. Execute and audit
start = time.monotonic()
try:
result = await func(*args, **kwargs)
if audit_trail is not None:
audit_trail.append({
"tool": tool_name,
"action": "allowed",
"duration_ms": (time.monotonic() - start) * 1000,
"timestamp": time.time()
})
return result
except Exception as e:
if audit_trail is not None:
audit_trail.append({
"tool": tool_name,
"action": "error",
"error": str(e),
"timestamp": time.time()
})
raise
return wrapper
return decorator用治理检查包裹单个工具函数:
python
import functools
import time
from collections import defaultdict
_call_counters: dict[str, int] = defaultdict(int)
def govern(policy: GovernancePolicy, audit_trail=None):
"""Decorator that enforces governance policy on a tool function."""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
tool_name = func.__name__
# 1. Check tool allowlist/blocklist
action = policy.check_tool(tool_name)
if action == PolicyAction.DENY:
raise PermissionError(f"Policy '{policy.name}' blocks tool '{tool_name}'")
if action == PolicyAction.REVIEW:
raise PermissionError(f"Tool '{tool_name}' requires human approval")
# 2. Check rate limit
_call_counters[policy.name] += 1
if _call_counters[policy.name] > policy.max_calls_per_request:
raise PermissionError(f"Rate limit exceeded: {policy.max_calls_per_request} calls")
# 3. Check content in arguments
for arg in list(args) + list(kwargs.values()):
if isinstance(arg, str):
matched = policy.check_content(arg)
if matched:
raise PermissionError(f"Blocked pattern detected: {matched}")
# 4. Execute and audit
start = time.monotonic()
try:
result = await func(*args, **kwargs)
if audit_trail is not None:
audit_trail.append({
"tool": tool_name,
"action": "allowed",
"duration_ms": (time.monotonic() - start) * 1000,
"timestamp": time.time()
})
return result
except Exception as e:
if audit_trail is not None:
audit_trail.append({
"tool": tool_name,
"action": "error",
"error": str(e),
"timestamp": time.time()
})
raise
return wrapper
return decoratorUsage with any agent framework
Usage with any agent framework
audit_log = []
policy = GovernancePolicy(
name="search-agent",
allowed_tools=["search", "summarize"],
blocked_patterns=[r"(?i)password"],
max_calls_per_request=10
)
@govern(policy, audit_trail=audit_log)
async def search(query: str) -> str:
"""Search documents — governed by policy."""
return f"Results for: {query}"
audit_log = []
policy = GovernancePolicy(
name="search-agent",
allowed_tools=["search", "summarize"],
blocked_patterns=[r"(?i)password"],
max_calls_per_request=10
)
@govern(policy, audit_trail=audit_log)
async def search(query: str) -> str:
"""Search documents — governed by policy."""
return f"Results for: {query}"
Passes: search("latest quarterly report")
Passes: search("latest quarterly report")
Blocked: search("show me the admin password")
Blocked: search("show me the admin password")
---
---Pattern 4: Trust Scoring
模式4:信任评分
Track agent reliability over time with decay-based trust scores:
python
from dataclasses import dataclass, field
import math
import time
@dataclass
class TrustScore:
"""Trust score with temporal decay."""
score: float = 0.5 # 0.0 (untrusted) to 1.0 (fully trusted)
successes: int = 0
failures: int = 0
last_updated: float = field(default_factory=time.time)
def record_success(self, reward: float = 0.05):
self.successes += 1
self.score = min(1.0, self.score + reward * (1 - self.score))
self.last_updated = time.time()
def record_failure(self, penalty: float = 0.15):
self.failures += 1
self.score = max(0.0, self.score - penalty * self.score)
self.last_updated = time.time()
def current(self, decay_rate: float = 0.001) -> float:
"""Get score with temporal decay — trust erodes without activity."""
elapsed = time.time() - self.last_updated
decay = math.exp(-decay_rate * elapsed)
return self.score * decay
@property
def reliability(self) -> float:
total = self.successes + self.failures
return self.successes / total if total > 0 else 0.0通过基于衰减的信任分数随时间追踪Agent的可靠性:
python
from dataclasses import dataclass, field
import math
import time
@dataclass
class TrustScore:
"""Trust score with temporal decay."""
score: float = 0.5 # 0.0 (untrusted) to 1.0 (fully trusted)
successes: int = 0
failures: int = 0
last_updated: float = field(default_factory=time.time)
def record_success(self, reward: float = 0.05):
self.successes += 1
self.score = min(1.0, self.score + reward * (1 - self.score))
self.last_updated = time.time()
def record_failure(self, penalty: float = 0.15):
self.failures += 1
self.score = max(0.0, self.score - penalty * self.score)
self.last_updated = time.time()
def current(self, decay_rate: float = 0.001) -> float:
"""Get score with temporal decay — trust erodes without activity."""
elapsed = time.time() - self.last_updated
decay = math.exp(-decay_rate * elapsed)
return self.score * decay
@property
def reliability(self) -> float:
total = self.successes + self.failures
return self.successes / total if total > 0 else 0.0Usage in multi-agent systems
Usage in multi-agent systems
trust = TrustScore()
trust = TrustScore()
Agent completes tasks successfully
Agent completes tasks successfully
trust.record_success() # 0.525
trust.record_success() # 0.549
trust.record_success() # 0.525
trust.record_success() # 0.549
Agent makes an error
Agent makes an error
trust.record_failure() # 0.467
trust.record_failure() # 0.467
Gate sensitive operations on trust
Gate sensitive operations on trust
if trust.current() >= 0.7:
# Allow autonomous operation
pass
elif trust.current() >= 0.4:
# Allow with human oversight
pass
else:
# Deny or require explicit approval
pass
**Multi-agent trust**: In systems where agents delegate to other agents, each agent maintains trust scores for its delegates:
```python
class AgentTrustRegistry:
def __init__(self):
self.scores: dict[str, TrustScore] = {}
def get_trust(self, agent_id: str) -> TrustScore:
if agent_id not in self.scores:
self.scores[agent_id] = TrustScore()
return self.scores[agent_id]
def most_trusted(self, agents: list[str]) -> str:
return max(agents, key=lambda a: self.get_trust(a).current())
def meets_threshold(self, agent_id: str, threshold: float) -> bool:
return self.get_trust(agent_id).current() >= thresholdif trust.current() >= 0.7:
# Allow autonomous operation
pass
elif trust.current() >= 0.4:
# Allow with human oversight
pass
else:
# Deny or require explicit approval
pass
**多Agent信任**:在Agent之间相互委托任务的系统中,每个Agent都会为其委托的Agent维护信任分数:
```python
class AgentTrustRegistry:
def __init__(self):
self.scores: dict[str, TrustScore] = {}
def get_trust(self, agent_id: str) -> TrustScore:
if agent_id not in self.scores:
self.scores[agent_id] = TrustScore()
return self.scores[agent_id]
def most_trusted(self, agents: list[str]) -> str:
return max(agents, key=lambda a: self.get_trust(a).current())
def meets_threshold(self, agent_id: str, threshold: float) -> bool:
return self.get_trust(agent_id).current() >= thresholdPattern 5: Audit Trail
模式5:审计追踪
Append-only audit log for all agent actions — critical for compliance and debugging:
python
from dataclasses import dataclass, field
import json
import time
@dataclass
class AuditEntry:
timestamp: float
agent_id: str
tool_name: str
action: str # "allowed", "denied", "error"
policy_name: str
details: dict = field(default_factory=dict)
class AuditTrail:
"""Append-only audit trail for agent governance events."""
def __init__(self):
self._entries: list[AuditEntry] = []
def log(self, agent_id: str, tool_name: str, action: str,
policy_name: str, **details):
self._entries.append(AuditEntry(
timestamp=time.time(),
agent_id=agent_id,
tool_name=tool_name,
action=action,
policy_name=policy_name,
details=details
))
def denied(self) -> list[AuditEntry]:
"""Get all denied actions — useful for security review."""
return [e for e in self._entries if e.action == "denied"]
def by_agent(self, agent_id: str) -> list[AuditEntry]:
return [e for e in self._entries if e.agent_id == agent_id]
def export_jsonl(self, path: str):
"""Export as JSON Lines for log aggregation systems."""
with open(path, "w") as f:
for entry in self._entries:
f.write(json.dumps({
"timestamp": entry.timestamp,
"agent_id": entry.agent_id,
"tool": entry.tool_name,
"action": entry.action,
"policy": entry.policy_name,
**entry.details
}) + "\n")为所有Agent操作提供仅追加的审计日志——这对合规性和调试至关重要:
python
from dataclasses import dataclass, field
import json
import time
@dataclass
class AuditEntry:
timestamp: float
agent_id: str
tool_name: str
action: str # "allowed", "denied", "error"
policy_name: str
details: dict = field(default_factory=dict)
class AuditTrail:
"""Append-only audit trail for agent governance events."""
def __init__(self):
self._entries: list[AuditEntry] = []
def log(self, agent_id: str, tool_name: str, action: str,
policy_name: str, **details):
self._entries.append(AuditEntry(
timestamp=time.time(),
agent_id=agent_id,
tool_name=tool_name,
action=action,
policy_name=policy_name,
details=details
))
def denied(self) -> list[AuditEntry]:
"""Get all denied actions — useful for security review."""
return [e for e in self._entries if e.action == "denied"]
def by_agent(self, agent_id: str) -> list[AuditEntry]:
return [e for e in self._entries if e.agent_id == agent_id]
def export_jsonl(self, path: str):
"""Export as JSON Lines for log aggregation systems."""
with open(path, "w") as f:
for entry in self._entries:
f.write(json.dumps({
"timestamp": entry.timestamp,
"agent_id": entry.agent_id,
"tool": entry.tool_name,
"action": entry.action,
"policy": entry.policy_name,
**entry.details
}) + "\n")Pattern 6: Framework Integration
模式6:框架集成
PydanticAI
PydanticAI
python
from pydantic_ai import Agent
policy = GovernancePolicy(
name="support-bot",
allowed_tools=["search_docs", "create_ticket"],
blocked_patterns=[r"(?i)(ssn|social\s+security|credit\s+card)"],
max_calls_per_request=20
)
agent = Agent("openai:gpt-4o", system_prompt="You are a support assistant.")
@agent.tool
@govern(policy)
async def search_docs(ctx, query: str) -> str:
"""Search knowledge base — governed."""
return await kb.search(query)
@agent.tool
@govern(policy)
async def create_ticket(ctx, title: str, body: str) -> str:
"""Create support ticket — governed."""
return await tickets.create(title=title, body=body)python
from pydantic_ai import Agent
policy = GovernancePolicy(
name="support-bot",
allowed_tools=["search_docs", "create_ticket"],
blocked_patterns=[r"(?i)(ssn|social\s+security|credit\s+card)"],
max_calls_per_request=20
)
agent = Agent("openai:gpt-4o", system_prompt="You are a support assistant.")
@agent.tool
@govern(policy)
async def search_docs(ctx, query: str) -> str:
"""Search knowledge base — governed."""
return await kb.search(query)
@agent.tool
@govern(policy)
async def create_ticket(ctx, title: str, body: str) -> str:
"""Create support ticket — governed."""
return await tickets.create(title=title, body=body)CrewAI
CrewAI
python
from crewai import Agent, Task, Crew
policy = GovernancePolicy(
name="research-crew",
allowed_tools=["search", "analyze"],
max_calls_per_request=30
)python
from crewai import Agent, Task, Crew
policy = GovernancePolicy(
name="research-crew",
allowed_tools=["search", "analyze"],
max_calls_per_request=30
)Apply governance at the crew level
Apply governance at the crew level
def governed_crew_run(crew: Crew, policy: GovernancePolicy):
"""Wrap crew execution with governance checks."""
audit = AuditTrail()
for agent in crew.agents:
for tool in agent.tools:
original = tool.func
tool.func = govern(policy, audit_trail=audit)(original)
result = crew.kickoff()
return result, audit
undefineddef governed_crew_run(crew: Crew, policy: GovernancePolicy):
"""Wrap crew execution with governance checks."""
audit = AuditTrail()
for agent in crew.agents:
for tool in agent.tools:
original = tool.func
tool.func = govern(policy, audit_trail=audit)(original)
result = crew.kickoff()
return result, audit
undefinedOpenAI Agents SDK
OpenAI Agents SDK
python
from agents import Agent, function_tool
policy = GovernancePolicy(
name="coding-agent",
allowed_tools=["read_file", "write_file", "run_tests"],
blocked_tools=["shell_exec"],
max_calls_per_request=50
)
@function_tool
@govern(policy)
async def read_file(path: str) -> str:
"""Read file contents — governed."""
import os
safe_path = os.path.realpath(path)
if not safe_path.startswith(os.path.realpath(".")):
raise ValueError("Path traversal blocked by governance")
with open(safe_path) as f:
return f.read()python
from agents import Agent, function_tool
policy = GovernancePolicy(
name="coding-agent",
allowed_tools=["read_file", "write_file", "run_tests"],
blocked_tools=["shell_exec"],
max_calls_per_request=50
)
@function_tool
@govern(policy)
async def read_file(path: str) -> str:
"""Read file contents — governed."""
import os
safe_path = os.path.realpath(path)
if not safe_path.startswith(os.path.realpath(".")):
raise ValueError("Path traversal blocked by governance")
with open(safe_path) as f:
return f.read()Governance Levels
治理级别
Match governance strictness to risk level:
| Level | Controls | Use Case |
|---|---|---|
| Open | Audit only, no restrictions | Internal dev/testing |
| Standard | Tool allowlist + content filters | General production agents |
| Strict | All controls + human approval for sensitive ops | Financial, healthcare, legal |
| Locked | Allowlist only, no dynamic tools, full audit | Compliance-critical systems |
根据风险级别匹配治理严格程度:
| 级别 | 控制措施 | 适用场景 |
|---|---|---|
| 开放 | 仅审计,无限制 | 内部开发/测试 |
| 标准 | 工具允许列表 + 内容过滤 | 通用生产环境Agent |
| 严格 | 所有控制措施 + 敏感操作需人工审批 | 金融、医疗、法律领域 |
| 锁定 | 仅允许列表,无动态工具,完整审计 | 合规关键系统 |
Best Practices
最佳实践
| Practice | Rationale |
|---|---|
| Policy as configuration | Store policies in YAML/JSON, not hardcoded — enables change without deploys |
| Most-restrictive-wins | When composing policies, deny always overrides allow |
| Pre-flight intent check | Classify intent before tool execution, not after |
| Trust decay | Trust scores should decay over time — require ongoing good behavior |
| Append-only audit | Never modify or delete audit entries — immutability enables compliance |
| Fail closed | If governance check errors, deny the action rather than allowing it |
| Separate policy from logic | Governance enforcement should be independent of agent business logic |
| 实践 | 原理 |
|---|---|
| 策略即配置 | 将策略存储在YAML/JSON中,而非硬编码——无需部署即可变更 |
| 最严格规则优先 | 组合策略时,拒绝操作始终覆盖允许操作 |
| 飞行前意图检查 | 在工具执行之前对意图进行分类,而非之后 |
| 信任衰减 | 信任分数应随时间衰减——需要持续的良好表现 |
| 仅追加审计 | 绝不修改或删除审计条目——不可变性确保合规性 |
| 默认拒绝 | 如果治理检查出错,拒绝操作而非允许 |
| 策略与逻辑分离 | 治理执行应独立于Agent的业务逻辑 |
Quick Start Checklist
快速入门检查清单
markdown
undefinedmarkdown
undefinedAgent Governance Implementation Checklist
Agent治理实施检查清单
Setup
设置
- Define governance policy (allowed tools, blocked patterns, rate limits)
- Choose governance level (open/standard/strict/locked)
- Set up audit trail storage
- 定义治理策略(允许的工具、阻止的模式、速率限制)
- 选择治理级别(开放/标准/严格/锁定)
- 设置审计追踪存储
Implementation
实施
- Add @govern decorator to all tool functions
- Add intent classification to user input processing
- Implement trust scoring for multi-agent interactions
- Wire up audit trail export
- 为所有工具函数添加@govern装饰器
- 为用户输入处理添加意图分类
- 为多Agent交互实现信任评分
- 连接审计追踪导出功能
Validation
验证
- Test that blocked tools are properly denied
- Test that content filters catch sensitive patterns
- Test rate limiting behavior
- Verify audit trail captures all events
- Test policy composition (most-restrictive-wins)
---- 测试被阻止的工具是否被正确拒绝
- 测试内容过滤器是否能捕获敏感模式
- 测试速率限制行为
- 验证审计追踪捕获所有事件
- 测试策略组合(最严格规则优先)
---Related Resources
相关资源
- Agent-OS Governance Engine — Full governance framework
- AgentMesh Integrations — Framework-specific packages
- OWASP Top 10 for LLM Applications