foundry-security-spec
Foundry Security Spec
Skill by ara.so — Security Skills collection.
Foundry is an open specification from Cisco for building agentic AI security evaluation systems. It defines a multi-agent architecture with 8 core roles and 5 extension roles that coordinate to discover, validate, and report security findings. This is NOT a tool to install—it's a blueprint for building your own security evaluation system.
Core Concepts
Foundry provides:
- Architecture: 8 core agent roles (Orchestrator, Planner, Navigator, Detector, Explorer, Validator, Investigator, Publisher)
- Finding Lifecycle: States, verdicts, evidence gates, fingerprinting
- Coordination Model: Atomic claims, heartbeat liveness, auto-blocking
- Governance: Sandboxing, budgets, yield-gated auto-stop, coverage gates
- Detection-to-Prevention Flywheel: Rules catch known issues, explorers find new ones, gaps become new rules
Foundry works with the CodeGuard rule format, so portable detection rules can transfer between evaluation and prevention.
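The governance bullet names yield-gated auto-stop without defining it. One plausible reading, offered here only as an assumption, is that an evaluation halts once recent work stops producing new findings. The sketch below tracks new findings per completed claim over a sliding window; `YieldGate`, `window`, and `min_yield` are hypothetical names, not part of the specification.

```python
from collections import deque


class YieldGate:
    """Signal a stop when recent claims yield too few new findings (assumed semantics)."""

    def __init__(self, window: int = 5, min_yield: float = 0.2):
        self.window = window        # number of recent claims to consider
        self.min_yield = min_yield  # minimum average new findings per claim
        self.recent = deque(maxlen=window)

    def record(self, new_findings: int) -> None:
        """Record how many new findings one completed claim produced."""
        self.recent.append(new_findings)

    def should_stop(self) -> bool:
        """Stop only once the window is full and its average yield is too low."""
        if len(self.recent) < self.window:
            return False
        return sum(self.recent) / self.window < self.min_yield


gate = YieldGate(window=3, min_yield=0.5)
for count in [2, 0, 0, 0]:
    gate.record(count)
print(gate.should_stop())
```

Requiring a full window before stopping keeps a slow start from ending the evaluation prematurely.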
Repository Structure
foundry-security-spec/
├── spec.md # Main specification (~130 functional requirements)
├── constitution.md # 11 inviolable principles
├── GLOSSARY.md # Terminology reference
└── README.md # Implementation guide
Implementation Workflow
Step 1: Read the Constitution
bash
    # The constitution contains 11 principles that constrain all implementations
    cat constitution.md
Key principles to understand:
- **No unsupervised execution**: Every finding requires explicit confirmation
- **Evidence-gated findings**: Claims without evidence don't become findings
- **Reproducibility**: Every finding must be reproducible from its evidence
- **Atomic progress**: Claims are indivisible units of work
- **Fail-safe defaults**: When stuck, escalate or yield; never guess
Step 2: Install spec-kit
bash
    # In your project directory
    npm install -g @github/spec-kit
    # or follow spec-kit installation for your coding agent

    # Initialize in your project
    cd your-security-eval-project
    speckit init
This creates a `.specify/` directory for spec-driven development.
Step 3: Register the Constitution
bash
    # Copy constitution into spec-kit memory
    cp path/to/foundry-security-spec/constitution.md .specify/memory/constitution.md
    # Register it with your agent
    /speckit.constitution
    # Select "adopt existing constitution" when prompted
Step 4: Seed Your Specification
bash
    # Create specs directory
    mkdir -p specs/001-foundry
    # Copy seed specification
    cp path/to/foundry-security-spec/spec.md specs/001-foundry/spec.md
Step 5: Clarify for Your Environment
bash
    # Run clarification workflow
    /speckit.clarify
Answer questions in these categories:
**Identity & Scope:**
Q: What is your system name?
A: acme-security-eval
Q: Does "authorized evaluation with source access" hold?
A: yes
Q: Merge, split, or keep the 8 core roles as-is?
A: keep as-is for first implementation
**Integration Choices:**
Q: Version control system?
A: GitLab self-hosted at https://gitlab.internal
Q: Issue tracker?
A: Jira at https://jira.acme.com
Q: LLM provider?
A: OpenAI via internal gateway at https://llm.internal/v1
Q: Datastore?
A: PostgreSQL
Q: Isolation runtime?
A: Docker containers with network isolation
Q: Deployment target?
A: Kubernetes cluster
**Policy Choices:**
Q: Severity taxonomy?
A: Critical/High/Medium/Low, matching our existing CVSS-based severity scale
Q: Surface needs-review findings?
A: No, validator rejects inconclusive findings
Q: Label naming convention?
A: foundry:role/name format
**Extension Scope (recommend NO for first build):**
Q: Include Attack-Mapper role?
A: no
Q: Include Regression-Tracker role?
A: no
Q: Include Compliance-Mapper role?
A: no
Q: Include Impact-Assessor role?
A: no
Q: Include Remediation-Drafter role?
A: no
Step 6: Generate Your Specification
bash
    # Harden clarified spec
    /speckit.specify
    # Check for remaining clarifications
    /speckit.clarify
    # Repeat until no clarification markers remain
Your `specs/001-foundry/spec.md` now contains YOUR specification with decisions filled in.
Step 7: Implement
bash
    # Generate technical design
    /speckit.plan
    # Generate task backlog
    /speckit.tasks
    # Start implementation
    /speckit.implement
Agent Role Implementation Examples
Orchestrator Pattern
python
    # orchestrator.py
    import asyncio
    from typing import Dict

    from datastore import FindingStore, ClaimStore
    from agents import Planner, Detector, Explorer, Validator


    class InsufficientCoverageError(Exception):
        """Raised when an evaluation finishes below its required coverage."""


    class Orchestrator:
        def __init__(self,
                     llm_client,
                     finding_store: FindingStore,
                     claim_store: ClaimStore,
                     budget_manager,
                     sandbox_runtime=None,
                     rules_corpus_path: str = "/etc/foundry/rules/codeguard"):
            self.llm = llm_client
            self.findings = finding_store
            self.claims = claim_store
            self.budget = budget_manager
            # Initialize agent roles; Explorer and Validator need the sandbox
            self.planner = Planner(llm_client)
            self.detector = Detector(llm_client, rules_corpus_path)
            self.explorer = Explorer(llm_client, sandbox_runtime)
            self.validator = Validator(llm_client, sandbox_runtime)

        # run_detection, run_exploration and calculate_coverage are not shown

        async def evaluate(self, target_repo: str) -> Dict:
            """Run complete evaluation coordinating all agents."""
            # FR-001: Orchestrator creates evaluation record
            eval_id = await self.findings.create_evaluation(
                target=target_repo,
                status="running"
            )
            try:
                # FR-010: Planner creates work plan
                plan = await self.planner.create_plan(target_repo)
                await self.claims.store_plan(eval_id, plan)
                # FR-020: Distribute work to detection and exploration
                detection_task = asyncio.create_task(
                    self.run_detection(eval_id, plan)
                )
                exploration_task = asyncio.create_task(
                    self.run_exploration(eval_id, plan)
                )
                # FR-005: Monitor heartbeats and budgets
                monitor_task = asyncio.create_task(
                    self.monitor_health(eval_id)
                )
                try:
                    # Wait for the work only: the monitor loops until it is
                    # cancelled, so gathering it here would never return
                    await asyncio.gather(detection_task, exploration_task)
                finally:
                    monitor_task.cancel()
                # FR-006: Check coverage gate before completion
                coverage = await self.calculate_coverage(eval_id)
                if coverage < plan.required_coverage:
                    raise InsufficientCoverageError(
                        f"Coverage {coverage}% < required {plan.required_coverage}%"
                    )
                # Mark evaluation complete
                await self.findings.update_evaluation(
                    eval_id,
                    status="complete",
                    coverage=coverage
                )
                return {
                    "eval_id": eval_id,
                    "status": "complete",
                    "findings": await self.findings.count(eval_id),
                    "coverage": coverage
                }
            except Exception as e:
                # FR-008: Fail-safe: mark evaluation failed
                await self.findings.update_evaluation(
                    eval_id,
                    status="failed",
                    error=str(e)
                )
                raise

        async def monitor_health(self, eval_id: str):
            """Monitor agent heartbeats and budgets."""
            while True:
                await asyncio.sleep(30)
                # FR-007: Check heartbeats
                stalled = await self.claims.find_stalled_claims(
                    eval_id,
                    heartbeat_threshold=300  # 5 minutes
                )
                for claim in stalled:
                    # FR-007: Auto-block stalled claims
                    await self.claims.block_claim(
                        claim.id,
                        reason="heartbeat_timeout"
                    )
                # FR-009: Check budget exhaustion
                if await self.budget.is_exhausted(eval_id):
                    await self.findings.update_evaluation(
                        eval_id,
                        status="budget_exhausted"
                    )
                    break
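The coverage gate depends on `calculate_coverage`, which the pattern leaves undefined. A minimal sketch follows, assuming coverage means the percentage of in-scope files that at least one agent has recorded as examined. The function name, its arguments, and the use of `fnmatch` patterns are assumptions for illustration; note that `fnmatch` lets `*` match across directory separators, so `src/*.py` also matches nested files.

```python
from fnmatch import fnmatch
from typing import Iterable, List


def coverage_percent(all_files: Iterable[str],
                     covered: Iterable[str],
                     scope: List[str],
                     exclude: List[str]) -> float:
    """Percentage of in-scope, non-excluded files that appear in `covered`."""
    in_scope = [
        f for f in all_files
        if any(fnmatch(f, pat) for pat in scope)
        and not any(fnmatch(f, pat) for pat in exclude)
    ]
    if not in_scope:
        return 100.0  # nothing to cover, so the gate passes trivially
    covered_set = set(covered)
    hit = sum(1 for f in in_scope if f in covered_set)
    return 100.0 * hit / len(in_scope)


files = ["src/app.py", "src/db/models.py", "tests/test_app.py", "docs/index.md"]
pct = coverage_percent(files, ["src/app.py"], scope=["src/*.py"], exclude=["tests/*"])
print(pct)
```

Here two files are in scope and one is covered, so the result falls below an 80% requirement and the gate would fail.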
Detector with CodeGuard Rules
python
    # detector.py
    from typing import List

    from codeguard import RuleEngine
    from models import Claim, Finding


    class Detector:
        def __init__(self, llm_client, rules_corpus_path: str):
            self.llm = llm_client
            # FR-030: Load CodeGuard rules
            self.rule_engine = RuleEngine.load(rules_corpus_path)

        async def process_claim(self, claim: Claim) -> List[Finding]:
            """Apply detection rules to a code claim."""
            # FR-031: Extract relevant code from claim
            code_units = await self.extract_code_units(claim)
            findings = []
            for unit in code_units:
                # FR-032: Run rule engine
                rule_hits = await self.rule_engine.evaluate(
                    code=unit.content,
                    context=unit.context,
                    language=unit.language
                )
                for hit in rule_hits:
                    # FR-033: Convert rule hit to finding
                    finding = Finding(
                        claim_id=claim.id,
                        rule_id=hit.rule_id,
                        severity=hit.severity,
                        weakness_id=hit.cwe_id,
                        location=hit.location,
                        evidence={
                            "rule_match": hit.matched_pattern,
                            "code_snippet": unit.content,
                            "line_range": hit.line_range
                        },
                        verdict="confirmed",  # Rules are deterministic
                        status="validated"
                    )
                    findings.append(finding)
                # FR-034: Record coverage for every evaluated unit
                await self.record_coverage(claim.id, unit.path)
            return findings

        async def extract_code_units(self, claim: Claim):
            """Use LLM to identify relevant code units in claim scope."""
            prompt = f"""
            Claim: {claim.description}
            Scope: {claim.scope}
            Identify all code units (functions, methods, classes) that should be
            evaluated for security issues related to this claim.
            Return as JSON array with: path, name, start_line, end_line
            """
            response = await self.llm.complete(prompt)
            # parse_code_units and record_coverage are helpers not shown here
            return parse_code_units(response)
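`extract_code_units` relies on a `parse_code_units` helper that is not shown. One way to write it is sketched below, assuming the model replies with a JSON array (possibly wrapped in extra text) and that each entry carries the four fields requested in the prompt. The `CodeUnit` dataclass and the choice to skip malformed entries are assumptions; loading each unit's `content`, `context`, and `language` would be a separate step.

```python
import json
from dataclasses import dataclass
from typing import List


@dataclass
class CodeUnit:
    path: str
    name: str
    start_line: int
    end_line: int


def parse_code_units(response: str) -> List[CodeUnit]:
    """Extract the outermost JSON array from an LLM reply and keep well-formed entries."""
    start, end = response.find("["), response.rfind("]")
    if start == -1 or end <= start:
        return []
    try:
        items = json.loads(response[start:end + 1])
    except json.JSONDecodeError:
        return []
    units = []
    for item in items:
        try:
            units.append(CodeUnit(
                path=str(item["path"]),
                name=str(item["name"]),
                start_line=int(item["start_line"]),
                end_line=int(item["end_line"]),
            ))
        except (KeyError, TypeError, ValueError):
            continue  # skip entries that lack a required field
    return units


reply = (
    'Here you go: [{"path": "src/auth.py", "name": "login", '
    '"start_line": 10, "end_line": 42}, {"path": "broken"}]'
)
units = parse_code_units(reply)
print(units)
```

Returning an empty list on unparseable output lets the caller treat the claim as yielding no units instead of guessing at what the model meant.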
Explorer for Novel Issues
python
    # explorer.py
    from typing import Dict, List

    from models import Claim, Finding, RuleGap


    class Explorer:
        def __init__(self, llm_client, sandbox_runtime, rule_gap_store=None):
            self.llm = llm_client
            self.sandbox = sandbox_runtime
            # Optional store for rule gaps; gap tracking is skipped when absent
            self.rule_gaps = rule_gap_store

        async def investigate_claim(self, claim: Claim) -> List[Finding]:
            """Creative exploration beyond static rules."""
            findings = []
            # FR-040: Generate investigation hypotheses
            hypotheses = await self.generate_hypotheses(claim)
            for hypothesis in hypotheses:
                # FR-041: Execute in isolated sandbox
                async with self.sandbox.session() as session:
                    result = await self.test_hypothesis(
                        session,
                        hypothesis,
                        claim
                    )
                if result.is_vulnerability:
                    # FR-042: Evidence-gated finding
                    if not result.has_reproduction:
                        # Don't create finding without evidence
                        continue
                    finding = Finding(
                        claim_id=claim.id,
                        severity=result.severity,
                        weakness_id=result.weakness_id,
                        description=result.description,
                        evidence=result.evidence,
                        verdict="needs-validation",
                        status="pending"
                    )
                    findings.append(finding)
                    # FR-043: Check if rules missed this
                    if (self.rule_gaps is not None
                            and await self.should_have_detected(finding)):
                        await self.record_rule_gap(finding)
            return findings

        async def generate_hypotheses(self, claim: Claim) -> List[Dict]:
            """Use LLM to generate creative test hypotheses."""
            prompt = f"""
            You are exploring code for security issues that static rules may miss.
            Claim: {claim.description}
            Code scope: {claim.scope}
            Generate 3-5 security hypotheses to test:
            - Focus on logic bugs, state confusion, race conditions
            - Consider what rules can't express (context-dependent issues)
            - Prioritize high-impact scenarios
            For each hypothesis provide:
            - What to test
            - Why it might be vulnerable
            - How to reproduce if vulnerable
            Return as JSON array.
            """
            response = await self.llm.complete(
                prompt,
                temperature=0.7  # Higher for creative exploration
            )
            # parse_hypotheses, test_hypothesis, should_have_detected and
            # draft_rule are helpers not shown here
            return parse_hypotheses(response)

        async def record_rule_gap(self, finding: Finding):
            """Record that rules failed to detect this issue."""
            gap = RuleGap(
                finding_id=finding.id,
                weakness_id=finding.weakness_id,
                pattern=finding.evidence.get("vulnerable_pattern"),
                reason="explorer_found_missed_by_detector",
                suggested_rule=await self.draft_rule(finding)
            )
            # FR-044: Feed into rule corpus improvement
            await self.rule_gaps.store(gap)
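Both exploration and validation run inside `self.sandbox.session()`, used as an async context manager. The shape of that object is not specified here, so the following is a stand-in for local testing only, assuming a session merely records the commands it is asked to run. `FakeSandbox`, `FakeSession`, and `run` are hypothetical names, and no real isolation is provided.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import List


class FakeSession:
    def __init__(self):
        self.transcript: List[str] = []
        self.closed = False

    async def run(self, command: str) -> str:
        """Pretend to run a command; it is only recorded."""
        self.transcript.append(command)
        return f"ran: {command}"

    def get_transcript(self) -> List[str]:
        return list(self.transcript)


class FakeSandbox:
    @asynccontextmanager
    async def session(self):
        session = FakeSession()
        try:
            yield session
        finally:
            # A real runtime would tear down the container here
            session.closed = True


async def demo():
    sandbox = FakeSandbox()
    async with sandbox.session() as session:
        await session.run("pytest tests/test_auth.py")
        open_during = not session.closed
    return open_during, session.closed, session.get_transcript()


result = asyncio.run(demo())
print(result)
```

A stub like this lets the Explorer and Validator control flow be exercised in unit tests before a container runtime is wired in.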
Validator for Finding Confirmation
python
    # validator.py
    import hashlib

    from models import Finding, ValidationResult


    class Validator:
        def __init__(self, llm_client, sandbox_runtime):
            self.llm = llm_client
            self.sandbox = sandbox_runtime

        async def validate_finding(self, finding: Finding) -> ValidationResult:
            """Reproduce and confirm finding from evidence."""
            # FR-050: Check evidence completeness
            if not self.has_sufficient_evidence(finding):
                return ValidationResult(
                    verdict="rejected",
                    reason="insufficient_evidence"
                )
            # FR-051: Attempt reproduction
            async with self.sandbox.session() as session:
                reproduced = await self.reproduce_issue(
                    session,
                    finding.evidence
                )
                if not reproduced:
                    return ValidationResult(
                        verdict="rejected",
                        reason="not_reproducible"
                    )
                # FR-052: Verify severity assessment
                actual_severity = await self.assess_severity(
                    session,
                    finding
                )
                if actual_severity != finding.severity:
                    # Record the original value before overwriting it
                    finding.evidence["severity_adjustment"] = {
                        "original": finding.severity,
                        "validated": actual_severity
                    }
                    finding.severity = actual_severity
                # FR-053: Generate fingerprint for deduplication
                fingerprint = await self.generate_fingerprint(finding)
                # Read the transcript while the session is still open
                return ValidationResult(
                    verdict="confirmed",
                    fingerprint=fingerprint,
                    severity=actual_severity,
                    reproduction_evidence=session.get_transcript()
                )

        def has_sufficient_evidence(self, finding: Finding) -> bool:
            """Check if finding has required evidence."""
            required = ["location", "description"]
            if finding.severity in ["critical", "high"]:
                required.extend(["reproduction_steps", "impact"])
            return all(k in finding.evidence for k in required)

        async def generate_fingerprint(self, finding: Finding) -> str:
            """Create stable fingerprint for deduplication."""
            # FR-054: Fingerprint combines weakness + location + root cause
            components = [
                finding.weakness_id,
                finding.location.get("file_path"),
                finding.location.get("function_name"),
                finding.evidence.get("root_cause_pattern")
            ]
            fingerprint_input = "|".join(str(c) for c in components if c)
            return hashlib.sha256(fingerprint_input.encode()).hexdigest()[:16]
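Because deduplication depends on the fingerprint being stable, it helps to check that property in isolation. The sketch below restates the same hashing scheme as a pure function; the function name and the sample values are assumptions.

```python
import hashlib
from typing import Optional


def fingerprint(weakness_id: str,
                file_path: Optional[str],
                function_name: Optional[str],
                root_cause_pattern: Optional[str]) -> str:
    """Hash the non-empty components, joined by '|', into a 16-hex-char id."""
    components = [weakness_id, file_path, function_name, root_cause_pattern]
    joined = "|".join(str(c) for c in components if c)
    return hashlib.sha256(joined.encode()).hexdigest()[:16]


a = fingerprint("CWE-89", "src/db.py", "run_query", "string-concat-sql")
b = fingerprint("CWE-89", "src/db.py", "run_query", "string-concat-sql")
c = fingerprint("CWE-89", "src/db.py", "other_query", "string-concat-sql")
print(a == b, a == c, len(a))
```

Line numbers are deliberately absent from the inputs, so a finding keeps its fingerprint when unrelated edits shift the code. One caveat of dropping empty components is that a missing file path and a missing function name can produce the same joined string; substituting a fixed placeholder for empty fields would avoid that.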
Publisher for Issue Tracker Integration
python
    # publisher.py
    from typing import Dict

    import aiohttp

    from models import Finding


    class Publisher:
        def __init__(self, issue_tracker_config: Dict):
            self.tracker_url = issue_tracker_config["url"]
            self.project_key = issue_tracker_config["project"]
            self.api_token = issue_tracker_config["token"]  # From env: ISSUE_TRACKER_TOKEN

        async def publish_finding(self, finding: Finding) -> str:
            """Create issue in tracker for confirmed finding."""
            # FR-060: Only publish confirmed findings
            if finding.verdict != "confirmed":
                raise ValueError(f"Cannot publish {finding.verdict} finding")
            # FR-061: Check for existing issue via fingerprint
            existing = await self.find_existing_issue(finding.fingerprint)
            if existing:
                return existing.issue_id
            # FR-062: Format issue according to tracker schema
            issue_body = self.format_issue(finding)
            # FR-063: Create issue
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{self.tracker_url}/rest/api/2/issue",
                    headers={
                        "Authorization": f"Bearer {self.api_token}",
                        "Content-Type": "application/json"
                    },
                    json=issue_body
                ) as resp:
                    resp.raise_for_status()
                    result = await resp.json()
                    issue_id = result["key"]
            # FR-064: Update finding with issue reference
            finding.issue_id = issue_id
            finding.status = "published"
            await finding.save()
            return issue_id

        # find_existing_issue, format_evidence and map_severity_to_priority
        # are helpers not shown here

        def format_issue(self, finding: Finding) -> Dict:
            """Format finding as issue tracker ticket."""
            location = finding.location
            # Jira wiki markup: "h3." headings must start at column 0, so the
            # body is joined from lines instead of an indented triple-quoted string
            description = "\n".join([
                "*Security Finding from Foundry Evaluation*",
                f"*Severity:* {finding.severity.upper()}",
                f"*Weakness:* {finding.weakness_id}",
                f"*Location:* {location.get('file_path')}:{location.get('line_number')}",
                "h3. Description",
                finding.description,
                "h3. Evidence",
                self.format_evidence(finding.evidence),
                "h3. Reproduction",
                finding.evidence.get("reproduction_steps", "See evidence above"),
                "----",
                f"Fingerprint: {finding.fingerprint}",
                f"Evaluation ID: {finding.eval_id}",
            ])
            return {
                "fields": {
                    "project": {"key": self.project_key},
                    "summary": f"[{finding.severity.upper()}] {finding.weakness_id}: {finding.get_short_description()}",
                    "description": description,
                    "issuetype": {"name": "Security Vulnerability"},
                    "priority": {"name": self.map_severity_to_priority(finding.severity)},
                    "labels": [
                        f"foundry:eval:{finding.eval_id}",
                        f"foundry:weakness:{finding.weakness_id}",
                        f"foundry:fingerprint:{finding.fingerprint}"
                    ]
                }
            }
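`format_issue` calls `map_severity_to_priority`, which is left undefined. A minimal sketch follows, assuming the tracker uses priority names such as Highest, High, Medium, and Low, and that unknown severities should fall back to Medium. Both are assumptions to adjust for your tracker's priority scheme.

```python
# Assumed priority names; replace with the ones configured in your tracker
SEVERITY_TO_PRIORITY = {
    "critical": "Highest",
    "high": "High",
    "medium": "Medium",
    "low": "Low",
}


def map_severity_to_priority(severity: str, default: str = "Medium") -> str:
    """Translate a Foundry severity into a tracker priority name."""
    return SEVERITY_TO_PRIORITY.get(severity.strip().lower(), default)


print(map_severity_to_priority("Critical"))
print(map_severity_to_priority("informational"))
```

Normalizing case before the lookup keeps the mapping tolerant of severities stored as `Critical` or `critical`.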
Configuration Examples
Evaluation Configuration
yaml
    # config/evaluation.yaml
    evaluation:
      name: "acme-security-eval"

    # Orchestrator settings
    orchestrator:
      max_concurrent_claims: 10
      heartbeat_interval: 60   # seconds
      stall_threshold: 300     # seconds

    # Budget limits
    budget:
      max_tokens: 10000000
      max_duration_hours: 48
      per_agent_token_limit: 1000000

    # Coverage requirements
    coverage:
      required_percentage: 80
      scope:
        - src/**/*.py
        - lib/**/*.js
      exclude:
        - tests/**
        - docs/**

    # Agent roles enabled
    agents:
      core:
        - orchestrator
        - planner
        - navigator
        - detector
        - explorer
        - validator
        - investigator
        - publisher
      extensions: []  # Start with core only

    # Integration endpoints
    integrations:
      llm:
        provider: "openai"
        endpoint: "${LLM_GATEWAY_URL}"
        model: "gpt-4"
        api_key: "${LLM_API_KEY}"
      vcs:
        type: "gitlab"
        url: "${GITLAB_URL}"
        token: "${GITLAB_TOKEN}"
      issue_tracker:
        type: "jira"
        url: "${JIRA_URL}"
        project: "SEC"
        token: "${JIRA_TOKEN}"
      datastore:
        type: "postgresql"
        connection_string: "${DATABASE_URL}"

    # Security controls
    sandbox:
      runtime: "docker"
      network_isolation: true
      timeout_seconds: 300
      resource_limits:
        cpu: "1"
        memory: "2Gi"
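The configuration refers to secrets and endpoints through `${VAR}` placeholders, so the loader has to substitute environment variables after parsing. A sketch of that step follows, assuming the YAML has already been parsed into nested dicts and lists, and that a missing variable should raise instead of passing through silently. `expand_env` is a hypothetical helper.

```python
import os
import re
from typing import Any

_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(value: Any, env=None) -> Any:
    """Recursively replace ${VAR} in strings with values from the environment."""
    env = os.environ if env is None else env
    if isinstance(value, dict):
        return {k: expand_env(v, env) for k, v in value.items()}
    if isinstance(value, list):
        return [expand_env(v, env) for v in value]
    if isinstance(value, str):
        def substitute(match):
            name = match.group(1)
            if name not in env:
                raise KeyError(f"environment variable {name} is not set")
            return env[name]
        return _PLACEHOLDER.sub(substitute, value)
    return value  # numbers, booleans and None pass through unchanged


config = {"issue_tracker": {"url": "${JIRA_URL}", "project": "SEC"}, "timeout": 300}
expanded = expand_env(config, env={"JIRA_URL": "https://jira.acme.com"})
print(expanded)
```

Failing on an unset variable surfaces a missing token at startup instead of at the first API call.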
Detection Rules Configuration
yaml
    # config/detection.yaml
    detection:
      # CodeGuard rules corpus
      rules_corpus: "/etc/foundry/rules/codeguard"

      # Rule categories to enable
      enabled_categories:
        - injection
        - authentication
        - authorization
        - cryptography
        - data-exposure
        - configuration

      # Severity mapping
      severity_mapping:
        critical: ["CWE-89", "CWE-78", "CWE-79"]  # SQLi, Command Injection, XSS
        high: ["CWE-306", "CWE-862"]              # Missing Authentication, Missing Authorization
        medium: ["CWE-327", "CWE-338"]            # Weak Crypto, Weak PRNG
        low: ["CWE-209", "CWE-532"]               # Error Message Info Leak, Sensitive Data in Logs

      # Rule gap tracking
      rule_gaps:
        enabled: true
        auto_draft_rules: true
        review_queue: "rule-improvements"
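The `severity_mapping` table is keyed by severity, whereas a detector usually starts from a CWE identifier and needs the severity. Inverting the table once at load time gives a direct lookup. The sketch assumes the mapping has been parsed into a dict and that unlisted CWEs default to `medium`, which is an illustrative choice.

```python
from typing import Dict, List


def invert_severity_mapping(mapping: Dict[str, List[str]]) -> Dict[str, str]:
    """Turn {severity: [cwe, ...]} into {cwe: severity}, rejecting duplicates."""
    by_cwe: Dict[str, str] = {}
    for severity, cwes in mapping.items():
        for cwe in cwes:
            if cwe in by_cwe:
                raise ValueError(
                    f"{cwe} listed under both {by_cwe[cwe]} and {severity}"
                )
            by_cwe[cwe] = severity
    return by_cwe


severity_mapping = {
    "critical": ["CWE-89", "CWE-78", "CWE-79"],
    "high": ["CWE-306", "CWE-862"],
    "medium": ["CWE-327", "CWE-338"],
    "low": ["CWE-209", "CWE-532"],
}
by_cwe = invert_severity_mapping(severity_mapping)
print(by_cwe["CWE-862"])
print(by_cwe.get("CWE-22", "medium"))
```

Rejecting a CWE that appears under two severities catches configuration mistakes before they produce inconsistent findings.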
Running an Evaluation
python
    # run_evaluation.py
    import asyncio

    from orchestrator import Orchestrator
    from config import load_config
    from datastore import FindingStore, ClaimStore
    # The module names below are assumptions; import these helpers from
    # wherever your implementation defines them
    from budget import BudgetManager
    from llm import create_llm_client
    from sandbox import create_sandbox_runtime  # hypothetical factory


    async def main():
        # Load configuration
        config = load_config("config/evaluation.yaml")
        # Initialize orchestrator
        orchestrator = Orchestrator(
            llm_client=create_llm_client(config.integrations.llm),
            finding_store=FindingStore(config.integrations.datastore),
            claim_store=ClaimStore(config.integrations.datastore),
            budget_manager=BudgetManager(config.budget),
            # Explorer and Validator run their checks inside this sandbox
            sandbox_runtime=create_sandbox_runtime(config.sandbox)
        )
        # Run evaluation
        result = await orchestrator.evaluate(
            target_repo="https://gitlab.internal/acme/webapp"
        )
        print(f"Evaluation {result['eval_id']} complete")
        print(f"Findings: {result['findings']}")
        print(f"Coverage: {result['coverage']}%")


    if __name__ == "__main__":
        asyncio.run(main())
undefinedimport asyncio
from orchestrator import Orchestrator
from config import load_config
async def main():
# 加载配置
config = load_config("config/evaluation.yaml")
# 初始化Orchestrator
orchestrator = Orchestrator(
llm_client=create_llm_client(config.integrations.llm),
finding_store=FindingStore(config.integrations.datastore),
claim_store=ClaimStore(config.integrations.datastore),
budget_manager=BudgetManager(config.budget)
)
# 运行评估
result = await orchestrator.evaluate(
target_repo="https://gitlab.internal/acme/webapp"
)
print(f"Evaluation {result['eval_id']} complete")
print(f"Findings: {result['findings']}")
print(f"Coverage: {result['coverage']}%")if name == "main":
asyncio.run(main())
Common Patterns

Atomic Claim Processing

```python
import asyncio


# Every agent processes claims atomically
async def process_claim(self, claim: Claim):
    # Claim the work atomically
    if not await self.claims.try_claim(claim.id, self.agent_id):
        return  # Another agent got it

    # Send heartbeats while working; start the task before the try block
    # so the finally clause always has a task to cancel
    heartbeat_task = asyncio.create_task(self.send_heartbeats(claim.id))
    try:
        # Do the work
        result = await self.do_work(claim)
        # Mark complete
        await self.claims.complete(claim.id, result)
    except Exception as e:
        # Fail the claim, don't retry (constitution principle)
        await self.claims.fail(claim.id, str(e))
    finally:
        # Stop heartbeats whether the work succeeded or failed
        heartbeat_task.cancel()
```
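The pattern above depends on `try_claim` being a single check-and-set: exactly one agent may win a given claim. The following is a minimal single-process sketch of that property, assuming an in-memory store guarded by an `asyncio.Lock`. `InMemoryClaimStore`, `owner`, and `race` are hypothetical names; a multi-process deployment would more likely enforce the same guarantee in the shared datastore.

```python
import asyncio
from typing import Dict, Optional


class InMemoryClaimStore:
    """Hypothetical single-process stand-in for a shared claim store."""

    def __init__(self) -> None:
        self._owners: Dict[str, str] = {}
        self._lock = asyncio.Lock()

    async def try_claim(self, claim_id: str, agent_id: str) -> bool:
        # Check-and-set under one lock so two agents cannot both win
        async with self._lock:
            if claim_id in self._owners:
                return False
            self._owners[claim_id] = agent_id
            return True

    def owner(self, claim_id: str) -> Optional[str]:
        return self._owners.get(claim_id)


async def race(claim_id: str, agents: int) -> int:
    """Let several agents contend for one claim; return how many won."""
    store = InMemoryClaimStore()  # created inside the running event loop
    results = await asyncio.gather(
        *(store.try_claim(claim_id, f"agent-{i}") for i in range(agents))
    )
    return sum(results)


winners = asyncio.run(race("claim-1", agents=5))
```

However many agents contend, `winners` should be 1; the losers return early, as in `process_claim`.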
Evidence-Gated Finding Creation

```python
from typing import Dict, Optional


# Never create findings without evidence
def create_finding(self, claim: Claim, issue: Dict) -> Optional[Finding]:
    # Check evidence requirements
    if not issue.get("location"):
        logger.warning(f"No location for issue in {claim.id}, skipping")
        return None
    if not issue.get("reproduction"):
        logger.warning(f"No reproduction for issue in {claim.id}, skipping")
        return None

    # Evidence is sufficient
    return Finding(
        claim_id=claim.id,
        location=issue["location"],
        evidence={
            "reproduction": issue["reproduction"],
            # impact and code are not gated above, so read them defensively
            "impact": issue.get("impact"),
            "code_snippet": issue.get("code"),
        },
        verdict="needs-validation",
    )
```
Budget Enforcement

```python
class BudgetManager:
    async def check_budget(self, eval_id: str, tokens_requested: int) -> bool:
        """Check if budget allows operation."""
        used = await self.get_tokens_used(eval_id)
        limit = self.config.max_tokens
        if used + tokens_requested > limit:
            await self.notify_budget_exhausted(eval_id)
            return False
        return True

    async def record_usage(self, eval_id: str, tokens: int):
        """Record token usage."""
        await self.db.execute(
            "INSERT INTO token_usage (eval_id, tokens, timestamp) VALUES ($1, $2, NOW())",
            eval_id, tokens
        )
```
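Because `check_budget` and `record_usage` are separate calls, two concurrent agents could both pass the check before either records its usage. One way to close that gap, sketched here under the assumption of a single-process, in-memory counter, is to check and reserve in one step. `InMemoryBudget`, `try_spend`, and `remaining` are hypothetical names, not part of the spec.

```python
import asyncio
from collections import defaultdict
from typing import DefaultDict, List


class InMemoryBudget:
    """Hypothetical in-memory token budget; real usage would live in the datastore."""

    def __init__(self, max_tokens: int) -> None:
        self.max_tokens = max_tokens
        self._used: DefaultDict[str, int] = defaultdict(int)

    async def try_spend(self, eval_id: str, tokens: int) -> bool:
        # Check and record with no await in between, so concurrent callers
        # on one event loop cannot jointly overshoot the limit
        if self._used[eval_id] + tokens > self.max_tokens:
            return False
        self._used[eval_id] += tokens
        return True

    def remaining(self, eval_id: str) -> int:
        return self.max_tokens - self._used[eval_id]


async def demo() -> List[object]:
    budget = InMemoryBudget(max_tokens=1000)
    return [
        await budget.try_spend("eval-1", 600),  # fits
        await budget.try_spend("eval-1", 500),  # would exceed 1000
        await budget.try_spend("eval-1", 400),  # exactly reaches the limit
        budget.remaining("eval-1"),
    ]


outcome = asyncio.run(demo())
```

With a database-backed budget, the equivalent would be a single conditional update rather than a read followed by an insert.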
Troubleshooting

Agents Not Finding Issues

**Check rule corpus:**

```bash
# Verify rules loaded
foundry-ctl list-rules --corpus /etc/foundry/rules/codeguard

# Test rule against sample code
foundry-ctl test-rule CWE-89 --file sample.py
```

**Check explorer creativity:**

```python
# Increase temperature for hypothesis generation
hypotheses = await self.llm.complete(
    prompt,
    temperature=0.8  # Higher = more creative, less reliable
)
```
Claims Stalling

**Check heartbeat configuration:**

```python
# Ensure heartbeats are sent frequently enough
HEARTBEAT_INTERVAL = 60  # seconds
STALL_THRESHOLD = 300    # 5 minutes

# The heartbeat interval must be less than half the stall threshold,
# so a single delayed heartbeat does not mark a live claim as stalled
assert HEARTBEAT_INTERVAL < STALL_THRESHOLD / 2
```

**Check for deadlocks:**

```sql
-- Find stalled claims
SELECT claim_id, agent_id, last_heartbeat, status
FROM claims
WHERE status = 'in_progress'
  AND last_heartbeat < NOW() - INTERVAL '5 minutes';
```
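The stalled-claim check can also be expressed in application code, which is handy for unit-testing the threshold logic without a database. This is a minimal sketch assuming heartbeats are available as a mapping from claim ID to timestamp. `find_stalled` and the sample data are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List

STALL_THRESHOLD = timedelta(seconds=300)  # same 5 minutes as the SQL query


def find_stalled(last_heartbeats: Dict[str, datetime], now: datetime,
                 threshold: timedelta = STALL_THRESHOLD) -> List[str]:
    """Return claim IDs whose last heartbeat is older than the threshold."""
    return sorted(
        claim_id
        for claim_id, seen in last_heartbeats.items()
        if now - seen > threshold
    )


now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
heartbeats = {
    "claim-a": now - timedelta(seconds=30),   # healthy
    "claim-b": now - timedelta(seconds=301),  # stalled
    "claim-c": now - timedelta(seconds=300),  # exactly at threshold, not stalled
}
stalled = find_stalled(heartbeats, now)
```

Passing `now` in as a parameter, instead of reading the clock inside the function, keeps the check deterministic under test.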
Findings Not Publishing

**Check verdict state:**

```python
# Only confirmed findings publish
if finding.verdict != "confirmed":
    logger.error(f"Cannot publish {finding.id}, verdict={finding.verdict}")
    # Finding needs validation first
```

**Check deduplication:**

```python
# Verify fingerprints are stable
f1 = generate_fingerprint(finding)
f2 = generate_fingerprint(finding)
assert f1 == f2, "Fingerprints must be deterministic"
```
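Fingerprints tend to become unstable when they hash volatile inputs such as timestamps, line numbers, or unordered dictionaries. Below is one possible shape for a deterministic fingerprint, assuming a finding is identified by weakness, file, and enclosing symbol. `FindingKey` and `sketch_fingerprint` are hypothetical and are not the `generate_fingerprint` referenced above; the identity fields your implementation uses should follow the spec.

```python
import hashlib
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class FindingKey:
    """Hypothetical identity fields for a finding."""
    weakness_id: str  # e.g. "CWE-89"
    file_path: str
    symbol: str       # enclosing function or class, steadier than a line number


def sketch_fingerprint(key: FindingKey) -> str:
    # Normalize, then serialize with sorted keys so the same inputs
    # always produce the same bytes and therefore the same digest
    payload = {
        "weakness_id": key.weakness_id.strip().upper(),
        "file_path": key.file_path.strip().replace("\\", "/"),
        "symbol": key.symbol.strip(),
    }
    encoded = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


a = sketch_fingerprint(FindingKey("CWE-89", "src/db/users.py", "get_user"))
b = sketch_fingerprint(FindingKey(" cwe-89 ", "src\\db\\users.py", "get_user"))
c = sketch_fingerprint(FindingKey("CWE-89", "src/db/users.py", "delete_user"))
```

Here `a` and `b` match because normalization removes case, whitespace, and path-separator differences, while `c` differs because it points at a different symbol.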
Low Coverage

**Check scope configuration:**

```yaml
# Ensure scope includes all target code
coverage:
  scope:
    - "src/**/*.py"        # Include all source
    - "lib/**/*.{js,ts}"   # Multiple extensions
  exclude:
    - "tests/**"           # Don't count tests
    - "vendor/**"          # Don't count dependencies
```

**Check claim distribution:**

```python
# Verify planner creates sufficient claims
plan = await planner.create_plan(target)
print(f"Claims created: {len(plan.claims)}")
print(f"Surface area: {plan.surface_area}")
# Claims should cover all in-scope files
```
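To see why a scope mistake lowers coverage, it can help to compute the number by hand. The sketch below assumes coverage means the share of in-scope files touched by at least one claim, which is an interpretation rather than the spec's definition. It uses the standard-library `fnmatch`, which has no `**` or `{js,ts}` support, so the patterns are simplified; `in_scope`, `coverage_percent`, and the file list are hypothetical.

```python
from fnmatch import fnmatchcase
from typing import Iterable, List, Set

# Simplified patterns: fnmatch's "*" also matches "/", and it has no
# "**" or "{js,ts}" support, so each extension is listed separately
SCOPE = ["src/*.py", "lib/*.js", "lib/*.ts"]
EXCLUDE = ["tests/*", "vendor/*"]


def in_scope(path: str, scope: Iterable[str] = SCOPE,
             exclude: Iterable[str] = EXCLUDE) -> bool:
    """Exclusions win over inclusions."""
    if any(fnmatchcase(path, pat) for pat in exclude):
        return False
    return any(fnmatchcase(path, pat) for pat in scope)


def coverage_percent(all_files: List[str], claimed_files: Set[str]) -> float:
    """Share of in-scope files that at least one claim covers."""
    scoped = [f for f in all_files if in_scope(f)]
    if not scoped:
        return 0.0
    covered = sum(1 for f in scoped if f in claimed_files)
    return 100.0 * covered / len(scoped)


files = ["src/app.py", "src/api/routes.py", "lib/ui/main.ts",
         "tests/test_app.py", "vendor/lib.js", "README.md"]
pct = coverage_percent(files, claimed_files={"src/app.py", "lib/ui/main.ts"})
```

Three files are in scope and two are claimed, so `src/api/routes.py` is the gap the planner would need to cover.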
Integration with CodeGuard

```python
# Load CodeGuard rules as Detector corpus
from codeguard import RuleEngine


class Detector:
    def __init__(self, rules_path: str):
        # Load portable CodeGuard rules
        self.rules = RuleEngine.load(rules_path)

    async def scan(self, code: str, language: str):
        # Rules execute deterministically
        return await self.rules.evaluate(code, language)
```

```python
# Export rule gaps back to CodeGuard corpus
async def export_rule_gap(gap: RuleGap):
    """Convert discovered gap into CodeGuard rule."""
    rule = {
        "id": f"CWE-{gap.weakness_id}-{gap.pattern_hash}",
        "name": gap.suggested_name,
        "description": gap.description,
        "pattern": gap.pattern,
        "severity": gap.severity,
        "languages": gap.applicable_languages,
    }
    # Write to CodeGuard format
    await write_codeguard_rule(rule, "rules/corpus/custom/")
```
Best Practices
- Start with 8 core roles only — Get foundational pipeline working before adding extensions
- Constitution is non-negotiable — Each principle prevents a real production failure
- Evidence gates everything — No evidence = no finding, regardless of confidence
- Fingerprints must be stable — Same issue in same place = same fingerprint always
- Budgets prevent runaway — Set token limits, enforce them, auto-stop when exhausted
- Coverage before completion — Don't mark evaluation complete until coverage gate passes
- Rule gaps feed corpus — When explorer finds something detector missed, create a rule
- Sandbox everything — Never execute in evaluation environment, always isolate
References

- Specification: spec.md (130 functional requirements with rationale)
- Constitution: constitution.md (11 inviolable principles)
- Glossary: GLOSSARY.md (Foundry terminology)
- CodeGuard: https://project-codeguard.org (portable rule format)
- spec-kit: https://github.com/github/spec-kit (spec-driven development workflow)