This skill provides comprehensive knowledge for building, deploying, and interacting with agents using the Agent2Agent (A2A) Protocol v0.3.0.
A2A is a standard protocol enabling AI agents to communicate and collaborate. It operates across three layers:
Self-describing manifest hosted at `/.well-known/agent-card.json`:
json
{
"name": "my_agent",
"description": "Agent description",
"url": "http://localhost:8080/",
"version": "1.0.0",
"protocolVersion": "0.3.0",
"defaultInputModes": ["text"],
"defaultOutputModes": ["text"],
"capabilities": {
"streaming": true,
"pushNotifications": false,
"extendedAgentCard": false
},
"skills": [
{
"id": "skill_id",
"name": "Skill Name",
"description": "What this skill does",
"examples": ["Example query 1", "Example query 2"],
"tags": []
}
],
"securitySchemes": {},
"security": []
}
json
{
"messageId": "msg_789",
"role": "user",
"parts": [
{
"kind": "text",
"text": "Hello, agent!"
}
],
"contextId": "context_456",
"taskId": "task_123",
"metadata": {}
}
json
{
"securitySchemes": {
"apiKey": {
"type": "apiKey",
"in": "header",
"name": "X-API-Key"
},
"oauth2": {
"type": "oauth2",
"flows": {
"clientCredentials": {
"tokenUrl": "https://auth.example.com/token",
"scopes": {
"agent:read": "Read agent data",
"agent:write": "Execute tasks"
}
}
}
}
},
"security": [{"apiKey": []}, {"oauth2": ["agent:read"]}]
}
python
from a2a.types import AgentCapabilities, AgentSkill, AgentCard, ContentTypes
def create_agent_card(url: str) -> AgentCard:
    """Build the AgentCard that describes this agent.

    Args:
        url: Value written into the card's ``url`` field.

    Returns:
        An ``AgentCard`` named ``my_agent`` (version 1.0.0, protocol 0.3.0)
        that declares text input/output modes, streaming enabled, push
        notifications disabled, and a single ``main_skill`` skill.
    """
    capabilities = AgentCapabilities(streaming=True, pushNotifications=False)

    main_skill = AgentSkill(
        id="main_skill",
        name="Main Skill",
        description="What this agent does",
        examples=["Example query"],
        tags=["category"],
    )

    input_modes = [ContentTypes.TEXT]
    output_modes = [ContentTypes.TEXT]

    return AgentCard(
        name="my_agent",
        description="Agent description",
        url=url,
        version="1.0.0",
        protocolVersion="0.3.0",
        defaultInputModes=input_modes,
        defaultOutputModes=output_modes,
        capabilities=capabilities,
        skills=[main_skill],
    )
python
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.types import Part, TextPart, Task, TaskState, TaskStatus
from a2a.utils import completed_task, new_artifact, working_task
class MyAgentExecutor(AgentExecutor):
    """Template executor that answers a request with a single text artifact."""

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        """Process one request and publish the resulting task events.

        Enqueues a working-status event, runs ``process`` on the user input,
        then enqueues a completed-task event carrying the result as an
        artifact together with the incoming message as history.

        Args:
            context: Request context supplying the user input, task id,
                context id and the incoming message.
            event_queue: Queue on which the task events are published.
        """
        query = context.get_user_input()

        # Optional progress signal, useful for long-running tasks.
        progress_event = working_task(context.task_id, context.context_id)
        await event_queue.enqueue_event(progress_event)

        # --- YOUR AGENT LOGIC HERE ---
        answer = await self.process(query)
        # ------------------------------

        # Wrap the textual answer into response parts and a named artifact.
        reply_parts = [Part(root=TextPart(text=answer))]
        artifact = new_artifact(reply_parts, f"result_{context.task_id}")

        done_event = completed_task(
            context.task_id,
            context.context_id,
            artifacts=[artifact],
            history=[context.message],
        )
        await event_queue.enqueue_event(done_event)

    async def cancel(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> Task | None:
        """Handle a cancellation request.

        This template performs no cancellation work and always returns
        ``None``.
        """
        return None

    async def process(self, input_text: str) -> str:
        """Produce the agent's answer for ``input_text``.

        Placeholder logic: replace with the real implementation.

        Returns:
            The input prefixed with ``"Processed: "``.
        """
        return f"Processed: {input_text}"
python
import uvicorn
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.apps import A2AStarletteApplication
from a2a.server.tasks import InMemoryTaskStore
from .agent import create_agent_card
from .agent_executor import MyAgentExecutor
def main(
    host: str = "0.0.0.0",
    port: int = 8080,
    public_url: str | None = None,
) -> None:
    """Start the A2A agent server with uvicorn.

    Args:
        host: Interface the server binds to. The default ``0.0.0.0`` listens
            on all interfaces.
        port: TCP port the server listens on.
        public_url: URL advertised in the agent card. When omitted it is
            derived from ``host`` and ``port``; a wildcard bind address is
            advertised as ``localhost`` because clients cannot connect to
            ``0.0.0.0``.
    """
    if public_url is None:
        # The bind address and the advertised address are different things:
        # a wildcard bind host is not a reachable endpoint for clients.
        advertised_host = "localhost" if host in ("0.0.0.0", "::", "") else host
        public_url = f"http://{advertised_host}:{port}/"
    elif not public_url.endswith("/"):
        # The card path printed below is appended directly to the URL.
        public_url += "/"

    agent_card = create_agent_card(public_url)

    handler = DefaultRequestHandler(
        agent_executor=MyAgentExecutor(),
        task_store=InMemoryTaskStore(),
    )
    app = A2AStarletteApplication(
        agent_card=agent_card,
        http_handler=handler,
    )

    print(f"A2A Agent running at {public_url}")
    print(f"Agent Card: {public_url}.well-known/agent-card.json")
    uvicorn.run(app.build(), host=host, port=port)


if __name__ == "__main__":
    main()
python
import httpx
from a2a.client import A2ACardResolver, A2AClient
from a2a.types import (
SendMessageRequest,
MessageSendParams,
Message,
Part,
TextPart,
)
async def call_agent(agent_url: str, query: str):
    """Send one text message to an A2A agent and return the response.

    Args:
        agent_url: Base URL of the agent; used both to resolve the agent
            card and as the endpoint the client talks to.
        query: Text to send as the user message.

    Returns:
        Whatever ``A2AClient.send_message`` returns for the request.
    """
    # Local import so the snippet stays self-contained.
    from uuid import uuid4

    async with httpx.AsyncClient(timeout=60.0) as http:
        # 1. Discover agent
        resolver = A2ACardResolver(
            base_url=agent_url,
            httpx_client=http,
        )
        card = await resolver.get_agent_card()
        print(f"Connected to: {card.name} v{card.version}")

        # 2. Create client
        client = A2AClient(http, card, url=agent_url)

        # 3. Build message; every message needs its own messageId
        #    (see the message example earlier in this document).
        message = Message(
            messageId=uuid4().hex,
            role="user",
            parts=[Part(root=TextPart(text=query))],
        )

        # 4. Send request; the JSON-RPC envelope needs a request id.
        request = SendMessageRequest(
            id=str(uuid4()),
            params=MessageSendParams(message=message),
        )
        response = await client.send_message(request)
        return response
# Streaming client
async def call_agent_streaming(agent_url: str, query: str):
    """Send one text message to an A2A agent and print streamed events.

    Args:
        agent_url: Base URL of the agent; used both to resolve the agent
            card and as the endpoint the client talks to.
        query: Text to send as the user message.
    """
    # Local imports so the snippet stays self-contained.
    from uuid import uuid4

    from a2a.types import SendStreamingMessageRequest

    # No timeout: a stream may stay open for as long as the task runs.
    async with httpx.AsyncClient(timeout=None) as http:
        resolver = A2ACardResolver(base_url=agent_url, httpx_client=http)
        card = await resolver.get_agent_card()
        client = A2AClient(http, card, url=agent_url)

        message = Message(
            messageId=uuid4().hex,
            role="user",
            parts=[Part(root=TextPart(text=query))],
        )
        # Streaming uses its own request type (method "message/stream");
        # a plain SendMessageRequest would ask for a non-streaming reply.
        request = SendStreamingMessageRequest(
            id=str(uuid4()),
            params=MessageSendParams(message=message),
        )

        # NOTE(review): the attribute names checked below depend on the
        # shape of the SDK's streaming response objects - confirm against
        # the installed SDK version.
        async for event in client.send_message_streaming(request):
            if hasattr(event, 'task'):
                print(f"Task: {event.task.status.state}")
            elif hasattr(event, 'artifact'):
                print(f"Artifact: {event.artifact}")
Always fetch the agent card (`/.well-known/agent-card.json`) before interaction to adapt to capability changes.
Use SSE for long-running tasks to provide real-time updates.
python
from a2a.utils import (
completed_task, # Create completed task event
failed_task, # Create failed task event
working_task, # Create working status event
input_required, # Request user input
new_artifact, # Create new artifact
new_message, # Create new message
)