a2a-protocol

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

A2A Protocol Implementation Guide

A2A协议实现指南

This skill provides comprehensive knowledge for building, deploying, and interacting with agents using the Agent2Agent (A2A) Protocol v0.3.0.

本指南提供了使用Agent2Agent(A2A)协议v0.3.0构建、部署和交互Agent的全面知识。

Protocol Overview

协议概述

A2A is a standard protocol enabling AI agents to communicate and collaborate. It operates across three layers:
LayerDescription
Data ModelCore structures (Task, Message, AgentCard, Part, Artifact)
Abstract OperationsProtocol-agnostic capabilities (SendMessage, GetTask, etc.)
Protocol BindingsConcrete implementations (JSON-RPC 2.0, gRPC, HTTP/REST)

A2A是一款支持AI Agent通信与协作的标准协议,它分为三个层级运作:
层级描述
数据模型层核心结构(Task、Message、AgentCard、Part、Artifact)
抽象操作层与协议无关的能力(SendMessage、GetTask等)
协议绑定层具体实现方式(JSON-RPC 2.0、gRPC、HTTP/REST)

Core Data Structures

核心数据结构

1. AgentCard

1. AgentCard

Self-describing manifest hosted at
/.well-known/agent-card.json
:
json
{
  "name": "my_agent",
  "description": "Agent description",
  "url": "http://localhost:8080/",
  "version": "1.0.0",
  "protocolVersion": "0.3.0",
  "defaultInputModes": ["text"],
  "defaultOutputModes": ["text"],
  "capabilities": {
    "streaming": true,
    "pushNotifications": false,
    "extendedAgentCard": false
  },
  "skills": [
    {
      "id": "skill_id",
      "name": "Skill Name",
      "description": "What this skill does",
      "examples": ["Example query 1", "Example query 2"],
      "tags": []
    }
  ],
  "securitySchemes": {},
  "security": []
}
托管在
/.well-known/agent-card.json
的自描述清单:
json
{
  "name": "my_agent",
  "description": "Agent description",
  "url": "http://localhost:8080/",
  "version": "1.0.0",
  "protocolVersion": "0.3.0",
  "defaultInputModes": ["text"],
  "defaultOutputModes": ["text"],
  "capabilities": {
    "streaming": true,
    "pushNotifications": false,
    "extendedAgentCard": false
  },
  "skills": [
    {
      "id": "skill_id",
      "name": "Skill Name",
      "description": "What this skill does",
      "examples": ["Example query 1", "Example query 2"],
      "tags": []
    }
  ],
  "securitySchemes": {},
  "security": []
}

2. Task

2. Task

The core unit of work with lifecycle management:
json
{
  "id": "task_123",
  "contextId": "context_456",
  "status": {
    "state": "working",
    "timestamp": "2024-01-01T00:00:00Z"
  },
  "artifacts": [],
  "history": [],
  "metadata": {}
}
Task States:
StateDescription
submitted
Task received, not yet processing
working
Active processing
input-required
Awaiting client response
auth-required
Awaiting authentication
completed
Successfully finished
failed
Terminated with error
cancelled
Client-requested cancellation
rejected
Agent refused processing
具备生命周期管理的核心工作单元:
json
{
  "id": "task_123",
  "contextId": "context_456",
  "status": {
    "state": "working",
    "timestamp": "2024-01-01T00:00:00Z"
  },
  "artifacts": [],
  "history": [],
  "metadata": {}
}
Task状态:
状态描述
submitted
任务已接收,尚未开始处理
working
任务正在处理中
input-required
等待客户端响应
auth-required
等待身份验证
completed
任务已成功完成
failed
任务执行失败并终止
cancelled
客户端请求取消任务
rejected
Agent拒绝处理任务

3. Message

3. Message

Communication unit between client and server:
json
{
  "messageId": "msg_789",
  "role": "user",
  "parts": [
    {
      "type": "text",
      "text": "Hello, agent!"
    }
  ],
  "contextId": "context_456",
  "taskId": "task_123",
  "metadata": {}
}
Roles:
user
|
agent
客户端与服务器之间的通信单元:
json
{
  "messageId": "msg_789",
  "role": "user",
  "parts": [
    {
      "type": "text",
      "text": "Hello, agent!"
    }
  ],
  "contextId": "context_456",
  "taskId": "task_123",
  "metadata": {}
}
角色:
user
|
agent

4. Part

4. Part

Content container supporting multiple types:
TypeStructureDescription
Text
{"type": "text", "text": "..."}
Plain text, markdown, HTML
File
{"type": "file", "uri": "...", "mimeType": "..."}
File reference
Data
{"type": "data", "data": {...}}
Structured JSON
支持多种类型的内容容器:
类型结构描述
文本
{"type": "text", "text": "..."}
纯文本、Markdown、HTML
文件
{"type": "file", "uri": "...", "mimeType": "..."}
文件引用
数据
{"type": "data", "data": {...}}
结构化JSON数据

5. Artifact

5. Artifact

Task output representation:
json
{
  "id": "artifact_001",
  "name": "Result",
  "description": "Calculation result",
  "parts": [
    {"type": "text", "text": "42"}
  ],
  "metadata": {}
}

任务输出的表现形式:
json
{
  "id": "artifact_001",
  "name": "Result",
  "description": "Calculation result",
  "parts": [
    {"type": "text", "text": "42"}
  ],
  "metadata": {}
}

JSON-RPC 2.0 Operations

JSON-RPC 2.0操作

Method List

方法列表

MethodDescription
message/send
Send message, returns Task or Message
message/stream
Streaming variant with SSE
tasks/get
Retrieve task state by ID
tasks/list
List tasks with filtering/pagination
tasks/cancel
Cancel an active task
tasks/subscribe
Stream updates for existing task
tasks/pushNotificationConfig/create
Create webhook config
tasks/pushNotificationConfig/get
Get webhook config
tasks/pushNotificationConfig/list
List webhook configs
tasks/pushNotificationConfig/delete
Delete webhook config
agent/getExtendedCard
Get authenticated agent card
方法描述
message/send
发送消息,返回Task或Message
message/stream
基于SSE的流式传输变体
tasks/get
根据ID获取任务状态
tasks/list
列出任务,支持过滤与分页
tasks/cancel
取消活跃任务
tasks/subscribe
流式获取现有任务的更新
tasks/pushNotificationConfig/create
创建Webhook配置
tasks/pushNotificationConfig/get
获取Webhook配置
tasks/pushNotificationConfig/list
列出Webhook配置
tasks/pushNotificationConfig/delete
删除Webhook配置
agent/getExtendedCard
获取经过身份验证的Agent卡片

Request Format

请求格式

json
{
  "jsonrpc": "2.0",
  "id": "request_id",
  "method": "message/send",
  "params": {
    "message": {
      "role": "user",
      "parts": [{"type": "text", "text": "Hello"}],
      "messageId": "msg_001"
    }
  }
}
json
{
  "jsonrpc": "2.0",
  "id": "request_id",
  "method": "message/send",
  "params": {
    "message": {
      "role": "user",
      "parts": [{"type": "text", "text": "Hello"}],
      "messageId": "msg_001"
    }
  }
}

Response Format

响应格式

Success:
json
{
  "jsonrpc": "2.0",
  "id": "request_id",
  "result": {
    "task": { ... }
  }
}
Error:
json
{
  "jsonrpc": "2.0",
  "id": "request_id",
  "error": {
    "code": -32000,
    "message": "Task not found",
    "data": { "taskId": "invalid_id" }
  }
}
成功响应:
json
{
  "jsonrpc": "2.0",
  "id": "request_id",
  "result": {
    "task": { ... }
  }
}
错误响应:
json
{
  "jsonrpc": "2.0",
  "id": "request_id",
  "error": {
    "code": -32000,
    "message": "Task not found",
    "data": { "taskId": "invalid_id" }
  }
}

Error Codes

错误码

CodeNameDescription
-32700
Parse ErrorInvalid JSON
-32600
Invalid RequestInvalid JSON-RPC structure
-32601
Method Not FoundUnknown method
-32602
Invalid ParamsInvalid method parameters
-32603
Internal ErrorServer error
-32000
TaskNotFoundErrorTask does not exist
-32001
PushNotificationNotSupportedErrorWebhooks not supported
-32002
UnsupportedOperationErrorFeature not available
-32003
ContentTypeNotSupportedErrorUnsupported media type
-32004
VersionNotSupportedErrorProtocol version mismatch

代码名称描述
-32700
Parse ErrorJSON格式无效
-32600
Invalid RequestJSON-RPC结构无效
-32601
Method Not Found方法不存在
-32602
Invalid Params方法参数无效
-32603
Internal Error服务器内部错误
-32000
TaskNotFoundError任务不存在
-32001
PushNotificationNotSupportedError不支持Webhook
-32002
UnsupportedOperationError功能不可用
-32003
ContentTypeNotSupportedError不支持的媒体类型
-32004
VersionNotSupportedError协议版本不匹配

Streaming (Server-Sent Events)

流式传输(Server-Sent Events)

Stream Response Format

流式响应格式

event: message
data: {"task": {...}}

event: message
data: {"statusUpdate": {"taskId": "...", "state": "working"}}

event: message
data: {"artifactUpdate": {"taskId": "...", "artifact": {...}}}

event: done
data: {"status": "complete"}
event: message
data: {"task": {...}}

event: message
data: {"statusUpdate": {"taskId": "...", "state": "working"}}

event: message
data: {"artifactUpdate": {"taskId": "...", "artifact": {...}}}

event: done
data: {"status": "complete"}

Event Types

事件类型

EventDescription
task
Initial task state
message
Direct response message
statusUpdate
Task state change
artifactUpdate
New or updated artifact
Ordering Guarantee: Events MUST be delivered in generation order.

事件描述
task
初始任务状态
message
直接响应消息
statusUpdate
任务状态变更
artifactUpdate
新增或更新的输出产物
顺序保证: 事件必须按照生成顺序交付。

Security Schemes

安全方案

Supported Authentication

支持的身份验证方式

SchemeDescription
API KeyHeader, query, or cookie
HTTP AuthBearer, Basic, Digest
OAuth 2.0Authorization Code, Client Credentials, Device Code
OpenID ConnectIdentity layer on OAuth 2.0
Mutual TLSCertificate-based auth
方案描述
API密钥通过请求头、查询参数或Cookie传递
HTTP认证Bearer、Basic、Digest认证
OAuth 2.0授权码、客户端凭证、设备码模式
OpenID Connect基于OAuth 2.0的身份层
双向TLS基于证书的身份验证

Example Security Declaration

安全声明示例

json
{
  "securitySchemes": {
    "apiKey": {
      "type": "apiKey",
      "in": "header",
      "name": "X-API-Key"
    },
    "oauth2": {
      "type": "oauth2",
      "flows": {
        "clientCredentials": {
          "tokenUrl": "https://auth.example.com/token",
          "scopes": {
            "agent:read": "Read agent data",
            "agent:write": "Execute tasks"
          }
        }
      }
    }
  },
  "security": [{"apiKey": []}, {"oauth2": ["agent:read"]}]
}

json
{
  "securitySchemes": {
    "apiKey": {
      "type": "apiKey",
      "in": "header",
      "name": "X-API-Key"
    },
    "oauth2": {
      "type": "oauth2",
      "flows": {
        "clientCredentials": {
          "tokenUrl": "https://auth.example.com/token",
          "scopes": {
            "agent:read": "读取Agent数据",
            "agent:write": "执行任务"
          }
        }
      }
    }
  },
  "security": [{"apiKey": []}, {"oauth2": ["agent:read"]}]
}

Implementation Guide

实现指南

Dependencies

依赖安装

bash
pip install a2a-sdk uvicorn python-dotenv
bash
pip install a2a-sdk uvicorn python-dotenv

Server Implementation (3-File Pattern)

服务器实现(三文件模式)

1. agent.py - Agent Definition

1. agent.py - Agent定义

python
from a2a.types import AgentCapabilities, AgentSkill, AgentCard, ContentTypes

def create_agent_card(url: str) -> AgentCard:
    return AgentCard(
        name="my_agent",
        description="Agent description",
        url=url,
        version="1.0.0",
        protocolVersion="0.3.0",
        defaultInputModes=[ContentTypes.TEXT],
        defaultOutputModes=[ContentTypes.TEXT],
        capabilities=AgentCapabilities(
            streaming=True,
            pushNotifications=False,
        ),
        skills=[
            AgentSkill(
                id="main_skill",
                name="Main Skill",
                description="What this agent does",
                examples=["Example query"],
                tags=["category"],
            )
        ],
    )
python
from a2a.types import AgentCapabilities, AgentSkill, AgentCard, ContentTypes

def create_agent_card(url: str) -> AgentCard:
    return AgentCard(
        name="my_agent",
        description="Agent description",
        url=url,
        version="1.0.0",
        protocolVersion="0.3.0",
        defaultInputModes=[ContentTypes.TEXT],
        defaultOutputModes=[ContentTypes.TEXT],
        capabilities=AgentCapabilities(
            streaming=True,
            pushNotifications=False,
        ),
        skills=[
            AgentSkill(
                id="main_skill",
                name="Main Skill",
                description="What this agent does",
                examples=["Example query"],
                tags=["category"],
            )
        ],
    )

2. agent_executor.py - Business Logic

2. agent_executor.py - 业务逻辑

python
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.types import Part, TextPart, Task, TaskState, TaskStatus
from a2a.utils import completed_task, new_artifact, working_task

class MyAgentExecutor(AgentExecutor):
    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue
    ) -> None:
        user_input = context.get_user_input()

        # Signal working state (optional, for long tasks)
        await event_queue.enqueue_event(
            working_task(context.task_id, context.context_id)
        )

        # --- YOUR AGENT LOGIC HERE ---
        result = await self.process(user_input)
        # ------------------------------

        # Create response parts
        parts = [Part(root=TextPart(text=result))]

        # Complete task with artifact
        await event_queue.enqueue_event(
            completed_task(
                context.task_id,
                context.context_id,
                artifacts=[new_artifact(parts, f"result_{context.task_id}")],
                history=[context.message],
            )
        )

    async def cancel(
        self,
        context: RequestContext,
        event_queue: EventQueue
    ) -> Task | None:
        # Handle cancellation request
        return None

    async def process(self, input_text: str) -> str:
        # Implement your logic
        return f"Processed: {input_text}"
python
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.types import Part, TextPart, Task, TaskState, TaskStatus
from a2a.utils import completed_task, new_artifact, working_task

class MyAgentExecutor(AgentExecutor):
    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue
    ) -> None:
        user_input = context.get_user_input()

        # 发送任务处理中状态(可选,适用于长任务)
        await event_queue.enqueue_event(
            working_task(context.task_id, context.context_id)
        )

        # --- 在此处编写你的Agent逻辑 ---
        result = await self.process(user_input)
        # ------------------------------

        # 创建响应内容
        parts = [Part(root=TextPart(text=result))]

        # 完成任务并返回输出产物
        await event_queue.enqueue_event(
            completed_task(
                context.task_id,
                context.context_id,
                artifacts=[new_artifact(parts, f"result_{context.task_id}")],
                history=[context.message],
            )
        )

    async def cancel(
        self,
        context: RequestContext,
        event_queue: EventQueue
    ) -> Task | None:
        # 处理任务取消请求
        return None

    async def process(self, input_text: str) -> str:
        # 实现自定义处理逻辑
        return f"Processed: {input_text}"

3. main.py - Server Entry Point

3. main.py - 服务器入口

python
import uvicorn
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.apps import A2AStarletteApplication
from a2a.server.tasks import InMemoryTaskStore
from .agent import create_agent_card
from .agent_executor import MyAgentExecutor

def main():
    host = "0.0.0.0"
    port = 8080
    url = f"http://{host}:{port}/"

    agent_card = create_agent_card(url)

    handler = DefaultRequestHandler(
        agent_executor=MyAgentExecutor(),
        task_store=InMemoryTaskStore(),
    )

    app = A2AStarletteApplication(
        agent_card=agent_card,
        http_handler=handler,
    )

    print(f"A2A Agent running at {url}")
    print(f"Agent Card: {url}.well-known/agent-card.json")

    uvicorn.run(app.build(), host=host, port=port)

if __name__ == "__main__":
    main()
python
import uvicorn
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.apps import A2AStarletteApplication
from a2a.server.tasks import InMemoryTaskStore
from .agent import create_agent_card
from .agent_executor import MyAgentExecutor

def main():
    host = "0.0.0.0"
    port = 8080
    url = f"http://{host}:{port}/"

    agent_card = create_agent_card(url)

    handler = DefaultRequestHandler(
        agent_executor=MyAgentExecutor(),
        task_store=InMemoryTaskStore(),
    )

    app = A2AStarletteApplication(
        agent_card=agent_card,
        http_handler=handler,
    )

    print(f"A2A Agent运行于 {url}")
    print(f"Agent卡片地址: {url}.well-known/agent-card.json")

    uvicorn.run(app.build(), host=host, port=port)

if __name__ == "__main__":
    main()

Client Implementation

客户端实现

python
import httpx
from a2a.client import A2ACardResolver, A2AClient
from a2a.types import (
    SendMessageRequest,
    MessageSendParams,
    Message,
    Part,
    TextPart,
)

async def call_agent(agent_url: str, query: str):
    async with httpx.AsyncClient(timeout=60.0) as http:
        # 1. Discover agent
        resolver = A2ACardResolver(
            base_url=agent_url,
            httpx_client=http
        )
        card = await resolver.get_agent_card()
        print(f"Connected to: {card.name} v{card.version}")

        # 2. Create client
        client = A2AClient(http, card, url=agent_url)

        # 3. Build message
        message = Message(
            role="user",
            parts=[Part(root=TextPart(text=query))],
        )

        # 4. Send request
        request = SendMessageRequest(
            params=MessageSendParams(message=message)
        )

        response = await client.send_message(request)
        return response
python
import httpx
from a2a.client import A2ACardResolver, A2AClient
from a2a.types import (
    SendMessageRequest,
    MessageSendParams,
    Message,
    Part,
    TextPart,
)

async def call_agent(agent_url: str, query: str):
    async with httpx.AsyncClient(timeout=60.0) as http:
        # 1. 发现Agent
        resolver = A2ACardResolver(
            base_url=agent_url,
            httpx_client=http
        )
        card = await resolver.get_agent_card()
        print(f"已连接至: {card.name} v{card.version}")

        # 2. 创建客户端
        client = A2AClient(http, card, url=agent_url)

        # 3. 构建消息
        message = Message(
            role="user",
            parts=[Part(root=TextPart(text=query))],
        )

        # 4. 发送请求
        request = SendMessageRequest(
            params=MessageSendParams(message=message)
        )

        response = await client.send_message(request)
        return response

Streaming client

流式客户端

async def call_agent_streaming(agent_url: str, query: str): async with httpx.AsyncClient(timeout=None) as http: resolver = A2ACardResolver(base_url=agent_url, httpx_client=http) card = await resolver.get_agent_card() client = A2AClient(http, card, url=agent_url)
    message = Message(
        role="user",
        parts=[Part(root=TextPart(text=query))],
    )
    request = SendMessageRequest(
        params=MessageSendParams(message=message)
    )

    async for event in client.send_message_streaming(request):
        if hasattr(event, 'task'):
            print(f"Task: {event.task.status.state}")
        elif hasattr(event, 'artifact'):
            print(f"Artifact: {event.artifact}")

---
async def call_agent_streaming(agent_url: str, query: str): async with httpx.AsyncClient(timeout=None) as http: resolver = A2ACardResolver(base_url=agent_url, httpx_client=http) card = await resolver.get_agent_card() client = A2AClient(http, card, url=agent_url)
    message = Message(
        role="user",
        parts=[Part(root=TextPart(text=query))],
    )
    request = SendMessageRequest(
        params=MessageSendParams(message=message)
    )

    async for event in client.send_message_streaming(request):
        if hasattr(event, 'task'):
            print(f"任务状态: {event.task.status.state}")
        elif hasattr(event, 'artifact'):
            print(f"输出产物: {event.artifact}")

---

Best Practices

最佳实践

1. Agent Discovery

1. Agent发现

Always fetch
AgentCard
before interaction to adapt to capability changes.
在与Agent交互前,务必获取
AgentCard
以适配其能力变化。

2. Streaming

2. 流式传输

Use SSE for long-running tasks to provide real-time updates.
对于长时间运行的任务,使用SSE实现实时更新。

3. Artifacts vs Messages

3. 输出产物与消息的区别

  • Artifacts: Final deliverables (files, structured data)
  • Messages: Conversational updates, status information
  • 输出产物(Artifacts):最终交付成果(文件、结构化数据)
  • 消息(Messages):会话式更新、状态信息

4. Error Handling

4. 错误处理

python
try:
    response = await client.send_message(request)
except A2AError as e:
    if e.code == -32000:
        print("Task not found")
    elif e.code == -32602:
        print("Invalid parameters")
python
try:
    response = await client.send_message(request)
except A2AError as e:
    if e.code == -32000:
        print("任务不存在")
    elif e.code == -32602:
        print("参数无效")

5. Pagination

5. 分页

python
undefined
python
undefined

List tasks with pagination

分页列出任务

params = TaskQueryParams( contextId="ctx_123", status=["completed", "failed"], pageSize=50, pageToken=None, # For first page ) response = await client.list_tasks(params) next_page_token = response.nextPageToken
undefined
params = TaskQueryParams( contextId="ctx_123", status=["completed", "failed"], pageSize=50, pageToken=None, # 第一页 ) response = await client.list_tasks(params) next_page_token = response.nextPageToken
undefined

6. Push Notifications

6. 推送通知

python
undefined
python
undefined

Configure webhook for async updates

配置Webhook以接收异步更新

config = PushNotificationConfig( url="https://myserver.com/webhook", authentication={ "type": "bearer", "token": "secret_token" } ) await client.create_push_notification_config(task_id, config)

---
config = PushNotificationConfig( url="https://myserver.com/webhook", authentication={ "type": "bearer", "token": "secret_token" } ) await client.create_push_notification_config(task_id, config)

---

Quick Reference

快速参考

Endpoints

端点

EndpointMethodDescription
/.well-known/agent-card.json
GETPublic agent card
/
POSTJSON-RPC endpoint
端点请求方法描述
/.well-known/agent-card.json
GET公开Agent卡片
/
POSTJSON-RPC端点

Headers

请求头

HeaderDescription
Content-Type
application/json
Accept
application/json
or
text/event-stream
A2A-Version
Protocol version (e.g.,
0.3.0
)
请求头描述
Content-Type
application/json
Accept
application/json
text/event-stream
A2A-Version
协议版本(例如:
0.3.0

SDK Utilities

SDK工具函数

python
from a2a.utils import (
    completed_task,    # Create completed task event
    failed_task,       # Create failed task event
    working_task,      # Create working status event
    input_required,    # Request user input
    new_artifact,      # Create new artifact
    new_message,       # Create new message
)

python
from a2a.utils import (
    completed_task,    # 创建任务完成事件
    failed_task,       # 创建任务失败事件
    working_task,      # 创建任务处理中状态事件
    input_required,    # 请求用户输入
    new_artifact,      # 创建新的输出产物
    new_message,       # 创建新消息
)

References

参考链接