migrating-ai-sdk-to-common-ai
Migrate airflow-ai-sdk to apache-airflow-providers-common-ai

This skill migrates Airflow projects from `airflow-ai-sdk` to `apache-airflow-providers-common-ai` (0.1.0+), the official Airflow AI provider built on PydanticAI.

CRITICAL: The new provider requires Airflow 3.0+ and pydantic-ai-slim >= 1.34.0. The API surface has changed: LLM configuration moves from code (model strings/objects) to Airflow connections (`pydanticai` type). There is no `@task.embed` in the new provider.
Before starting
Use the Grep tool with the pattern below to inventory everything that needs to migrate:

`airflow_ai_sdk|airflow-ai-sdk|ai_sdk|@task\.llm|@task\.agent|@task\.llm_branch|@task\.embed`

From the results, capture:
- All files importing `airflow-ai-sdk` / `airflow_ai_sdk`
- Which decorators are in use: `@task.llm`, `@task.agent`, `@task.llm_branch`, `@task.embed`
- The model configuration pattern (string names like `"gpt-5"`, or `OpenAIModel(...)` objects)
- Any `airflow_ai_sdk.BaseModel` subclasses used as `output_type`

Use this inventory to drive the steps below.
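Where the Grep tool is not available, the same inventory can be approximated in plain Python. This is a minimal sketch, not part of either package: the `inventory` function name and the default file suffixes are assumptions chosen for illustration, and only the regular expression comes from the pattern above.

```python
import re
from pathlib import Path

# Same alternation as the Grep pattern above.
MIGRATION_PATTERN = re.compile(
    r"airflow_ai_sdk|airflow-ai-sdk|ai_sdk|"
    r"@task\.llm|@task\.agent|@task\.llm_branch|@task\.embed"
)


def inventory(root, suffixes=(".py", ".txt", ".toml")):
    """Map each matching file to a list of (line number, stripped line) hits."""
    hits = {}
    for path in sorted(Path(root).rglob("*")):
        # Hypothetical filter: only look at source and dependency files.
        if not path.is_file() or path.suffix not in suffixes:
            continue
        lines = path.read_text(errors="ignore").splitlines()
        for lineno, line in enumerate(lines, start=1):
            if MIGRATION_PATTERN.search(line):
                hits.setdefault(str(path), []).append((lineno, line.strip()))
    return hits
```

Calling `inventory("dags")` on a project folder gives a per-file list that can be read off against the four bullet points above.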
Step 1: Update requirements.txt
**Remove:**

`airflow-ai-sdk[openai]`

or any variant: `airflow-ai-sdk[openai]==0.1.7`, `airflow-ai-sdk[anthropic]`, etc.

**Add:**

`apache-airflow-providers-common-ai[openai]>=0.1.0`
Use the latest available 0.x version unless the user has pinned a specific one. Available extras match the LLM provider: `[openai]`, `[anthropic]`, `[google]`, `[bedrock]`, `[groq]`, `[mistral]`, `[mcp]`.
Keep `sentence-transformers` and `torch` if the project uses embeddings (they now run via plain `@task` instead of `@task.embed`).
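The swap can also be scripted. The sketch below assumes the old extra name carries over unchanged to the new package and that any old version pin is dropped in favor of `>=0.1.0`; `migrate_requirements` is a hypothetical helper name, and environment markers or trailing comments on the old line are not preserved.

```python
import re

# Matches "airflow-ai-sdk", optionally with one [extras] group, at line start.
OLD_REQ = re.compile(r"^\s*airflow-ai-sdk(?![\w-])(\[[^\]]*\])?")


def migrate_requirements(lines, new_spec=">=0.1.0"):
    """Replace airflow-ai-sdk requirement lines, keeping every other line."""
    migrated = []
    for line in lines:
        match = OLD_REQ.match(line)
        if match:
            extras = match.group(1) or ""
            migrated.append(
                f"apache-airflow-providers-common-ai{extras}{new_spec}"
            )
        else:
            migrated.append(line)
    return migrated
```

If the user has pinned a specific provider version, pass it as `new_spec` (for example `"==0.1.0"`) instead of relying on the default.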
---
Step 2: Create PydanticAI connection
The new provider uses an Airflow connection instead of model strings or objects in code.

Connection type: `pydanticai`
Default connection ID: `pydanticai_default`

Via environment variable (.env)
```bash
AIRFLOW_CONN_PYDANTICAI_DEFAULT='{
  "conn_type": "pydanticai",
  "password": "<api-key>",
  "extra": {
    "model": "<provider>:<model-name>"
  }
}'
```

Model format
The model field uses `provider:model` format:

| Provider | Example model value |
|---|---|
| OpenAI | |
| Anthropic | |
| |
| Groq | |
| Mistral | |
| Bedrock | |
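To make the `provider:model` shape concrete, here is a small validation sketch. It is an illustration only, not the provider's own parsing logic; `split_model` is a hypothetical helper. It splits on the first colon, so model names that themselves contain a colon stay intact.

```python
def split_model(value):
    """Split a 'provider:model' string into (provider, model_name)."""
    provider, sep, name = value.partition(":")
    if not sep or not provider or not name:
        raise ValueError(
            f"expected '<provider>:<model-name>', got {value!r}"
        )
    return provider, name


provider, name = split_model("openai:gpt-5")
print(provider, name)
```

A bare model name such as `"gpt-5"`, which was valid in airflow-ai-sdk code, fails this check, which is a quick way to spot values that were copied over without a provider prefix.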
Custom endpoints (Ollama, vLLM, Snowflake Cortex, etc.)
Set `host` to the base URL:

```bash
AIRFLOW_CONN_PYDANTICAI_CORTEX='{
  "conn_type": "pydanticai",
  "password": "<api-key>",
  "host": "https://my-endpoint.com/v1",
  "extra": {
    "model": "openai:<model-name>"
  }
}'
```

Use the `openai:` prefix for any OpenAI-compatible API, regardless of the actual provider.
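Hand-writing JSON inside a quoted env var is easy to get wrong, so the value can be generated instead. The sketch below only reproduces the fields shown in the examples above (`conn_type`, `password`, `host`, `extra.model`); `pydanticai_conn_json` is a hypothetical helper name.

```python
import json


def pydanticai_conn_json(api_key, model, host=None):
    """Build the JSON value for an AIRFLOW_CONN_* variable."""
    conn = {
        "conn_type": "pydanticai",
        "password": api_key,
        "extra": {"model": model},
    }
    if host:
        # Only needed for custom, OpenAI-compatible endpoints.
        conn["host"] = host
    return json.dumps(conn)


print(pydanticai_conn_json(
    "<api-key>", "openai:<model-name>", host="https://my-endpoint.com/v1"
))
```

The printed string can be pasted, wrapped in single quotes, as the value of `AIRFLOW_CONN_PYDANTICAI_CORTEX` in `.env`.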
Connection ID convention
The env var name determines the connection ID:
- `AIRFLOW_CONN_PYDANTICAI_DEFAULT` creates `pydanticai_default`
- `AIRFLOW_CONN_PYDANTICAI_CORTEX` creates `pydanticai_cortex`
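The naming rule can be written down as a pair of helpers. This is a sketch of the convention shown in the two examples above (strip the `AIRFLOW_CONN_` prefix, lowercase the rest); the function names are hypothetical.

```python
PREFIX = "AIRFLOW_CONN_"


def conn_id_from_env_var(name):
    """AIRFLOW_CONN_PYDANTICAI_DEFAULT -> pydanticai_default"""
    if not name.startswith(PREFIX):
        raise ValueError(f"{name!r} is not an AIRFLOW_CONN_* variable")
    return name[len(PREFIX):].lower()


def env_var_for_conn_id(conn_id):
    """pydanticai_cortex -> AIRFLOW_CONN_PYDANTICAI_CORTEX"""
    return PREFIX + conn_id.upper()
```

The second helper is handy when a decorator already names an `llm_conn_id` and the matching variable still has to be added to `.env`.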
Model resolution priority
- `model_id` parameter on the decorator/operator (highest)
- `model` in connection's extra JSON (fallback)
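The priority order reads naturally as a two-step lookup. The function below is a sketch of the rule as described here, not the provider's actual implementation; `resolve_model` and the error for the case where neither source is set are assumptions.

```python
def resolve_model(model_id, conn_extra):
    """Pick the model: explicit model_id first, then the connection's extra."""
    if model_id:
        return model_id  # highest priority: set on the decorator/operator
    model = conn_extra.get("model")
    if not model:
        # Assumed behavior for illustration: neither source provided a model.
        raise ValueError("no model_id given and no 'model' in connection extra")
    return model


extra = {"model": "openai:gpt-5"}
print(resolve_model(None, extra))
print(resolve_model("openai:<model-name>", extra))
```

In practice this means one shared connection can serve most tasks, with `model_id` reserved for the few tasks that need a different model.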
Step 3: Migrate decorators
@task.llm

```python
# BEFORE (airflow-ai-sdk)
import airflow_ai_sdk as ai_sdk

class MyOutput(ai_sdk.BaseModel):
    field: str

@task.llm(
    model="gpt-5",  # or model=OpenAIModel(...)
    system_prompt="You are helpful.",
    output_type=MyOutput,
)
def my_task(text: str) -> str:
    return text

# AFTER (apache-airflow-providers-common-ai)
from airflow.sdk import task
from pydantic import BaseModel

class MyOutput(BaseModel):
    field: str

@task.llm(
    llm_conn_id="pydanticai_default",  # Airflow connection ID
    system_prompt="You are helpful.",
    output_type=MyOutput,
)
def my_task(text: str) -> str:
    return text
```
**Parameter mapping:**
| airflow-ai-sdk | common-ai provider | Notes |
|----------------|-------------------|-------|
| `model="gpt-5"` | `llm_conn_id="pydanticai_default"` | Model specified in connection |
| `model=OpenAIModel(...)` | `llm_conn_id="pydanticai_default"` | Model + endpoint in connection |
| `system_prompt="..."` | `system_prompt="..."` | Unchanged |
| `output_type=MyModel` | `output_type=MyModel` | Unchanged |
| `result_type=MyModel` | `output_type=MyModel` | `result_type` was already deprecated |
| (not available) | `model_id="openai:gpt-5"` | Override connection's model |
| (not available) | `require_approval=True` | Built-in HITL review |
| (not available) | `agent_params={...}` | Extra kwargs for pydantic-ai Agent |
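The parameter mapping for `@task.llm` can be expressed as a small keyword-argument transform, which helps when many decorators need the same edit. This is a sketch of the table's rules only; `migrate_llm_kwargs` is a hypothetical helper and does not touch the new optional parameters.

```python
def migrate_llm_kwargs(old_kwargs, llm_conn_id="pydanticai_default"):
    """Translate airflow-ai-sdk @task.llm kwargs to common-ai kwargs."""
    new_kwargs = dict(old_kwargs)
    # The model now lives in the connection, not in code.
    new_kwargs.pop("model", None)
    # result_type was already deprecated; output_type replaces it.
    result_type = new_kwargs.pop("result_type", None)
    if result_type is not None and "output_type" not in new_kwargs:
        new_kwargs["output_type"] = result_type
    return {"llm_conn_id": llm_conn_id, **new_kwargs}


print(migrate_llm_kwargs({"model": "gpt-5", "system_prompt": "You are helpful."}))
```

Parameters that the table marks as unchanged, such as `system_prompt`, pass through untouched.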
@task.llm_branch
```python
# BEFORE
@task.llm_branch(
    model="gpt-5",
    system_prompt="Choose a team...",
    allow_multiple_branches=False,
)
def route(text: str) -> str:
    return text

# AFTER
@task.llm_branch(
    llm_conn_id="pydanticai_default",
    system_prompt="Choose a team...",
    allow_multiple_branches=False,  # same parameter, unchanged
)
def route(text: str) -> str:
    return text
```

Only change: `model=` becomes `llm_conn_id=`.
@task.agent
This decorator has the biggest API change: the Agent is no longer pre-built in user code.

```python
# BEFORE (airflow-ai-sdk) - Agent built at module level
from pydantic_ai import Agent

my_agent = Agent(
    "gpt-5",
    system_prompt="You are a research assistant.",
    tools=[search_tool, lookup_tool],
)

@task.agent(agent=my_agent)
def research(question: str) -> str:
    return question

# AFTER (common-ai provider) - No Agent object, config via parameters
@task.agent(
    llm_conn_id="pydanticai_default",
    system_prompt="You are a research assistant.",
    agent_params={"tools": [search_tool, lookup_tool]},
)
def research(question: str) -> str:
    return question
```
**Parameter mapping:**
| airflow-ai-sdk | common-ai provider | Notes |
|----------------|-------------------|-------|
| `agent=Agent(model, ...)` | `llm_conn_id="..."` | Model from connection |
| Agent's `system_prompt` | `system_prompt="..."` | Now a decorator param |
| Agent's `tools=[...]` | `agent_params={"tools": [...]}` | Tools via agent_params dict |
| Agent's `output_type` | `output_type=MyModel` | Now a decorator param |
| (not available) | `toolsets=[...]` | pydantic-ai 1.x Toolset objects |
| (not available) | `durable=True` | Step-level caching |
| (not available) | `enable_hitl_review=True` | Iterative human review loop |
**Key insight:** Everything that was configured on the `Agent()` constructor now goes into either a top-level decorator parameter or `agent_params`. The `agent_params` dict is passed directly to pydantic-ai's `Agent` constructor.
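The split between top-level parameters and `agent_params` can be sketched as a routing function over the old `Agent(...)` keyword arguments. The set of top-level names below is taken from the mapping table (`system_prompt`, `output_type`); `split_agent_kwargs` is a hypothetical helper, and the `model` key stands in for the old positional model argument.

```python
# Per the mapping table, these Agent settings become decorator parameters.
TOP_LEVEL = {"system_prompt", "output_type"}


def split_agent_kwargs(agent_kwargs, llm_conn_id="pydanticai_default"):
    """Route old Agent(...) kwargs to @task.agent(...) kwargs."""
    decorator_kwargs = {"llm_conn_id": llm_conn_id}
    agent_params = {}
    for key, value in agent_kwargs.items():
        if key == "model":
            continue  # the model now comes from the connection
        if key in TOP_LEVEL:
            decorator_kwargs[key] = value
        else:
            agent_params[key] = value  # e.g. tools
    if agent_params:
        decorator_kwargs["agent_params"] = agent_params
    return decorator_kwargs
```

Applied to the example above, `system_prompt` stays at the top level and `tools` ends up under `agent_params`, matching the AFTER snippet.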
@task.embed (NO EQUIVALENT)
The new provider does NOT include an embed decorator. Replace with a plain `@task`:

```python
# BEFORE (airflow-ai-sdk)
@task.embed(
    model_name="all-MiniLM-L6-v2",
    encode_kwargs={"normalize_embeddings": True},
    max_active_tis_per_dagrun=1,
)
def embed_text(text: str) -> str:
    return text

# AFTER (plain @task with sentence-transformers)
@task(max_active_tis_per_dagrun=1)
def embed_text(text: str) -> list[float]:
    from sentence_transformers import SentenceTransformer

    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(text, normalize_embeddings=True).tolist()
```
Note: The model is loaded on each task execution. For small workloads this is fine. For large batches, consider embedding all texts in a single task instead of using `.expand()`.
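For the large-batch case, one possible shape is a function that loads the model once and encodes every text in a single call. This is a sketch under assumptions: `embed_texts` and its `encoder` parameter are hypothetical (the parameter exists so the function can be exercised without downloading a model), and in a DAG the body would sit inside a plain `@task`.

```python
def embed_texts(texts, model_name="all-MiniLM-L6-v2", encoder=None):
    """Embed all texts in one call so the model is loaded only once."""
    if encoder is None:
        # Imported lazily, as in the single-text task above.
        from sentence_transformers import SentenceTransformer

        encoder = SentenceTransformer(model_name)
    vectors = encoder.encode(list(texts), normalize_embeddings=True)
    # Convert to plain Python floats so the result is JSON-serializable.
    return [[float(x) for x in vector] for vector in vectors]
```

The trade-off is that a single task loses the per-item retries and parallelism that `.expand()` provides, so this fits best when model load time dominates.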
---
Step 4: Update imports
| Old import | New import |
|---|---|
| Remove entirely |
| |
| |
| |
| Remove if Agent was only used for |
| Remove (model config in connection now) |
The `@task.llm`, `@task.agent`, and `@task.llm_branch` decorators are auto-registered by the provider. No explicit import is needed beyond `from airflow.sdk import task`.

Imports from `pydantic_ai` outside the decorators (such as `BinaryContent` for multimodal input) still work, because the new provider depends on `pydantic-ai-slim>=1.34.0`.

Step 5: Update connections.yaml (if used for local testing)
```yaml
pydanticai_default:
  conn_type: pydanticai
  password: <api-key>
  extra:
    model: "openai:gpt-5"
```

For custom endpoints:

```yaml
pydanticai_cortex:
  conn_type: pydanticai
  password: <api-key>
  host: https://my-endpoint.com/v1
  extra:
    model: "openai:llama3.1-8b"
```

Step 6: Clean up env vars
The new provider reads model config from the `pydanticai` connection, so env vars that previously fed the model in code are usually redundant. Before removing any of them, grep the project (and any sibling scripts/services) to confirm nothing else still references them:

`OPENAI_API_KEY|OPENAI_BASE_URL|ANTHROPIC_API_KEY|GOOGLE_API_KEY`

Candidates for removal only if no other code references them:
- `OPENAI_API_KEY` (now in the pydanticai connection's password field)
- `OPENAI_BASE_URL` (now in the connection's host field)
- Custom model name vars (now in the connection's extra.model)

If anything outside the migrated DAGs still uses them (other DAGs not yet migrated, helper scripts, non-Airflow services sharing the `.env`), leave them in place.

Keep `AIRFLOW_CONN_*` env vars for all connections.
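Once the grep results are in hand, the pruning rule itself is mechanical. The sketch below is a hypothetical helper (`prune_env`) that drops a candidate variable from `.env` lines only when nothing else references it, and never drops an `AIRFLOW_CONN_*` line; the `still_referenced` set is assumed to come from the grep step above.

```python
def prune_env(lines, candidates, still_referenced):
    """Return .env lines with unreferenced candidate variables removed."""
    kept = []
    for line in lines:
        name = line.partition("=")[0].strip()
        removable = (
            name in candidates
            and name not in still_referenced
            and not name.startswith("AIRFLOW_CONN_")
        )
        if not removable:
            kept.append(line)
    return kept


env_lines = [
    "OPENAI_API_KEY=<api-key>",
    "OPENAI_BASE_URL=https://my-endpoint.com/v1",
    "AIRFLOW_CONN_PYDANTICAI_DEFAULT='{...}'",
]
print(prune_env(
    env_lines,
    candidates={"OPENAI_API_KEY", "OPENAI_BASE_URL"},
    still_referenced={"OPENAI_BASE_URL"},
))
```

Lines that are not simple `NAME=value` assignments, such as the inner lines of a multi-line JSON connection value, are left untouched because their text never matches a candidate name.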
Step 7: Verify
After migration, grep the codebase to confirm no stale references remain:
`airflow_ai_sdk|airflow-ai-sdk|ai_sdk\.BaseModel|from pydantic_ai import Agent|from pydantic_ai.models`

Verify:
- No imports from `airflow_ai_sdk`
- No `Agent()` objects created for `@task.agent` (unless used outside decorators)
- No `model=` parameter on LLM decorators (should be `llm_conn_id=`)
- All `@task.embed` replaced with plain `@task`
- `pydanticai` connection configured in `.env` or connections.yaml
- `requirements.txt` has `apache-airflow-providers-common-ai[...]` instead of `airflow-ai-sdk[...]`
Quick reference: New features in common-ai provider
These features are available after migration but have no airflow-ai-sdk equivalent:
| Feature | Parameter | Description |
|---|---|---|
| HITL approval | `require_approval=True` | Pause for human review before returning |
| HITL review loop | `enable_hitl_review=True` | Iterative review with regeneration |
| Durable execution | `durable=True` | Step-level caching for resilience |
| Tool logging | | INFO-level tool call logs (default: on) |
| Model override | `model_id="openai:gpt-5"` | Override connection's model per-task |
| File analysis | | Analyze files/images via ObjectStoragePath |
| NL-to-SQL | | Generate SQL from natural language |