agentica-infrastructure

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Agentica Infrastructure Reference

Agentica基础设施参考

Complete API specification for Agentica multi-agent coordination infrastructure.
Agentica多智能体协调基础设施的完整API规范。

When to Use

使用场景

  • Building multi-agent workflows with Agentica patterns
  • Need exact constructor signatures for pattern classes
  • Want to understand coordination database schema
  • Implementing custom patterns using primitives
  • Debugging agent tracking or orphan detection
  • 使用Agentica模式构建多智能体工作流
  • 需要模式类的精确构造函数签名
  • 希望了解协调数据库的架构
  • 使用原语实现自定义模式
  • 调试智能体追踪或孤立智能体检测

Quick Reference

快速参考

11 Pattern Classes

11种模式类

PatternPurposeKey Method
Swarm
Parallel perspectives
.execute(query)
Pipeline
Sequential stages
.run(initial_state)
Hierarchical
Coordinator + specialists
.execute(task)
Jury
Voting consensus
.decide(return_type, question)
GeneratorCritic
Iterative refinement
.run(task)
CircuitBreaker
Failure fallback
.execute(query)
Adversarial
Debate + judge
.resolve(question)
ChainOfResponsibility
Route to handler
.process(query)
MapReduce
Fan out + reduce
.execute(query, chunks)
Blackboard
Shared state
.solve(query)
EventDriven
Event bus
.publish(event)
Pattern用途核心方法
Swarm
并行视角收集
.execute(query)
Pipeline
串行阶段执行
.run(initial_state)
Hierarchical
协调者+专家模式
.execute(task)
Jury
投票共识决策
.decide(return_type, question)
GeneratorCritic
迭代优化
.run(task)
CircuitBreaker
故障降级
.execute(query)
Adversarial
辩论+裁决
.resolve(question)
ChainOfResponsibility
责任链路由
.process(query)
MapReduce
扇出+归约
.execute(query, chunks)
Blackboard
共享状态管理
.solve(query)
EventDriven
事件驱动
.publish(event)

Core Infrastructure

核心基础设施

ComponentFilePurpose
CoordinationDB
coordination.py
SQLite tracking
tracked_spawn
tracked_agent.py
Agent with tracking
HandoffAtom
handoff_atom.py
Universal handoff format
BlackboardCache
blackboard.py
Hot tier communication
MemoryService
memory_service.py
Core + Archival memory
create_claude_scope
claude_scope.py
Scope with file ops
组件文件用途
CoordinationDB
coordination.py
SQLite追踪管理
tracked_spawn
tracked_agent.py
带追踪功能的智能体
HandoffAtom
handoff_atom.py
通用智能体交接格式
BlackboardCache
blackboard.py
热层通信缓存
MemoryService
memory_service.py
核心+归档记忆服务
create_claude_scope
claude_scope.py
带文件操作的作用域

Primitives

原语

PrimitivePurpose
Consensus
Voting (MAJORITY, UNANIMOUS, THRESHOLD)
Aggregator
Combine results (MERGE, CONCAT, BEST)
HandoffState
Structured agent handoff
build_premise
Structured premise builder
gather_fail_fast
TaskGroup-based parallel execution
原语用途
Consensus
投票机制(多数决、一致决、阈值决)
Aggregator
结果合并(合并、拼接、最优选择)
HandoffState
结构化智能体交接状态
build_premise
结构化前提构建器
gather_fail_fast
基于TaskGroup的快速失败并行执行

Full API Spec

完整API规范

See:
API_SPEC.md
in this skill directory
详见:本技能目录下的
API_SPEC.md

Usage Example

使用示例

python
from scripts.agentica_patterns.patterns import Swarm, Jury
from scripts.agentica_patterns.primitives import ConsensusMode
from scripts.agentica_patterns.coordination import CoordinationDB
from scripts.agentica_patterns.tracked_agent import tracked_spawn
python
from scripts.agentica_patterns.patterns import Swarm, Jury
from scripts.agentica_patterns.primitives import ConsensusMode
from scripts.agentica_patterns.coordination import CoordinationDB
from scripts.agentica_patterns.tracked_agent import tracked_spawn

Create tracking database

Create tracking database

db = CoordinationDB(session_id="my-session")
db = CoordinationDB(session_id="my-session")

Swarm with tracking

Swarm with tracking

swarm = Swarm( perspectives=["Security expert", "Performance expert"], db=db ) result = await swarm.execute("Review this code")
swarm = Swarm( perspectives=["Security expert", "Performance expert"], db=db ) result = await swarm.execute("Review this code")

Jury with consensus

Jury with consensus

jury = Jury( num_jurors=3, consensus_mode=ConsensusMode.MAJORITY, premise="You evaluate code quality", db=db ) verdict = await jury.decide(bool, "Is this code production ready?")
undefined
jury = Jury( num_jurors=3, consensus_mode=ConsensusMode.MAJORITY, premise="You evaluate code quality", db=db ) verdict = await jury.decide(bool, "Is this code production ready?")
undefined

Location

位置

API spec:
.claude/skills/agentica-infrastructure/API_SPEC.md
Source:
scripts/agentica_patterns/
API规范:
.claude/skills/agentica-infrastructure/API_SPEC.md
源码:
scripts/agentica_patterns/