migrating-ai-sdk-to-common-ai

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Migrate airflow-ai-sdk to apache-airflow-providers-common-ai

将airflow-ai-sdk迁移至apache-airflow-providers-common-ai

This skill migrates Airflow projects from
airflow-ai-sdk
to
apache-airflow-providers-common-ai
(0.1.0+), the official Airflow AI provider built on PydanticAI.
CRITICAL: The new provider requires Airflow 3.0+ and pydantic-ai-slim >= 1.34.0. The API surface has changed: LLM configuration moves from code (model strings/objects) to Airflow connections (
pydanticai
type). There is no
@task.embed
in the new provider.
本技能可将Airflow项目从
airflow-ai-sdk
迁移至基于PydanticAI构建的官方Airflow AI提供者
apache-airflow-providers-common-ai
(0.1.0及以上版本)。
重要提示:新提供者要求Airflow 3.0及以上版本以及pydantic-ai-slim >= 1.34.0。API层面已发生变更:LLM配置从代码中的模型字符串/对象转移至Airflow连接(
pydanticai
类型)。新提供者中不再包含
@task.embed
装饰器。

Before starting

开始之前

Use the Grep tool with the pattern below to inventory everything that needs to migrate:
airflow_ai_sdk|airflow-ai-sdk|ai_sdk|@task\.llm|@task\.agent|@task\.llm_branch|@task\.embed
From the results, capture:
  1. All files importing
    airflow-ai-sdk
    /
    airflow_ai_sdk
  2. Which decorators are in use:
    @task.llm
    ,
    @task.agent
    ,
    @task.llm_branch
    ,
    @task.embed
  3. The model configuration pattern (string names like
    "gpt-5"
    , or
    OpenAIModel(...)
    objects)
  4. Any
    airflow_ai_sdk.BaseModel
    subclasses used as
    output_type
Use this inventory to drive the steps below.

使用Grep工具,通过以下模式排查所有需要迁移的内容:
airflow_ai_sdk|airflow-ai-sdk|ai_sdk|@task\.llm|@task\.agent|@task\.llm_branch|@task\.embed
从结果中收集:
  1. 所有导入
    airflow-ai-sdk
    /
    airflow_ai_sdk
    的文件
  2. 当前使用的装饰器类型:
    @task.llm
    @task.agent
    @task.llm_branch
    @task.embed
  3. 模型配置模式(如
    "gpt-5"
    这类字符串名称,或
    OpenAIModel(...)
    对象)
  4. 任何用作
    output_type
    airflow_ai_sdk.BaseModel
    子类
根据上述清单执行后续步骤。

Step 1: Update requirements.txt

步骤1:更新requirements.txt

Remove:
airflow-ai-sdk[openai]
移除以下内容:
airflow-ai-sdk[openai]

or any variant: airflow-ai-sdk[openai]==0.1.7, airflow-ai-sdk[anthropic], etc.

或其变体形式:airflow-ai-sdk[openai]==0.1.7、airflow-ai-sdk[anthropic]等


**Add:**
apache-airflow-providers-common-ai[openai]>=0.1.0

Use the latest available 0.x version unless the user has pinned a specific one. Available extras match the LLM provider: `[openai]`, `[anthropic]`, `[google]`, `[bedrock]`, `[groq]`, `[mistral]`, `[mcp]`.

Keep `sentence-transformers` and `torch` if the project uses embeddings (they now run via plain `@task` instead of `@task.embed`).

---

**添加以下内容:**
apache-airflow-providers-common-ai[openai]>=0.1.0

除非用户指定了特定版本,否则请使用最新的0.x版本。可用的扩展包与LLM提供者对应:`[openai]`、`[anthropic]`、`[google]`、`[bedrock]`、`[groq]`、`[mistral]`、`[mcp]`。

如果项目使用嵌入功能,请保留`sentence-transformers`和`torch`(它们现在通过普通`@task`运行,而非`@task.embed`)。

---

Step 2: Create PydanticAI connection

步骤2:创建PydanticAI连接

The new provider uses an Airflow connection instead of model strings or objects in code.
Connection type:
pydanticai
Default connection ID:
pydanticai_default
新提供者使用Airflow连接替代代码中的模型字符串或对象。
连接类型:
pydanticai
默认连接ID:
pydanticai_default

Via environment variable (.env)

通过环境变量(.env)配置

bash
AIRFLOW_CONN_PYDANTICAI_DEFAULT='{
    "conn_type": "pydanticai",
    "password": "<api-key>",
    "extra": {
        "model": "<provider>:<model-name>"
    }
}'
bash
AIRFLOW_CONN_PYDANTICAI_DEFAULT='{
    "conn_type": "pydanticai",
    "password": "<api-key>",
    "extra": {
        "model": "<provider>:<model-name>"
    }
}'

Model format

模型格式

The model field uses
provider:model
format:
ProviderExample model value
OpenAI
openai:gpt-5
Anthropic
anthropic:claude-sonnet-4-20250514
Google
google:gemini-2.5-pro
Groq
groq:llama-3.3-70b-versatile
Mistral
mistral:mistral-large-latest
Bedrock
bedrock:us.anthropic.claude-sonnet-4-20250514-v1:0
模型字段采用
provider:model
格式:
提供者模型值示例
OpenAI
openai:gpt-5
Anthropic
anthropic:claude-sonnet-4-20250514
Google
google:gemini-2.5-pro
Groq
groq:llama-3.3-70b-versatile
Mistral
mistral:mistral-large-latest
Bedrock
bedrock:us.anthropic.claude-sonnet-4-20250514-v1:0

Custom endpoints (Ollama, vLLM, Snowflake Cortex, etc.)

自定义端点(Ollama、vLLM、Snowflake Cortex等)

Set
host
to the base URL:
bash
AIRFLOW_CONN_PYDANTICAI_CORTEX='{
    "conn_type": "pydanticai",
    "password": "<api-key>",
    "host": "https://my-endpoint.com/v1",
    "extra": {
        "model": "openai:<model-name>"
    }
}'
Use the
openai:
prefix for any OpenAI-compatible API, regardless of the actual provider.
host
设置为基础URL:
bash
AIRFLOW_CONN_PYDANTICAI_CORTEX='{
    "conn_type": "pydanticai",
    "password": "<api-key>",
    "host": "https://my-endpoint.com/v1",
    "extra": {
        "model": "openai:<model-name>"
    }
}'
对于任何兼容OpenAI的API,无论实际提供者是谁,都使用
openai:
前缀。

Connection ID convention

连接ID命名规则

The env var name determines the connection ID:
  • AIRFLOW_CONN_PYDANTICAI_DEFAULT
    creates
    pydanticai_default
  • AIRFLOW_CONN_PYDANTICAI_CORTEX
    creates
    pydanticai_cortex
环境变量名称决定连接ID:
  • AIRFLOW_CONN_PYDANTICAI_DEFAULT
    创建
    pydanticai_default
  • AIRFLOW_CONN_PYDANTICAI_CORTEX
    创建
    pydanticai_cortex

Model resolution priority

模型解析优先级

  1. model_id
    parameter on the decorator/operator (highest)
  2. model
    in connection's extra JSON (fallback)

  1. 装饰器/操作符上的
    model_id
    参数(优先级最高)
  2. 连接extra JSON中的
    model
    (备选)

Step 3: Migrate decorators

步骤3:迁移装饰器

@task.llm

@task.llm

python
undefined
python
undefined

BEFORE (airflow-ai-sdk)

迁移前(airflow-ai-sdk)

import airflow_ai_sdk as ai_sdk
class MyOutput(ai_sdk.BaseModel): field: str
@task.llm( model="gpt-5", # or model=OpenAIModel(...) system_prompt="You are helpful.", output_type=MyOutput, ) def my_task(text: str) -> str: return text
import airflow_ai_sdk as ai_sdk
class MyOutput(ai_sdk.BaseModel): field: str
@task.llm( model="gpt-5", # 或 model=OpenAIModel(...) system_prompt="You are helpful.", output_type=MyOutput, ) def my_task(text: str) -> str: return text

AFTER (apache-airflow-providers-common-ai)

迁移后(apache-airflow-providers-common-ai)

from pydantic import BaseModel
class MyOutput(BaseModel): field: str
@task.llm( llm_conn_id="pydanticai_default", # Airflow connection ID system_prompt="You are helpful.", output_type=MyOutput, ) def my_task(text: str) -> str: return text

**Parameter mapping:**

| airflow-ai-sdk | common-ai provider | Notes |
|----------------|-------------------|-------|
| `model="gpt-5"` | `llm_conn_id="pydanticai_default"` | Model specified in connection |
| `model=OpenAIModel(...)` | `llm_conn_id="pydanticai_default"` | Model + endpoint in connection |
| `system_prompt="..."` | `system_prompt="..."` | Unchanged |
| `output_type=MyModel` | `output_type=MyModel` | Unchanged |
| `result_type=MyModel` | `output_type=MyModel` | `result_type` was already deprecated |
| (not available) | `model_id="openai:gpt-5"` | Override connection's model |
| (not available) | `require_approval=True` | Built-in HITL review |
| (not available) | `agent_params={...}` | Extra kwargs for pydantic-ai Agent |
from pydantic import BaseModel
class MyOutput(BaseModel): field: str
@task.llm( llm_conn_id="pydanticai_default", # Airflow连接ID system_prompt="You are helpful.", output_type=MyOutput, ) def my_task(text: str) -> str: return text

**参数映射:**

| airflow-ai-sdk | common-ai提供者 | 说明 |
|----------------|-------------------|-------|
| `model="gpt-5"` | `llm_conn_id="pydanticai_default"` | 模型在连接中指定 |
| `model=OpenAIModel(...)` | `llm_conn_id="pydanticai_default"` | 模型与端点在连接中配置 |
| `system_prompt="..."` | `system_prompt="..."` | 保持不变 |
| `output_type=MyModel` | `output_type=MyModel` | 保持不变 |
| `result_type=MyModel` | `output_type=MyModel` | `result_type`已被弃用 |
| (无此参数) | `model_id="openai:gpt-5"` | 覆盖连接中的模型 |
| (无此参数) | `require_approval=True` | 内置人工审核功能 |
| (无此参数) | `agent_params={...}` | 传递给pydantic-ai Agent的额外参数 |

@task.llm_branch

@task.llm_branch

python
undefined
python
undefined

BEFORE

迁移前

@task.llm_branch( model="gpt-5", system_prompt="Choose a team...", allow_multiple_branches=False, ) def route(text: str) -> str: return text
@task.llm_branch( model="gpt-5", system_prompt="Choose a team...", allow_multiple_branches=False, ) def route(text: str) -> str: return text

AFTER

迁移后

@task.llm_branch( llm_conn_id="pydanticai_default", system_prompt="Choose a team...", allow_multiple_branches=False, # same parameter, unchanged ) def route(text: str) -> str: return text

Only change: `model=` becomes `llm_conn_id=`.
@task.llm_branch( llm_conn_id="pydanticai_default", system_prompt="Choose a team...", allow_multiple_branches=False, # 参数保持不变 ) def route(text: str) -> str: return text

仅需修改:将`model=`替换为`llm_conn_id=`。

@task.agent

@task.agent

This has the biggest API change. The Agent is no longer pre-built in user code.
python
undefined
此装饰器的API变更最大。Agent不再需要在用户代码中预构建。
python
undefined

BEFORE (airflow-ai-sdk) - Agent built at module level

迁移前(airflow-ai-sdk)- 在模块级别构建Agent

from pydantic_ai import Agent
my_agent = Agent( "gpt-5", system_prompt="You are a research assistant.", tools=[search_tool, lookup_tool], )
@task.agent(agent=my_agent) def research(question: str) -> str: return question
from pydantic_ai import Agent
my_agent = Agent( "gpt-5", system_prompt="You are a research assistant.", tools=[search_tool, lookup_tool], )
@task.agent(agent=my_agent) def research(question: str) -> str: return question

AFTER (common-ai provider) - No Agent object, config via parameters

迁移后(common-ai提供者)- 无需Agent对象,通过参数配置

@task.agent( llm_conn_id="pydanticai_default", system_prompt="You are a research assistant.", agent_params={"tools": [search_tool, lookup_tool]}, ) def research(question: str) -> str: return question

**Parameter mapping:**

| airflow-ai-sdk | common-ai provider | Notes |
|----------------|-------------------|-------|
| `agent=Agent(model, ...)` | `llm_conn_id="..."` | Model from connection |
| Agent's `system_prompt` | `system_prompt="..."` | Now a decorator param |
| Agent's `tools=[...]` | `agent_params={"tools": [...]}` | Tools via agent_params dict |
| Agent's `output_type` | `output_type=MyModel` | Now a decorator param |
| (not available) | `toolsets=[...]` | pydantic-ai 1.x Toolset objects |
| (not available) | `durable=True` | Step-level caching |
| (not available) | `enable_hitl_review=True` | Iterative human review loop |

**Key insight:** Everything that was configured on the `Agent()` constructor now goes into either a top-level decorator parameter or `agent_params`. The `agent_params` dict is passed directly to pydantic-ai's `Agent` constructor.
@task.agent( llm_conn_id="pydanticai_default", system_prompt="You are a research assistant.", agent_params={"tools": [search_tool, lookup_tool]}, ) def research(question: str) -> str: return question

**参数映射:**

| airflow-ai-sdk | common-ai提供者 | 说明 |
|----------------|-------------------|-------|
| `agent=Agent(model, ...)` | `llm_conn_id="..."` | 模型来自连接配置 |
| Agent的`system_prompt` | `system_prompt="..."` | 现在是装饰器参数 |
| Agent的`tools=[...]` | `agent_params={"tools": [...]}` | 通过agent_params字典配置工具 |
| Agent的`output_type` | `output_type=MyModel` | 现在是装饰器参数 |
| (无此参数) | `toolsets=[...]` | pydantic-ai 1.x Toolset对象 |
| (无此参数) | `durable=True` | 步骤级缓存 |
| (无此参数) | `enable_hitl_review=True` | 迭代式人工审核循环 |

**核心要点:** 之前在`Agent()`构造函数中配置的所有内容,现在都要放在顶层装饰器参数或`agent_params`中。`agent_params`字典会直接传递给pydantic-ai的`Agent`构造函数。

@task.embed (NO EQUIVALENT)

@task.embed(无替代方案)

The new provider does NOT include an embed decorator. Replace with a plain
@task
:
python
undefined
新提供者不包含embed装饰器,请替换为普通的
@task
python
undefined

BEFORE (airflow-ai-sdk)

迁移前(airflow-ai-sdk)

@task.embed( model_name="all-MiniLM-L6-v2", encode_kwargs={"normalize_embeddings": True}, max_active_tis_per_dagrun=1, ) def embed_text(text: str) -> str: return text
@task.embed( model_name="all-MiniLM-L6-v2", encode_kwargs={"normalize_embeddings": True}, max_active_tis_per_dagrun=1, ) def embed_text(text: str) -> str: return text

AFTER (plain @task with sentence-transformers)

迁移后(使用sentence-transformers的普通@task)

@task(max_active_tis_per_dagrun=1) def embed_text(text: str) -> list[float]: from sentence_transformers import SentenceTransformer model = SentenceTransformer("all-MiniLM-L6-v2") return model.encode(text, normalize_embeddings=True).tolist()

Note: The model is loaded on each task execution. For small workloads this is fine. For large batches, consider embedding all texts in a single task instead of using `.expand()`.

---
@task(max_active_tis_per_dagrun=1) def embed_text(text: str) -> list[float]: from sentence_transformers import SentenceTransformer model = SentenceTransformer("all-MiniLM-L6-v2") return model.encode(text, normalize_embeddings=True).tolist()

注意:模型会在每次任务执行时加载。对于小型工作负载,此方式可行。对于大规模批量任务,建议在单个任务中完成所有文本嵌入,而非使用`.expand()`。

---

Step 4: Update imports

步骤4:更新导入语句

Old importNew import
import airflow_ai_sdk as ai_sdk
Remove entirely
from airflow_ai_sdk import BaseModel
from pydantic import BaseModel
from airflow_ai_sdk.models.base import BaseModel
from pydantic import BaseModel
class Foo(ai_sdk.BaseModel):
class Foo(BaseModel):
from pydantic_ai import Agent
Remove if Agent was only used for
@task.agent
from pydantic_ai.models.openai import OpenAIModel
Remove (model config in connection now)
The
@task.llm
,
@task.agent
,
@task.llm_branch
decorators are auto-registered by the provider. No explicit import needed beyond
from airflow.sdk import task
.
pydantic_ai
imports for non-decorator usage (e.g.,
BinaryContent
for multimodal) are still valid since the new provider depends on
pydantic-ai-slim>=1.34.0
.

旧导入语句新导入语句
import airflow_ai_sdk as ai_sdk
完全移除
from airflow_ai_sdk import BaseModel
from pydantic import BaseModel
from airflow_ai_sdk.models.base import BaseModel
from pydantic import BaseModel
class Foo(ai_sdk.BaseModel):
class Foo(BaseModel):
from pydantic_ai import Agent
若Agent仅用于
@task.agent
则移除
from pydantic_ai.models.openai import OpenAIModel
移除(模型配置现在在连接中)
@task.llm
@task.agent
@task.llm_branch
装饰器由提供者自动注册,只需
from airflow.sdk import task
即可,无需额外导入。
非装饰器场景下的
pydantic_ai
导入(如用于多模态的
BinaryContent
)仍然有效,因为新提供者依赖
pydantic-ai-slim>=1.34.0

Step 5: Update connections.yaml (if used for local testing)

步骤5:更新connections.yaml(若用于本地测试)

yaml
pydanticai_default:
  conn_type: pydanticai
  password: <api-key>
  extra:
    model: "openai:gpt-5"
For custom endpoints:
yaml
pydanticai_cortex:
  conn_type: pydanticai
  password: <api-key>
  host: https://my-endpoint.com/v1
  extra:
    model: "openai:llama3.1-8b"

yaml
pydanticai_default:
  conn_type: pydanticai
  password: <api-key>
  extra:
    model: "openai:gpt-5"
自定义端点配置:
yaml
pydanticai_cortex:
  conn_type: pydanticai
  password: <api-key>
  host: https://my-endpoint.com/v1
  extra:
    model: "openai:llama3.1-8b"

Step 6: Clean up env vars

步骤6:清理环境变量

The new provider reads model config from the
pydanticai
connection, so env vars that previously fed the model in code are usually redundant. Before removing any of them, grep the project (and any sibling scripts/services) to confirm nothing else still references them:
OPENAI_API_KEY|OPENAI_BASE_URL|ANTHROPIC_API_KEY|GOOGLE_API_KEY
Candidates for removal only if no other code references them:
  • OPENAI_API_KEY
    (now in the pydanticai connection's password field)
  • OPENAI_BASE_URL
    (now in the connection's host field)
  • Custom model name vars (now in the connection's extra.model)
If anything outside the migrated DAGs still uses them (other DAGs not yet migrated, helper scripts, non-Airflow services sharing the
.env
), leave them in place.
Keep
AIRFLOW_CONN_*
env vars for all connections.

新提供者从
pydanticai
连接读取模型配置,因此之前在代码中用于模型配置的环境变量通常已冗余。移除前,请通过Grep排查项目(及相关脚本/服务),确认没有其他地方仍在引用这些变量:
OPENAI_API_KEY|OPENAI_BASE_URL|ANTHROPIC_API_KEY|GOOGLE_API_KEY
仅当无其他代码引用时可移除的变量:
  • OPENAI_API_KEY
    (现在存储在pydanticai连接的password字段中)
  • OPENAI_BASE_URL
    (现在存储在连接的host字段中)
  • 自定义模型名称变量(现在存储在连接的extra.model中)
如果迁移后的DAG之外还有其他内容在使用这些变量(如未迁移的其他DAG、辅助脚本、共享
.env
的非Airflow服务),请保留它们。
请保留所有连接对应的
AIRFLOW_CONN_*
环境变量。

Step 7: Verify

步骤7:验证

After migration, grep the codebase to confirm no stale references remain:
airflow_ai_sdk|airflow-ai-sdk|ai_sdk\.BaseModel|from pydantic_ai import Agent|from pydantic_ai.models
Verify:
  • No imports from
    airflow_ai_sdk
  • No
    Agent()
    objects created for
    @task.agent
    (unless used outside decorators)
  • No
    model=
    parameter on LLM decorators (should be
    llm_conn_id=
    )
  • All
    @task.embed
    replaced with plain
    @task
  • pydanticai
    connection configured in
    .env
    or connections.yaml
  • requirements.txt
    has
    apache-airflow-providers-common-ai[...]
    instead of
    airflow-ai-sdk[...]

迁移完成后,通过Grep排查代码库,确认无残留引用:
airflow_ai_sdk|airflow-ai-sdk|ai_sdk\.BaseModel|from pydantic_ai import Agent|from pydantic_ai.models
验证项:
  • airflow_ai_sdk
    相关导入
  • 无用于
    @task.agent
    Agent()
    对象(除非在装饰器外使用)
  • LLM装饰器上无
    model=
    参数(应替换为
    llm_conn_id=
  • 所有
    @task.embed
    已替换为普通
    @task
  • .env
    或connections.yaml中已配置
    pydanticai
    连接
  • requirements.txt
    中包含
    apache-airflow-providers-common-ai[...]
    ,而非
    airflow-ai-sdk[...]

Quick reference: New features in common-ai provider

快速参考:common-ai提供者的新特性

These features are available after migration but have no airflow-ai-sdk equivalent:
FeatureParameterDescription
HITL approval
require_approval=True
on
@task.llm
Pause for human review before returning
HITL review loop
enable_hitl_review=True
on
@task.agent
Iterative review with regeneration
Durable execution
durable=True
on
@task.agent
Step-level caching for resilience
Tool logging
enable_tool_logging=True
on
@task.agent
INFO-level tool call logs (default: on)
Model override
model_id="openai:gpt-5"
Override connection's model per-task
File analysis
@task.llm_file_analysis
Analyze files/images via ObjectStoragePath
NL-to-SQL
@task.llm_sql
Generate SQL from natural language
以下特性在迁移后可用,但airflow-ai-sdk中无对应功能:
特性参数描述
人工审核
@task.llm
上设置
require_approval=True
返回结果前暂停以进行人工审核
人工审核循环
@task.agent
上设置
enable_hitl_review=True
带重新生成功能的迭代式审核
持久化执行
@task.agent
上设置
durable=True
用于恢复能力的步骤级缓存
工具日志
@task.agent
上设置
enable_tool_logging=True
INFO级工具调用日志(默认开启)
模型覆盖
model_id="openai:gpt-5"
按任务覆盖连接中的模型
文件分析
@task.llm_file_analysis
通过ObjectStoragePath分析文件/图片
自然语言转SQL
@task.llm_sql
从自然语言生成SQL