foundry-security-spec

Foundry Security Spec

Skill by ara.so — Security Skills collection.
Foundry is an open specification from Cisco for building agentic AI security evaluation systems. It defines a multi-agent architecture with 8 core roles and 5 extension roles that coordinate to discover, validate, and report security findings. This is NOT a tool to install—it's a blueprint for building your own security evaluation system.

Core Concepts

Foundry provides:
  • Architecture: 8 core agent roles (Orchestrator, Planner, Navigator, Detector, Explorer, Validator, Investigator, Publisher)
  • Finding Lifecycle: States, verdicts, evidence gates, fingerprinting
  • Coordination Model: Atomic claims, heartbeat liveness, auto-blocking
  • Governance: Sandboxing, budgets, yield-gated auto-stop, coverage gates
  • Detection-to-Prevention Flywheel: Rules catch known issues, explorers find new ones, gaps become new rules
Works with CodeGuard rule format for portable detection rules that transfer between evaluation and prevention.
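
The governance bullet names budgets and yield-gated auto-stop without showing what such a check might look like. The sketch below is one possible reading, not the spec's definition: it assumes "yield" means new findings per completed claim over a sliding window, and that a run stops once the budget is spent or the windowed yield drops below a floor. `RunGovernor`, `max_cost`, `window`, and `min_yield` are hypothetical names.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class RunGovernor:
    """Hypothetical budget and yield gate for one evaluation run."""

    max_cost: float            # total spend allowed (any unit, e.g. tokens)
    window: int = 5            # number of recent claims the yield is averaged over
    min_yield: float = 0.2     # stop when findings-per-claim falls below this
    spent: float = 0.0
    recent: deque = field(default_factory=deque)

    def record_claim(self, cost: float, new_findings: int) -> None:
        """Account for one completed claim and the findings it produced."""
        self.spent += cost
        self.recent.append(new_findings)
        if len(self.recent) > self.window:
            self.recent.popleft()

    def stop_reason(self) -> Optional[str]:
        """Return why the run should stop, or None to keep going."""
        if self.spent >= self.max_cost:
            return "budget_exhausted"
        # Only judge yield once a full window of claims has been observed.
        if len(self.recent) == self.window:
            if sum(self.recent) / self.window < self.min_yield:
                return "low_yield"
        return None


governor = RunGovernor(max_cost=100.0, window=3, min_yield=0.5)
for findings in (2, 0, 0, 0):
    governor.record_claim(cost=10.0, new_findings=findings)
reason = governor.stop_reason()   # the last three claims found nothing
```

Waiting for a full window before judging yield keeps a slow start from ending the run early; whether that trade-off is right depends on how your planner orders work.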

Repository Structure

foundry-security-spec/
├── spec.md              # Main specification (~130 functional requirements)
├── constitution.md      # 11 inviolable principles
├── GLOSSARY.md         # Terminology reference
└── README.md           # Implementation guide

Implementation Workflow

Step 1: Read the Constitution

    # The constitution contains 11 principles that constrain all implementations
    cat constitution.md

Key principles to understand:
- **No unsupervised execution**: Every finding requires explicit confirmation
- **Evidence-gated findings**: Claims without evidence don't become findings
- **Reproducibility**: Every finding must be reproducible from its evidence
- **Atomic progress**: Claims are indivisible units of work
- **Fail-safe defaults**: When stuck, escalate or yield; never guess
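
To make the evidence-gated principle concrete, here is a minimal sketch of a finding whose status can only advance along an allowed path and only when evidence is attached. The status names (`pending`, `validated`, `rejected`, `published`) are borrowed from the role examples in this document; the transition table and the `LifecycleError` class are assumptions, not something the constitution prescribes.

```python
from dataclasses import dataclass, field

# Allowed status transitions. The table itself is an assumption.
TRANSITIONS = {
    "pending": {"validated", "rejected"},
    "validated": {"published"},
    "rejected": set(),
    "published": set(),
}


class LifecycleError(Exception):
    """Raised when a transition would violate the lifecycle or evidence gate."""


@dataclass
class Finding:
    description: str
    evidence: dict = field(default_factory=dict)
    status: str = "pending"

    def advance(self, new_status: str) -> None:
        if new_status not in TRANSITIONS[self.status]:
            raise LifecycleError(f"{self.status} -> {new_status} is not allowed")
        # Evidence gate: a claim without evidence never becomes a validated finding.
        if new_status == "validated" and not self.evidence:
            raise LifecycleError("cannot validate a finding without evidence")
        self.status = new_status


finding = Finding("SQL injection in login handler")
try:
    finding.advance("validated")          # blocked: no evidence yet
except LifecycleError as exc:
    blocked = str(exc)

finding.evidence["reproduction_steps"] = "POST /login with a quote in the username"
finding.advance("validated")
finding.advance("published")
```

Keeping the gate inside `advance` means no caller can skip it, which is one way to honor "claims without evidence don't become findings" structurally rather than by convention.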

Step 2: Install spec-kit

    # In your project directory
    npm install -g @github/spec-kit
    # or follow the spec-kit installation instructions for your coding agent

    # Initialize in your project
    cd your-security-eval-project
    speckit init

This creates a `.specify/` directory for spec-driven development. The upstream spec-kit README documents a CLI named `specify`; if the npm package or the `speckit` command above is not available in your environment, follow that README instead.

Step 3: Register the Constitution

    # Copy the constitution into spec-kit memory
    cp path/to/foundry-security-spec/constitution.md .specify/memory/constitution.md

    # Register it with your agent
    /speckit.constitution
    # Select "adopt existing constitution" when prompted

Step 4: Seed Your Specification

    # Create the specs directory
    mkdir -p specs/001-foundry

    # Copy the seed specification
    cp path/to/foundry-security-spec/spec.md specs/001-foundry/spec.md
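
If you would rather script the two file copies from Steps 3 and 4 (for example in a project bootstrap task), the following sketch does the same thing with the standard library. `seed_project` and its parameters are hypothetical; the target paths are the ones used in the commands above. Registering the constitution with `/speckit.constitution` still has to happen inside your coding agent.

```python
import shutil
from pathlib import Path
from typing import List


def seed_project(spec_repo: Path, project: Path, feature: str = "001-foundry") -> List[Path]:
    """Copy constitution.md and spec.md into the locations used in Steps 3 and 4."""
    targets = {
        spec_repo / "constitution.md": project / ".specify" / "memory" / "constitution.md",
        spec_repo / "spec.md": project / "specs" / feature / "spec.md",
    }
    written = []
    for source, target in targets.items():
        if not source.is_file():
            raise FileNotFoundError(f"missing seed file: {source}")
        target.parent.mkdir(parents=True, exist_ok=True)  # same effect as mkdir -p
        shutil.copyfile(source, target)
        written.append(target)
    return written
```

A call such as `seed_project(Path("path/to/foundry-security-spec"), Path("."))` returns the two paths it wrote and raises `FileNotFoundError` if either seed file is missing.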

Step 5: Clarify for Your Environment

Run the clarification workflow:

    /speckit.clarify

Answer questions in these categories:

**Identity & Scope:**
Q: What is your system name? A: acme-security-eval
Q: Does "authorized evaluation with source access" hold? A: yes
Q: Merge, split, or keep the 8 core roles as-is? A: keep as-is for first implementation

**Integration Choices:**
Q: Version control system? A: GitLab self-hosted at https://gitlab.internal
Q: Issue tracker? A: Jira at https://jira.acme.com
Q: LLM provider? A: OpenAI via internal gateway at https://llm.internal/v1
Q: Datastore? A: PostgreSQL
Q: Isolation runtime? A: Docker containers with network isolation
Q: Deployment target? A: Kubernetes cluster

**Policy Choices:**
Q: Severity taxonomy? A: Critical/High/Medium/Low matching our existing CVE scale
Q: Surface needs-review findings? A: No, validator rejects inconclusive findings
Q: Label naming convention? A: foundry:role/name format

**Extension Scope (recommend NO for first build):**
Q: Include Attack-Mapper role? A: no
Q: Include Regression-Tracker role? A: no
Q: Include Compliance-Mapper role? A: no
Q: Include Impact-Assessor role? A: no
Q: Include Remediation-Drafter role? A: no
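
Once the answers are settled, it can help to hold them in one typed object that the rest of your implementation reads from. The sketch below is an illustration only: `FoundryDecisions` and its field names are hypothetical, the values are the sample answers above, and `label()` reflects one reading of the `foundry:role/name` convention.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FoundryDecisions:
    """Answers from /speckit.clarify, held in one place (hypothetical structure)."""

    system_name: str = "acme-security-eval"
    vcs_url: str = "https://gitlab.internal"
    issue_tracker_url: str = "https://jira.acme.com"
    llm_base_url: str = "https://llm.internal/v1"
    datastore: str = "postgresql"
    isolation_runtime: str = "docker"
    deployment_target: str = "kubernetes"
    severities: tuple = ("critical", "high", "medium", "low")
    surface_needs_review: bool = False          # validator rejects inconclusive findings
    extension_roles: frozenset = frozenset()    # empty for a first build

    def label(self, role: str, name: str) -> str:
        """Build a label in the foundry:role/name convention chosen above."""
        return f"foundry:{role}/{name}"


decisions = FoundryDecisions()
example_label = decisions.label("detector", "sql-injection")
```

Freezing the dataclass keeps agents from mutating policy at run time; changing a decision then means editing the specification and regenerating, not patching a live object.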

Step 6: Generate Your Specification

    # Harden the clarified spec
    /speckit.specify

    # Check for remaining clarifications
    /speckit.clarify

    # Repeat until no clarification markers remain

Your `specs/001-foundry/spec.md` now contains your own specification with the decisions filled in.
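
A quick way to tell whether another clarification pass is needed is to scan the spec for leftover markers. This sketch assumes markers look like `[NEEDS CLARIFICATION: ...]`; that syntax is an assumption here, so adjust the pattern to whatever your templates actually emit. `open_markers` is a hypothetical helper.

```python
import re

# Assumed marker syntax; adjust the pattern if your spec-kit templates differ.
MARKER = re.compile(r"\[NEEDS CLARIFICATION[^\]]*\]")


def open_markers(spec_text: str) -> list:
    """Return (line_number, marker_text) for every unresolved marker."""
    hits = []
    for number, line in enumerate(spec_text.splitlines(), start=1):
        for match in MARKER.finditer(line):
            hits.append((number, match.group(0)))
    return hits


sample = (
    "FR-001: System MUST store findings in PostgreSQL\n"
    "FR-002: System MUST publish to [NEEDS CLARIFICATION: which tracker?]\n"
)
remaining = open_markers(sample)
```

Against a real project you would pass in the text of `specs/001-foundry/spec.md` and repeat Step 6 until the returned list is empty.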

Step 7: Implement

    # Generate technical design
    /speckit.plan

    # Generate task backlog
    /speckit.tasks

    # Start implementation
    /speckit.implement

Agent Role Implementation Examples

Orchestrator Pattern

    # orchestrator.py
    import asyncio
    from typing import Dict

    from datastore import FindingStore, ClaimStore
    from agents import Planner, Detector, Explorer, Validator


    class InsufficientCoverageError(Exception):
        """Raised when an evaluation ends below the plan's required coverage."""


    class Orchestrator:
        def __init__(self, llm_client, finding_store: FindingStore,
                     claim_store: ClaimStore, budget_manager,
                     rules_corpus_path: str, sandbox_runtime):
            self.llm = llm_client
            self.findings = finding_store
            self.claims = claim_store
            self.budget = budget_manager

            # Initialize agent roles. rules_corpus_path and sandbox_runtime are
            # constructor arguments because the Detector, Explorer, and
            # Validator constructors require them.
            self.planner = Planner(llm_client)
            self.detector = Detector(llm_client, rules_corpus_path)
            self.explorer = Explorer(llm_client, sandbox_runtime)
            self.validator = Validator(llm_client, sandbox_runtime)

        async def evaluate(self, target_repo: str) -> Dict:
            """Run a complete evaluation, coordinating all agents."""

            # FR-001: Orchestrator creates evaluation record
            eval_id = await self.findings.create_evaluation(
                target=target_repo,
                status="running"
            )

            try:
                # FR-010: Planner creates work plan
                plan = await self.planner.create_plan(target_repo)
                await self.claims.store_plan(eval_id, plan)

                # FR-020: Distribute work to detection and exploration.
                # run_detection, run_exploration, and calculate_coverage are
                # left to the implementation.
                work = asyncio.gather(
                    self.run_detection(eval_id, plan),
                    self.run_exploration(eval_id, plan)
                )

                # FR-005: Monitor heartbeats and budgets
                monitor_task = asyncio.create_task(
                    self.monitor_health(eval_id)
                )

                # The monitor only returns on budget exhaustion, so wait for
                # whichever finishes first instead of gathering all three.
                await asyncio.wait(
                    {work, monitor_task},
                    return_when=asyncio.FIRST_COMPLETED
                )
                if not work.done():
                    work.cancel()
                    monitor_task.result()  # re-raise if the monitor crashed
                    return {"eval_id": eval_id, "status": "budget_exhausted"}
                monitor_task.cancel()
                work.result()  # re-raise any worker exception

                # FR-006: Check coverage gate before completion
                coverage = await self.calculate_coverage(eval_id)
                if coverage < plan.required_coverage:
                    raise InsufficientCoverageError(
                        f"Coverage {coverage}% < required {plan.required_coverage}%"
                    )

                # Mark evaluation complete
                await self.findings.update_evaluation(
                    eval_id,
                    status="complete",
                    coverage=coverage
                )

                return {
                    "eval_id": eval_id,
                    "status": "complete",
                    "findings": await self.findings.count(eval_id),
                    "coverage": coverage
                }

            except Exception as e:
                # FR-008: Fail-safe: mark evaluation failed
                await self.findings.update_evaluation(
                    eval_id,
                    status="failed",
                    error=str(e)
                )
                raise

        async def monitor_health(self, eval_id: str):
            """Monitor agent heartbeats and budgets."""
            while True:
                await asyncio.sleep(30)

                # FR-007: Check heartbeats
                stalled = await self.claims.find_stalled_claims(
                    eval_id,
                    heartbeat_threshold=300  # 5 minutes
                )

                for claim in stalled:
                    # FR-007: Auto-block stalled claims
                    await self.claims.block_claim(
                        claim.id,
                        reason="heartbeat_timeout"
                    )

                # FR-009: Check budget exhaustion
                if await self.budget.is_exhausted(eval_id):
                    await self.findings.update_evaluation(
                        eval_id,
                        status="budget_exhausted"
                    )
                    break
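
The Orchestrator relies on a claim store that supports atomic claims, heartbeats, and auto-blocking, but the document does not show one. Below is a minimal single-process sketch of that contract. `InMemoryClaimStore`, `try_claim`, and the `open`/`claimed`/`blocked` status names are hypothetical, and unlike the async calls in the Orchestrator example these methods are synchronous and take the current time as an argument so the behavior is easy to test.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Claim:
    id: str
    owner: Optional[str] = None
    status: str = "open"              # open -> claimed -> blocked
    last_heartbeat: float = 0.0
    blocked_reason: Optional[str] = None


class InMemoryClaimStore:
    """Stand-in for a claim store. A real one needs a datastore operation that
    is atomic across processes (for example a conditional UPDATE)."""

    def __init__(self) -> None:
        self._claims: Dict[str, Claim] = {}

    def add(self, claim_id: str) -> None:
        self._claims[claim_id] = Claim(claim_id)

    def get(self, claim_id: str) -> Claim:
        return self._claims[claim_id]

    def try_claim(self, claim_id: str, agent: str, now: float) -> bool:
        """Take ownership only if nobody holds the claim yet."""
        claim = self._claims[claim_id]
        if claim.status != "open":
            return False
        claim.owner, claim.status, claim.last_heartbeat = agent, "claimed", now
        return True

    def heartbeat(self, claim_id: str, agent: str, now: float) -> None:
        """Refresh liveness; ignored unless the caller still owns the claim."""
        claim = self._claims[claim_id]
        if claim.owner == agent and claim.status == "claimed":
            claim.last_heartbeat = now

    def find_stalled_claims(self, now: float, heartbeat_threshold: float) -> List[Claim]:
        return [
            c for c in self._claims.values()
            if c.status == "claimed" and now - c.last_heartbeat > heartbeat_threshold
        ]

    def block_claim(self, claim_id: str, reason: str) -> None:
        claim = self._claims[claim_id]
        claim.status, claim.owner, claim.blocked_reason = "blocked", None, reason


store = InMemoryClaimStore()
store.add("auth-module")
first = store.try_claim("auth-module", "explorer-1", now=0.0)
second = store.try_claim("auth-module", "explorer-2", now=1.0)   # already taken
for stalled in store.find_stalled_claims(now=400.0, heartbeat_threshold=300):
    store.block_claim(stalled.id, reason="heartbeat_timeout")
```

In a multi-process deployment the check-then-set inside `try_claim` must be a single datastore operation; the in-memory version is only atomic because nothing else runs between the check and the assignment.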

Detector with CodeGuard Rules

    # detector.py
    from typing import List

    # RuleEngine is assumed to come from your CodeGuard rule-engine binding.
    from codeguard import RuleEngine
    from models import Claim, Finding


    class Detector:
        def __init__(self, llm_client, rules_corpus_path: str):
            self.llm = llm_client
            # FR-030: Load CodeGuard rules
            self.rule_engine = RuleEngine.load(rules_corpus_path)

        async def process_claim(self, claim: Claim) -> List[Finding]:
            """Apply detection rules to a code claim."""

            # FR-031: Extract relevant code from claim
            code_units = await self.extract_code_units(claim)

            findings = []

            for unit in code_units:
                # FR-032: Run rule engine
                rule_hits = await self.rule_engine.evaluate(
                    code=unit.content,
                    context=unit.context,
                    language=unit.language
                )

                for hit in rule_hits:
                    # FR-033: Convert rule hit to finding
                    finding = Finding(
                        claim_id=claim.id,
                        rule_id=hit.rule_id,
                        severity=hit.severity,
                        weakness_id=hit.cwe_id,
                        location=hit.location,
                        evidence={
                            "rule_match": hit.matched_pattern,
                            "code_snippet": unit.content,
                            "line_range": hit.line_range
                        },
                        verdict="confirmed",  # Rules are deterministic
                        status="validated"
                    )
                    findings.append(finding)

                # FR-034: Record coverage for every evaluated unit, whether
                # or not a rule matched (record_coverage is left to the
                # implementation)
                await self.record_coverage(claim.id, unit.path)

            return findings

        async def extract_code_units(self, claim: Claim):
            """Use the LLM to identify relevant code units in claim scope."""

            prompt = f"""
            Claim: {claim.description}
            Scope: {claim.scope}

            Identify all code units (functions, methods, classes) that should be
            evaluated for security issues related to this claim.

            Return as JSON array with: path, name, start_line, end_line
            """

            response = await self.llm.complete(prompt)
            # parse_code_units is a helper you supply; it must also load each
            # unit's content, context, and language for the rule engine
            return parse_code_units(response)
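
The Detector records coverage and the Orchestrator gates completion on it, yet neither example shows how coverage might be computed. One simple reading, sketched below, is the share of planned file paths that at least one claim has evaluated. `CoverageTracker` is hypothetical, and file-level granularity is an assumption; the spec may define coverage differently.

```python
from typing import Dict, Iterable, Set


class CoverageTracker:
    """Tracks which planned files have been evaluated (hypothetical helper)."""

    def __init__(self, planned_paths: Iterable[str]) -> None:
        self.planned: Set[str] = set(planned_paths)
        self.covered: Dict[str, Set[str]] = {}   # path -> claim ids that touched it

    def record_coverage(self, claim_id: str, path: str) -> None:
        self.covered.setdefault(path, set()).add(claim_id)

    def percent(self) -> float:
        """Share of planned paths evaluated at least once, as a percentage."""
        if not self.planned:
            return 100.0
        hit = self.planned.intersection(self.covered)
        return 100.0 * len(hit) / len(self.planned)


tracker = CoverageTracker(["a.py", "b.py", "c.py", "d.py"])
tracker.record_coverage("claim-1", "a.py")
tracker.record_coverage("claim-2", "a.py")         # same file, counted once
tracker.record_coverage("claim-2", "b.py")
tracker.record_coverage("claim-3", "vendor/x.py")  # not in the plan, ignored
coverage = tracker.percent()
gate_passed = coverage >= 80.0
```

Paths outside the plan are stored but do not raise the percentage, so exploring unplanned code cannot mask gaps in the planned scope.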

Explorer for Novel Issues

    # explorer.py
    from typing import Dict, List

    from models import Claim, Finding, RuleGap


    class Explorer:
        def __init__(self, llm_client, sandbox_runtime, rule_gap_store=None):
            self.llm = llm_client
            self.sandbox = sandbox_runtime
            # Store that receives rule gaps (FR-044); optional in this sketch
            self.rule_gaps = rule_gap_store

        async def investigate_claim(self, claim: Claim) -> List[Finding]:
            """Creative exploration beyond static rules."""

            findings = []

            # FR-040: Generate investigation hypotheses
            hypotheses = await self.generate_hypotheses(claim)

            for hypothesis in hypotheses:
                # FR-041: Execute in isolated sandbox (test_hypothesis is left
                # to the implementation)
                async with self.sandbox.session() as session:
                    result = await self.test_hypothesis(
                        session,
                        hypothesis,
                        claim
                    )

                    if result.is_vulnerability:
                        # FR-042: Evidence-gated finding
                        if not result.has_reproduction:
                            # Don't create a finding without evidence
                            continue

                        finding = Finding(
                            claim_id=claim.id,
                            severity=result.severity,
                            weakness_id=result.weakness_id,
                            description=result.description,
                            evidence=result.evidence,
                            verdict="needs-validation",
                            status="pending"
                        )
                        findings.append(finding)

                        # FR-043: Check if rules missed this
                        if await self.should_have_detected(finding):
                            await self.record_rule_gap(finding)

            return findings

        async def generate_hypotheses(self, claim: Claim) -> List[Dict]:
            """Use the LLM to generate creative test hypotheses."""

            prompt = f"""
            You are exploring code for security issues that static rules may miss.

            Claim: {claim.description}
            Code scope: {claim.scope}

            Generate 3-5 security hypotheses to test:
            - Focus on logic bugs, state confusion, race conditions
            - Consider what rules can't express (context-dependent issues)
            - Prioritize high-impact scenarios

            For each hypothesis provide:
            - What to test
            - Why it might be vulnerable
            - How to reproduce if vulnerable

            Return as JSON array.
            """

            response = await self.llm.complete(
                prompt,
                temperature=0.7  # Higher for creative exploration
            )
            # parse_hypotheses is a helper you supply
            return parse_hypotheses(response)

        async def record_rule_gap(self, finding: Finding):
            """Record that rules failed to detect this issue."""

            gap = RuleGap(
                finding_id=finding.id,
                weakness_id=finding.weakness_id,
                pattern=finding.evidence.get("vulnerable_pattern"),
                reason="explorer_found_missed_by_detector",
                suggested_rule=await self.draft_rule(finding)
            )

            # FR-044: Feed into rule corpus improvement
            if self.rule_gaps is not None:
                await self.rule_gaps.store(gap)
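
The Explorer hands raw model output to `parse_hypotheses`, which the document leaves undefined. Model output often wraps JSON in prose, so a defensive parser is worth sketching. The version below is an assumption about what that helper could do, and the key names (`what_to_test`, `why_vulnerable`, `how_to_reproduce`) are hypothetical, since the prompt describes the fields in words only.

```python
import json
from typing import Dict, List

REQUIRED_KEYS = {"what_to_test", "why_vulnerable", "how_to_reproduce"}


def parse_hypotheses(response: str) -> List[Dict]:
    """Extract a JSON array of hypotheses from raw model output."""
    start, end = response.find("["), response.rfind("]")
    if start == -1 or end <= start:
        return []                       # no array present: yield nothing
    try:
        items = json.loads(response[start:end + 1])
    except json.JSONDecodeError:
        return []                       # malformed output is dropped, not repaired
    if not isinstance(items, list):
        return []
    # Keep only objects that carry every required field.
    return [
        item for item in items
        if isinstance(item, dict) and REQUIRED_KEYS.issubset(item)
    ]


raw = (
    "Here are the hypotheses:\n"
    '[{"what_to_test": "session reuse after logout", '
    '"why_vulnerable": "token not revoked", '
    '"how_to_reproduce": "replay the old cookie"}, '
    '{"what_to_test": "incomplete entry"}]'
)
hypotheses = parse_hypotheses(raw)
```

Returning an empty list on malformed output follows the fail-safe principle: the Explorer tests nothing for that claim rather than guessing at what the model meant.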

Validator for Finding Confirmation

    # validator.py
    import hashlib

    from models import Finding, ValidationResult


    class Validator:
        def __init__(self, llm_client, sandbox_runtime):
            self.llm = llm_client
            self.sandbox = sandbox_runtime

        async def validate_finding(self, finding: Finding) -> ValidationResult:
            """Reproduce and confirm a finding from its evidence."""

            # FR-050: Check evidence completeness
            if not self.has_sufficient_evidence(finding):
                return ValidationResult(
                    verdict="rejected",
                    reason="insufficient_evidence"
                )

            # FR-051: Attempt reproduction (reproduce_issue and
            # assess_severity are left to the implementation)
            async with self.sandbox.session() as session:
                reproduced = await self.reproduce_issue(
                    session,
                    finding.evidence
                )

                if not reproduced:
                    return ValidationResult(
                        verdict="rejected",
                        reason="not_reproducible"
                    )

                # FR-052: Verify severity assessment
                actual_severity = await self.assess_severity(
                    session,
                    finding
                )

                if actual_severity != finding.severity:
                    # Record the original value before overwriting it
                    finding.evidence["severity_adjustment"] = {
                        "original": finding.severity,
                        "validated": actual_severity
                    }
                    finding.severity = actual_severity

                # FR-053: Generate fingerprint for deduplication
                fingerprint = await self.generate_fingerprint(finding)

                return ValidationResult(
                    verdict="confirmed",
                    fingerprint=fingerprint,
                    severity=actual_severity,
                    reproduction_evidence=session.get_transcript()
                )

        def has_sufficient_evidence(self, finding: Finding) -> bool:
            """Check if the finding has the required evidence."""
            required = ["location", "description"]

            # Compare case-insensitively so "High" and "high" are treated alike
            if finding.severity.lower() in ["critical", "high"]:
                required.extend(["reproduction_steps", "impact"])

            return all(k in finding.evidence for k in required)

        async def generate_fingerprint(self, finding: Finding) -> str:
            """Create a stable fingerprint for deduplication."""

            # FR-054: Fingerprint combines weakness + location + root cause
            components = [
                finding.weakness_id,
                finding.location.get("file_path"),
                finding.location.get("function_name"),
                finding.evidence.get("root_cause_pattern")
            ]

            fingerprint_input = "|".join(str(c) for c in components if c)
            return hashlib.sha256(fingerprint_input.encode()).hexdigest()[:16]
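
To see why the fingerprint leaves out line numbers, it helps to run the idea on two reports of the same issue. The sketch below is a standalone variant of the fingerprint logic, with plain dictionaries standing in for findings; `fingerprint` and `deduplicate` are hypothetical helpers. Unlike the Validator example, which filters out empty components, this version keeps an empty placeholder for each missing field so that a missing function name and a missing root cause cannot collapse into the same input string.

```python
import hashlib
from typing import Dict, Iterable, List, Optional


def fingerprint(weakness_id: str, file_path: str,
                function_name: Optional[str], root_cause: Optional[str]) -> str:
    """Hash the stable parts of a finding; line numbers are left out on purpose."""
    parts = [weakness_id, file_path, function_name or "", root_cause or ""]
    return hashlib.sha256("|".join(parts).encode()).hexdigest()[:16]


def deduplicate(findings: Iterable[Dict]) -> List[Dict]:
    """Keep the first finding seen for each fingerprint."""
    seen, unique = set(), []
    for f in findings:
        key = fingerprint(f["weakness_id"], f["file_path"],
                          f.get("function_name"), f.get("root_cause"))
        if key not in seen:
            seen.add(key)
            unique.append(f)
    return unique


reports = [
    {"weakness_id": "CWE-89", "file_path": "app/db.py",
     "function_name": "find_user", "root_cause": "string-built query", "line": 42},
    # Same issue reported again after the file shifted by a few lines.
    {"weakness_id": "CWE-89", "file_path": "app/db.py",
     "function_name": "find_user", "root_cause": "string-built query", "line": 47},
    {"weakness_id": "CWE-79", "file_path": "app/views.py",
     "function_name": "render_profile", "root_cause": "unescaped output", "line": 10},
]
unique_reports = deduplicate(reports)
```

Truncating the digest to 16 hex characters keeps labels short at the cost of a small collision risk; use the full digest if your tracker allows longer labels.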

Publisher for Issue Tracker Integration

    # publisher.py
    from typing import Dict

    import aiohttp

    from models import Finding


    class Publisher:
        def __init__(self, issue_tracker_config: Dict):
            self.tracker_url = issue_tracker_config["url"]
            self.project_key = issue_tracker_config["project"]
            self.api_token = issue_tracker_config["token"]  # From env: ISSUE_TRACKER_TOKEN

        async def publish_finding(self, finding: Finding) -> str:
            """Create an issue in the tracker for a confirmed finding."""

            # FR-060: Only publish confirmed findings
            if finding.verdict != "confirmed":
                raise ValueError(f"Cannot publish {finding.verdict} finding")

            # FR-061: Check for existing issue via fingerprint
            # (find_existing_issue is left to the implementation)
            existing = await self.find_existing_issue(finding.fingerprint)
            if existing:
                return existing.issue_id

            # FR-062: Format issue according to tracker schema
            issue_body = self.format_issue(finding)

            # FR-063: Create issue
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{self.tracker_url}/rest/api/2/issue",
                    headers={
                        "Authorization": f"Bearer {self.api_token}",
                        "Content-Type": "application/json"
                    },
                    json=issue_body
                ) as resp:
                    resp.raise_for_status()
                    result = await resp.json()
                    issue_id = result["key"]

            # FR-064: Update finding with issue reference
            finding.issue_id = issue_id
            finding.status = "published"
            await finding.save()

            return issue_id

        def format_issue(self, finding: Finding) -> Dict:
            """Format a finding as an issue tracker ticket."""

            # Built line by line so wiki-markup headings start at column 0
            location = finding.location
            lines = [
                "*Security Finding from Foundry Evaluation*",
                "",
                f"*Severity:* {finding.severity.upper()}",
                f"*Weakness:* {finding.weakness_id}",
                f"*Location:* {location.get('file_path')}:{location.get('line_number')}",
                "",
                "h3. Description",
                finding.description,
                "",
                "h3. Evidence",
                self.format_evidence(finding.evidence),
                "",
                "h3. Reproduction",
                finding.evidence.get("reproduction_steps", "See evidence above"),
                "",
                "----",
                f"Fingerprint: {finding.fingerprint}",
                f"Evaluation ID: {finding.eval_id}",
            ]
            description = "\n".join(lines)

            return {
                "fields": {
                    "project": {"key": self.project_key},
                    "summary": f"[{finding.severity.upper()}] {finding.weakness_id}: {finding.get_short_description()}",
                    "description": description,
                    "issuetype": {"name": "Security Vulnerability"},
                    "priority": {"name": self.map_severity_to_priority(finding.severity)},
                    "labels": [
                        f"foundry:eval:{finding.eval_id}",
                        f"foundry:weakness:{finding.weakness_id}",
                        f"foundry:fingerprint:{finding.fingerprint}"
                    ]
                }
            }
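
The Publisher calls `map_severity_to_priority` without defining it. A minimal sketch of that helper follows, assuming the tracker uses Jira's default priority names (`Highest`, `High`, `Medium`, `Low`); the mapping table is an assumption and should be adjusted to your own priority scheme.

```python
# Assumed mapping onto Jira's default priority names; adjust to your scheme.
SEVERITY_TO_PRIORITY = {
    "critical": "Highest",
    "high": "High",
    "medium": "Medium",
    "low": "Low",
}


def map_severity_to_priority(severity: str) -> str:
    """Translate a Foundry severity into a tracker priority name."""
    try:
        return SEVERITY_TO_PRIORITY[severity.strip().lower()]
    except KeyError:
        # Fail safe: surface unknown severities instead of guessing a priority.
        raise ValueError(f"unknown severity: {severity!r}") from None


priority = map_severity_to_priority("Critical")
```

Raising on an unknown severity, rather than defaulting to a middle priority, keeps a taxonomy mismatch visible at publish time.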
            "description": description,
            "issuetype": {"name": "Security Vulnerability"},
            "priority": {"name": self.map_severity_to_priority(finding.severity)},
            "labels": [
                f"foundry:eval:{finding.eval_id}",
                f"foundry:weakness:{finding.weakness_id}",
                f"foundry:fingerprint:{finding.fingerprint}"
            ]
        }
    }
undefined
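`format_issue` calls `self.map_severity_to_priority`, which the listing never defines. A minimal sketch of such a helper follows. The priority names on the right-hand side and the `Medium` fallback are assumptions for illustration, not something the spec prescribes; substitute whatever your issue tracker defines.

```python
# Hypothetical mapping from Foundry severities to tracker priority names.
# The names on the right are assumptions; use the ones your tracker defines.
SEVERITY_TO_PRIORITY = {
    "critical": "Highest",
    "high": "High",
    "medium": "Medium",
    "low": "Low",
}


def map_severity_to_priority(severity: str, default: str = "Medium") -> str:
    """Return the tracker priority for a severity, ignoring case and padding."""
    return SEVERITY_TO_PRIORITY.get(severity.strip().lower(), default)


print(map_severity_to_priority("CRITICAL"))
print(map_severity_to_priority("unknown"))
```

Falling back to a default keeps publishing from failing on an unexpected severity string; raising an error instead would be an equally reasonable choice.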

Configuration Examples

Evaluation Configuration

```yaml
# config/evaluation.yaml
evaluation:
  name: "acme-security-eval"

# Orchestrator settings
orchestrator:
  max_concurrent_claims: 10
  heartbeat_interval: 60   # seconds
  stall_threshold: 300     # seconds

# Budget limits
budget:
  max_tokens: 10000000
  max_duration_hours: 48
  per_agent_token_limit: 1000000

# Coverage requirements
coverage:
  required_percentage: 80
  scope:
    - src/**/*.py
    - lib/**/*.js
  exclude:
    - tests/**
    - docs/**

# Agent roles enabled
agents:
  core:
    - orchestrator
    - planner
    - navigator
    - detector
    - explorer
    - validator
    - investigator
    - publisher
  extensions: []  # Start with core only

# Integration endpoints
integrations:
  llm:
    provider: "openai"
    endpoint: "${LLM_GATEWAY_URL}"
    model: "gpt-4"
    api_key: "${LLM_API_KEY}"

  vcs:
    type: "gitlab"
    url: "${GITLAB_URL}"
    token: "${GITLAB_TOKEN}"

  issue_tracker:
    type: "jira"
    url: "${JIRA_URL}"
    project: "SEC"
    token: "${JIRA_TOKEN}"

  datastore:
    type: "postgresql"
    connection_string: "${DATABASE_URL}"

# Security controls
sandbox:
  runtime: "docker"
  network_isolation: true
  timeout_seconds: 300
  resource_limits:
    cpu: "1"
    memory: "2Gi"
```
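The evaluation configuration refers to secrets through `${NAME}` placeholders such as `${JIRA_TOKEN}`, but how they get resolved is not shown. One possible approach, sketched here with only the standard library, is to expand them from the environment after the YAML has been parsed into plain dicts and lists. The function name `expand_env` and the choice to fail on a missing variable are assumptions.

```python
import os
import re
from typing import Any, Mapping, Optional

# Matches ${NAME} where NAME is a conventional environment variable name.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(value: Any, env: Optional[Mapping[str, str]] = None) -> Any:
    """Recursively replace ${NAME} placeholders in strings, lists, and dicts."""
    env = os.environ if env is None else env
    if isinstance(value, str):
        def lookup(match: "re.Match[str]") -> str:
            name = match.group(1)
            if name not in env:
                # Failing loudly avoids sending a literal "${...}" as a token.
                raise KeyError(f"Missing environment variable: {name}")
            return env[name]
        return _PLACEHOLDER.sub(lookup, value)
    if isinstance(value, dict):
        return {key: expand_env(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_env(item, env) for item in value]
    return value  # numbers, booleans, and None pass through unchanged


# Example with an explicit env mapping so the result does not depend on the host.
config = {"issue_tracker": {"type": "jira", "url": "${JIRA_URL}", "project": "SEC"}}
expanded = expand_env(config, env={"JIRA_URL": "https://jira.example.internal"})
print(expanded["issue_tracker"]["url"])
```

Resolving placeholders at load time keeps tokens out of the file on disk while letting the rest of the code read ordinary strings.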

Detection Rules Configuration

```yaml
# config/detection.yaml
detection:
  # CodeGuard rules corpus
  rules_corpus: "/etc/foundry/rules/codeguard"

  # Rule categories to enable
  enabled_categories:
    - injection
    - authentication
    - authorization
    - cryptography
    - data-exposure
    - configuration

  # Severity mapping
  severity_mapping:
    critical: ["CWE-89", "CWE-78", "CWE-79"]  # SQLi, Command Injection, XSS
    high: ["CWE-306", "CWE-862"]              # Missing Authentication, Missing Authorization
    medium: ["CWE-327", "CWE-338"]            # Weak Crypto, Weak PRNG
    low: ["CWE-209", "CWE-532"]               # Info Leak via Error Messages, Sensitive Data in Logs

  # Rule gap tracking
  rule_gaps:
    enabled: true
    auto_draft_rules: true
    review_queue: "rule-improvements"
```
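The `severity_mapping` block is keyed by severity, whereas a detector usually needs the opposite lookup: given a weakness ID, which severity applies? A small sketch of inverting the mapping follows. The helper names and the `medium` default for unmapped weaknesses are assumptions.

```python
from typing import Dict, List

# Same data as severity_mapping in config/detection.yaml.
SEVERITY_MAPPING: Dict[str, List[str]] = {
    "critical": ["CWE-89", "CWE-78", "CWE-79"],
    "high": ["CWE-306", "CWE-862"],
    "medium": ["CWE-327", "CWE-338"],
    "low": ["CWE-209", "CWE-532"],
}


def build_severity_lookup(mapping: Dict[str, List[str]]) -> Dict[str, str]:
    """Invert severity -> [weakness IDs] into weakness ID -> severity."""
    lookup: Dict[str, str] = {}
    for severity, weakness_ids in mapping.items():
        for weakness_id in weakness_ids:
            if weakness_id in lookup:
                # A weakness listed under two severities is a config error.
                raise ValueError(f"{weakness_id} is mapped more than once")
            lookup[weakness_id] = severity
    return lookup


def severity_for(weakness_id: str, lookup: Dict[str, str], default: str = "medium") -> str:
    """Return the configured severity, or the (assumed) default if unmapped."""
    return lookup.get(weakness_id, default)


lookup = build_severity_lookup(SEVERITY_MAPPING)
print(severity_for("CWE-89", lookup))
print(severity_for("CWE-1234", lookup))
```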

Running an Evaluation

```python
# run_evaluation.py
import asyncio

from orchestrator import Orchestrator
from config import load_config

# Assumed module paths for the remaining helpers; adjust to your implementation.
from llm import create_llm_client
from stores import ClaimStore, FindingStore
from budget import BudgetManager


async def main():
    # Load configuration
    config = load_config("config/evaluation.yaml")

    # Initialize orchestrator
    orchestrator = Orchestrator(
        llm_client=create_llm_client(config.integrations.llm),
        finding_store=FindingStore(config.integrations.datastore),
        claim_store=ClaimStore(config.integrations.datastore),
        budget_manager=BudgetManager(config.budget)
    )

    # Run evaluation
    result = await orchestrator.evaluate(
        target_repo="https://gitlab.internal/acme/webapp"
    )

    print(f"Evaluation {result['eval_id']} complete")
    print(f"Findings: {result['findings']}")
    print(f"Coverage: {result['coverage']}%")


if __name__ == "__main__":
    asyncio.run(main())
```

Common Patterns

Atomic Claim Processing

```python
# Every agent processes claims atomically
async def process_claim(self, claim: Claim):
    # Claim the work atomically
    if not await self.claims.try_claim(claim.id, self.agent_id):
        return  # Another agent got it

    # Send heartbeats while working; started before the try block so that
    # heartbeat_task is always defined when the finally clause runs
    heartbeat_task = asyncio.create_task(
        self.send_heartbeats(claim.id)
    )
    try:
        # Do the work
        result = await self.do_work(claim)

        # Mark complete
        await self.claims.complete(claim.id, result)

    except Exception as e:
        # Fail claim, don't retry (constitution principle)
        await self.claims.fail(claim.id, str(e))
    finally:
        # Stop heartbeats whether the work succeeded or failed
        heartbeat_task.cancel()
```
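The pattern above depends on `try_claim` being atomic: when several agents race for the same claim, exactly one must win. The in-memory sketch below illustrates that property with an `asyncio.Lock`. The class `InMemoryClaimStore` is hypothetical; a multi-process deployment would need the datastore itself to provide the same check-and-set guarantee.

```python
import asyncio
from typing import Dict, List, Optional


class InMemoryClaimStore:
    """Hypothetical single-process claim store used only for illustration."""

    def __init__(self) -> None:
        self._owners: Dict[str, str] = {}
        self._lock = asyncio.Lock()

    async def try_claim(self, claim_id: str, agent_id: str) -> bool:
        """Assign the claim to agent_id only if nobody owns it yet."""
        async with self._lock:
            # The check and the write happen under one lock, so no other
            # coroutine can slip in between them.
            if claim_id in self._owners:
                return False
            self._owners[claim_id] = agent_id
            return True

    def owner(self, claim_id: str) -> Optional[str]:
        return self._owners.get(claim_id)


async def demo() -> List[bool]:
    store = InMemoryClaimStore()
    # Three agents race for the same claim; exactly one should succeed.
    return await asyncio.gather(
        *(store.try_claim("claim-1", f"agent-{i}") for i in range(3))
    )


results = asyncio.run(demo())
print(results)
```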

Evidence-Gated Finding Creation

```python
# Never create findings without evidence
import logging
from typing import Dict, Optional

logger = logging.getLogger(__name__)


def create_finding(self, claim: Claim, issue: Dict) -> Optional[Finding]:
    # Check evidence requirements
    if not issue.get("location"):
        logger.warning(f"No location for issue in {claim.id}, skipping")
        return None

    if not issue.get("reproduction"):
        logger.warning(f"No reproduction for issue in {claim.id}, skipping")
        return None

    # Evidence is sufficient
    return Finding(
        claim_id=claim.id,
        location=issue["location"],
        evidence={
            "reproduction": issue["reproduction"],
            # Not gated above, so read with .get() to avoid a KeyError
            "impact": issue.get("impact"),
            "code_snippet": issue.get("code")
        },
        verdict="needs-validation"
    )
```

Budget Enforcement

```python
class BudgetManager:
    async def check_budget(self, eval_id: str, tokens_requested: int) -> bool:
        """Check if budget allows operation."""
        used = await self.get_tokens_used(eval_id)
        limit = self.config.max_tokens
        
        if used + tokens_requested > limit:
            await self.notify_budget_exhausted(eval_id)
            return False
        
        return True
    
    async def record_usage(self, eval_id: str, tokens: int):
        """Record token usage."""
        await self.db.execute(
            "INSERT INTO token_usage (eval_id, tokens, timestamp) VALUES ($1, $2, NOW())",
            eval_id, tokens
        )
```
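The `BudgetManager` listing checks only the evaluation-wide `max_tokens`, while the evaluation configuration also defines `per_agent_token_limit`. The sketch below shows one way both limits could be enforced in a single step so that a check and its recording cannot drift apart. `InMemoryBudget` and `try_spend` are hypothetical names, and keeping counters in memory is a simplification of the database-backed version.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class InMemoryBudget:
    """Hypothetical tracker enforcing a global and a per-agent token limit."""

    max_tokens: int
    per_agent_token_limit: int
    used_total: int = 0
    used_by_agent: Dict[str, int] = field(default_factory=dict)

    def try_spend(self, agent_id: str, tokens: int) -> bool:
        """Record the spend if both limits allow it; otherwise record nothing."""
        agent_used = self.used_by_agent.get(agent_id, 0)
        if self.used_total + tokens > self.max_tokens:
            return False  # evaluation-wide budget would be exceeded
        if agent_used + tokens > self.per_agent_token_limit:
            return False  # this agent's share would be exceeded
        self.used_total += tokens
        self.used_by_agent[agent_id] = agent_used + tokens
        return True


budget = InMemoryBudget(max_tokens=100, per_agent_token_limit=60)
print(budget.try_spend("detector", 50))  # within both limits
print(budget.try_spend("detector", 20))  # would exceed the per-agent limit
print(budget.try_spend("explorer", 50))  # uses up the remaining global budget
print(budget.try_spend("explorer", 1))   # would exceed the global limit
```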

Troubleshooting

Agents Not Finding Issues

**Check rule corpus:**
```bash
# Verify rules loaded
foundry-ctl list-rules --corpus /etc/foundry/rules/codeguard

# Test rule against sample code
foundry-ctl test-rule CWE-89 --file sample.py
```

**Check explorer creativity:**
```python
# Increase temperature for hypothesis generation
hypotheses = await self.llm.complete(
    prompt,
    temperature=0.8  # Higher = more creative, less reliable
)
```

Claims Stalling

**Check heartbeat configuration:**
```python
# Ensure heartbeats are sent frequently enough
HEARTBEAT_INTERVAL = 60  # seconds
STALL_THRESHOLD = 300    # seconds (5 minutes)

# The heartbeat interval must be less than half the stall threshold
assert HEARTBEAT_INTERVAL < STALL_THRESHOLD / 2
```

**Check for deadlocks:**
```sql
-- Find stalled claims
SELECT claim_id, agent_id, last_heartbeat, status
FROM claims
WHERE status = 'in_progress'
  AND last_heartbeat < NOW() - INTERVAL '5 minutes';
```
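The stalled-claim query can also be expressed in application code, which is convenient for unit-testing the stall rule without a database. The sketch below applies the same strict comparison as the SQL (a claim is stalled only when its last heartbeat is older than the threshold). The function name `find_stalled` and the dictionary input are assumptions.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List

STALL_THRESHOLD = timedelta(seconds=300)  # matches stall_threshold: 300


def find_stalled(
    last_heartbeats: Dict[str, datetime],
    now: datetime,
    threshold: timedelta = STALL_THRESHOLD,
) -> List[str]:
    """Return IDs of claims whose last heartbeat is older than the threshold."""
    return sorted(
        claim_id
        for claim_id, seen in last_heartbeats.items()
        if now - seen > threshold
    )


# A fixed "now" keeps the example deterministic.
now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
heartbeats = {
    "claim-1": now - timedelta(seconds=30),   # healthy
    "claim-2": now - timedelta(seconds=301),  # stalled
    "claim-3": now - timedelta(seconds=300),  # exactly at the threshold
}
print(find_stalled(heartbeats, now))
```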

Findings Not Publishing

**Check verdict state:**
```python
# Only confirmed findings publish
if finding.verdict != "confirmed":
    logger.error(f"Cannot publish {finding.id}, verdict={finding.verdict}")
    # Finding needs validation first
```

**Check deduplication:**
```python
# Verify fingerprints are stable
f1 = generate_fingerprint(finding)
f2 = generate_fingerprint(finding)
assert f1 == f2, "Fingerprints must be deterministic"
```
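Deduplication only works if the fingerprint is derived from stable inputs. The source does not say which fields `generate_fingerprint` hashes, so the sketch below is one plausible construction rather than the spec's definition: it hashes the weakness ID, the file path, and a whitespace-normalized code snippet, and deliberately leaves out line numbers and timestamps, which tend to shift between runs. The name `sketch_fingerprint` and its signature are hypothetical.

```python
import hashlib
import re


def sketch_fingerprint(weakness_id: str, file_path: str, code_snippet: str) -> str:
    """Hash the fields assumed to identify an issue; volatile fields are excluded."""
    # Collapse runs of whitespace so reformatting the code keeps the same hash.
    normalized = re.sub(r"\s+", " ", code_snippet).strip()
    payload = "\n".join([weakness_id, file_path, normalized])
    # A truncated SHA-256 hex digest is short enough to use as a tracker label.
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]


a = sketch_fingerprint("CWE-89", "src/db.py", "cursor.execute(query % user_id)")
b = sketch_fingerprint("CWE-89", "src/db.py", "cursor.execute(query  %  user_id)\n")
c = sketch_fingerprint("CWE-89", "src/api.py", "cursor.execute(query % user_id)")
print(a == b)  # whitespace-only changes do not alter the fingerprint
print(a == c)  # a different file yields a different fingerprint
```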

Low Coverage

**Check scope configuration:**
```yaml
# Ensure scope includes all target code
coverage:
  scope:
    - src/**/*.py       # Include all source
    - lib/**/*.{js,ts}  # Multiple extensions
  exclude:
    - tests/**          # Don't count tests
    - vendor/**         # Don't count dependencies
```

**Check claim distribution:**
```python
# Verify planner creates sufficient claims
plan = await planner.create_plan(target)
print(f"Claims created: {len(plan.claims)}")
print(f"Surface area: {plan.surface_area}")

# Claims should cover all in-scope files
```
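To debug a low coverage figure, it helps to reproduce the calculation by hand: which files are in scope, and what fraction of them did claims actually touch? The sketch below assumes common glob semantics (`**/` spans zero or more directories, `*` stays within one path segment, `{a,b}` lists alternatives). The spec's exact matching rules may differ, and every function name here is hypothetical.

```python
import re
from typing import Iterable, List, Set, Tuple


def glob_to_regex(pattern: str) -> "re.Pattern":
    """Translate a scope glob into a compiled regex (assumed semantics)."""
    parts: List[str] = []
    i = 0
    while i < len(pattern):
        if pattern.startswith("**/", i):
            parts.append(r"(?:.*/)?")  # zero or more directories
            i += 3
        elif pattern.startswith("**", i):
            parts.append(r".*")        # anything, including "/"
            i += 2
        elif pattern[i] == "*":
            parts.append(r"[^/]*")     # anything within one path segment
            i += 1
        elif pattern[i] == "{":
            end = pattern.index("}", i)
            options = pattern[i + 1:end].split(",")
            parts.append("(?:" + "|".join(re.escape(o) for o in options) + ")")
            i = end + 1
        else:
            parts.append(re.escape(pattern[i]))
            i += 1
    return re.compile("".join(parts) + r"\Z")


def in_scope(path: str, scope: Iterable[str], exclude: Iterable[str]) -> bool:
    """A file counts if it matches a scope glob and no exclude glob."""
    included = any(glob_to_regex(p).match(path) for p in scope)
    excluded = any(glob_to_regex(p).match(path) for p in exclude)
    return included and not excluded


def coverage_percentage(
    all_files: Iterable[str], covered: Set[str],
    scope: List[str], exclude: List[str],
) -> Tuple[List[str], float]:
    """Return the in-scope files and the percentage of them that were covered."""
    targets = sorted(f for f in all_files if in_scope(f, scope, exclude))
    if not targets:
        return targets, 0.0
    hits = sum(1 for f in targets if f in covered)
    return targets, 100.0 * hits / len(targets)


files = ["src/app.py", "src/db/models.py", "lib/ui/x.ts",
         "tests/test_app.py", "vendor/lib.py", "README.md"]
targets, pct = coverage_percentage(
    files,
    covered={"src/app.py", "lib/ui/x.ts", "tests/test_app.py"},
    scope=["src/**/*.py", "lib/**/*.{js,ts}"],
    exclude=["tests/**", "vendor/**"],
)
print(targets, round(pct, 1))
```

Comparing `targets` with the files you expected to be evaluated usually shows whether the problem is the scope globs or the planner's claim distribution.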

Integration with CodeGuard

```python
# Load CodeGuard rules as Detector corpus
from codeguard import RuleEngine


class Detector:
    def __init__(self, rules_path: str):
        # Load portable CodeGuard rules
        self.rules = RuleEngine.load(rules_path)

    async def scan(self, code: str, language: str):
        # Rules execute deterministically
        return await self.rules.evaluate(code, language)
```

```python
# Export rule gaps back to CodeGuard corpus
async def export_rule_gap(gap: RuleGap):
    """Convert discovered gap into CodeGuard rule."""
    rule = {
        # Assumes gap.weakness_id is the bare number (e.g. "89"); drop the
        # "CWE-" prefix here if the ID already contains one
        "id": f"CWE-{gap.weakness_id}-{gap.pattern_hash}",
        "name": gap.suggested_name,
        "description": gap.description,
        "pattern": gap.pattern,
        "severity": gap.severity,
        "languages": gap.applicable_languages
    }

    # Write to CodeGuard format
    await write_codeguard_rule(rule, "rules/corpus/custom/")
```

Best Practices

  1. Start with the 8 core roles only: get the foundational pipeline working before adding extensions.
  2. The constitution is non-negotiable: each principle prevents a real production failure.
  3. Evidence gates everything: no evidence means no finding, regardless of confidence.
  4. Fingerprints must be stable: the same issue in the same place always yields the same fingerprint.
  5. Budgets prevent runaway evaluations: set token limits, enforce them, and auto-stop when they are exhausted.
  6. Coverage before completion: don't mark an evaluation complete until the coverage gate passes.
  7. Rule gaps feed the corpus: when an explorer finds something the detector missed, create a rule.
  8. Sandbox everything: never execute target code directly in the evaluation environment; always isolate it.

References

  • Specification: spec.md (~130 functional requirements with rationale)
  • Constitution: constitution.md (11 inviolable principles)
  • Glossary: GLOSSARY.md (Foundry terminology)
  • CodeGuard: https://project-codeguard.org (portable rule format)
  • spec-kit: https://github.com/github/spec-kit (spec-driven development workflow)