Migrate airflow-ai-sdk to apache-airflow-providers-common-ai
This skill migrates Airflow projects from `airflow-ai-sdk` to `apache-airflow-providers-common-ai` (0.1.0+), the official Airflow AI provider built on PydanticAI.
CRITICAL: The new provider requires Airflow 3.0+ and pydantic-ai-slim >= 1.34.0. The API surface has changed: LLM configuration moves from code (model strings/objects) to Airflow connections (`pydanticai` type). There is no `@task.embed` in the new provider.
Before starting
Use the Grep tool with the pattern below to inventory everything that needs to migrate:
airflow_ai_sdk|airflow-ai-sdk|ai_sdk|@task\.llm|@task\.agent|@task\.llm_branch|@task\.embed
From the results, capture:
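- All files importing `airflow_ai_sdk` / `ai_sdk`
- Which decorators are in use: `@task.llm`, `@task.agent`, `@task.llm_branch`, `@task.embed`
- The model configuration pattern (string names like `"gpt-5"`, or `OpenAIModel` objects)
- Any `ai_sdk.BaseModel` subclasses used as `output_type`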
Use this inventory to drive the steps below.
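Where the Grep tool is not available, the same inventory can be approximated in plain Python. The sketch below is a hypothetical helper (`inventory` and its return shape are illustrative, not part of either package) that applies the pattern above to a source string and groups the matching line numbers by what matched.

```python
import re
from collections import defaultdict

# Same alternatives as the grep pattern above. @task.llm_branch is listed
# before @task.llm so the longer decorator name is reported when both fit.
PATTERN = re.compile(
    r"airflow_ai_sdk|airflow-ai-sdk|ai_sdk"
    r"|@task\.llm_branch|@task\.llm|@task\.agent|@task\.embed"
)


def inventory(source: str) -> dict[str, list[int]]:
    """Map each matched token to the 1-based line numbers where it occurs."""
    hits: dict[str, list[int]] = defaultdict(list)
    for lineno, line in enumerate(source.splitlines(), start=1):
        for match in PATTERN.finditer(line):
            hits[match.group(0)].append(lineno)
    return dict(hits)


sample = '''import airflow_ai_sdk as ai_sdk

@task.llm_branch(model="gpt-5", system_prompt="Choose a team...")
def route(text: str) -> str:
    return text
'''
report = inventory(sample)
```

To cover a whole project, call `inventory(path.read_text())` for each file returned by `pathlib.Path(".").rglob("*.py")`, and scan `requirements.txt` the same way.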
Step 1: Update requirements.txt
Remove:
airflow-ai-sdk[openai]
# or any variant: airflow-ai-sdk[openai]==0.1.7, airflow-ai-sdk[anthropic], etc.
Add:
apache-airflow-providers-common-ai[openai]>=0.1.0
Use the latest available 0.x version unless the user has pinned a specific one. Available extras match the LLM provider:
,
,
,
,
,
,
.
Keep
and
if the project uses embeddings (they now run via plain `@task` instead of `@task.embed`).
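The requirement swap in this step can be scripted. The following is a minimal sketch, assuming the extras name carries over unchanged between the two packages (check this against the extras the provider actually ships). `migrate_requirement` is a hypothetical helper name, and the old version pin is deliberately dropped in favor of `>=0.1.0`.

```python
import re

# airflow-ai-sdk, optional [extras], optional version specifier.
OLD_REQ = re.compile(r"^airflow-ai-sdk(\[[^\]]*\])?\s*([=<>!~].*)?$")


def migrate_requirement(line: str, new_spec: str = ">=0.1.0") -> str:
    """Swap an airflow-ai-sdk requirement for the common-ai provider, keeping extras."""
    match = OLD_REQ.match(line.strip())
    if match is None:
        return line  # unrelated requirement or comment: leave untouched
    extras = match.group(1) or ""
    return f"apache-airflow-providers-common-ai{extras}{new_spec}"


old_lines = ["airflow-ai-sdk[openai]==0.1.7", "sentence-transformers"]
new_lines = [migrate_requirement(line) for line in old_lines]
```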
Step 2: Create PydanticAI connection
The new provider uses an Airflow connection instead of model strings or objects in code.
Connection type: `pydanticai`
Default connection ID: `pydanticai_default`
Via environment variable (.env)
```bash
AIRFLOW_CONN_PYDANTICAI_DEFAULT='{
  "conn_type": "pydanticai",
  "password": "<api-key>",
  "extra": {
    "model": "<provider>:<model-name>"
  }
}'
```
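Hand-written JSON inside a `.env` file is easy to get wrong (stray commas, unescaped quotes). As a sketch, the value can be generated with `json.dumps` instead. `connection_env_value` is a hypothetical helper, and the keys it emits are only the ones shown in the example above.

```python
import json
from typing import Optional


def connection_env_value(api_key: str, model: str, host: Optional[str] = None) -> str:
    """Serialize a pydanticai connection to the JSON form used in AIRFLOW_CONN_* vars."""
    conn = {
        "conn_type": "pydanticai",
        "password": api_key,
        "extra": {"model": model},
    }
    if host is not None:
        conn["host"] = host  # only needed for custom endpoints
    return json.dumps(conn)


value = connection_env_value("<api-key>", "openai:gpt-5")
print(f"AIRFLOW_CONN_PYDANTICAI_DEFAULT='{value}'")
```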
Model format
The model field uses `<provider>:<model-name>` format:
| Provider | Example model value |
|---|---|
| OpenAI | `openai:gpt-5` |
| Anthropic | `anthropic:claude-sonnet-4-20250514` |
| Google | |
| Groq | `groq:llama-3.3-70b-versatile` |
| Mistral | `mistral:mistral-large-latest` |
| Bedrock | `bedrock:us.anthropic.claude-sonnet-4-20250514-v1:0` |
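One detail of this format is worth checking before writing a connection: the model name itself may contain a colon (the Bedrock example ends in `-v1:0`), so only the first colon separates provider from model. The hypothetical `split_model` below is an illustrative validation sketch, not how the provider parses the value internally.

```python
def split_model(value: str) -> tuple[str, str]:
    """Split '<provider>:<model-name>' on the first colon only."""
    provider, sep, name = value.partition(":")
    if not sep or not provider or not name:
        raise ValueError(f"expected '<provider>:<model-name>', got {value!r}")
    return provider, name


provider, name = split_model("bedrock:us.anthropic.claude-sonnet-4-20250514-v1:0")
```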
Custom endpoints (Ollama, vLLM, Snowflake Cortex, etc.)
```bash
AIRFLOW_CONN_PYDANTICAI_CORTEX='{
  "conn_type": "pydanticai",
  "password": "<api-key>",
  "host": "https://my-endpoint.com/v1",
  "extra": {
    "model": "openai:<model-name>"
  }
}'
```
Use the `openai:` prefix for any OpenAI-compatible API, regardless of the actual provider.
Connection ID convention
The env var name determines the connection ID:
- `AIRFLOW_CONN_PYDANTICAI_DEFAULT` creates `pydanticai_default`
- `AIRFLOW_CONN_PYDANTICAI_CORTEX` creates `pydanticai_cortex`
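The naming rule amounts to stripping the `AIRFLOW_CONN_` prefix and lowercasing the rest. The sketch below makes that explicit; `conn_id_from_env` is a hypothetical helper, useful mainly for checking that the `llm_conn_id` values in the DAGs match the variables in `.env`.

```python
PREFIX = "AIRFLOW_CONN_"


def conn_id_from_env(name: str) -> str:
    """Derive the Airflow connection ID from an AIRFLOW_CONN_* variable name."""
    if not name.startswith(PREFIX):
        raise ValueError(f"{name!r} is not an AIRFLOW_CONN_* variable")
    return name[len(PREFIX):].lower()


default_id = conn_id_from_env("AIRFLOW_CONN_PYDANTICAI_DEFAULT")
cortex_id = conn_id_from_env("AIRFLOW_CONN_PYDANTICAI_CORTEX")
```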
Model resolution priority
- An explicit model parameter on the decorator/operator (highest)
- `model` in the connection's extra JSON (fallback)
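The two-level priority can be stated as a small function. This mirrors the order described above and is not the provider's actual implementation. `resolve_model` and its `override` argument are hypothetical names.

```python
from typing import Optional


def resolve_model(override: Optional[str], conn_extra: dict) -> str:
    """Pick the model: a task-level override wins, else the connection's extra."""
    if override:
        return override
    model = conn_extra.get("model")
    if not model:
        raise ValueError("no model on the task and none in the connection's extra")
    return model


extra = {"model": "openai:gpt-5"}
from_connection = resolve_model(None, extra)
from_override = resolve_model("anthropic:claude-sonnet-4-20250514", extra)
```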
Step 3: Migrate decorators
@task.llm
```python
# BEFORE (airflow-ai-sdk)
import airflow_ai_sdk as ai_sdk

class MyOutput(ai_sdk.BaseModel):
    field: str

@task.llm(
    model="gpt-5",  # or model=OpenAIModel(...)
    system_prompt="You are helpful.",
    output_type=MyOutput,
)
def my_task(text: str) -> str:
    return text

# AFTER (apache-airflow-providers-common-ai)
from pydantic import BaseModel

class MyOutput(BaseModel):
    field: str

@task.llm(
    llm_conn_id="pydanticai_default",  # Airflow connection ID
    system_prompt="You are helpful.",
    output_type=MyOutput,
)
def my_task(text: str) -> str:
    return text
```
Parameter mapping:
| airflow-ai-sdk | common-ai provider | Notes |
|---|---|---|
| `model="gpt-5"` | `llm_conn_id="pydanticai_default"` | Model specified in connection |
| `model=OpenAIModel(...)` | `llm_conn_id="pydanticai_default"` | Model + endpoint in connection |
| `system_prompt` | `system_prompt` | Unchanged |
| `output_type` | `output_type` | Unchanged |
| | | was already deprecated |
| (not available) | | Override connection's model |
| (not available) | | Built-in HITL review |
| (not available) | `agent_params` | Extra kwargs for pydantic-ai Agent |
@task.llm_branch
```python
# BEFORE
@task.llm_branch(
    model="gpt-5",
    system_prompt="Choose a team...",
    allow_multiple_branches=False,
)
def route(text: str) -> str:
    return text

# AFTER
@task.llm_branch(
    llm_conn_id="pydanticai_default",
    system_prompt="Choose a team...",
    allow_multiple_branches=False,  # same parameter, unchanged
)
def route(text: str) -> str:
    return text
```
@task.agent
This has the biggest API change. The Agent is no longer pre-built in user code.
```python
# BEFORE (airflow-ai-sdk) - Agent built at module level
from pydantic_ai import Agent

my_agent = Agent(
    "gpt-5",
    system_prompt="You are a research assistant.",
    tools=[search_tool, lookup_tool],
)

@task.agent(agent=my_agent)
def research(question: str) -> str:
    return question

# AFTER (common-ai provider) - No Agent object, config via parameters
@task.agent(
    llm_conn_id="pydanticai_default",
    system_prompt="You are a research assistant.",
    agent_params={"tools": [search_tool, lookup_tool]},
)
def research(question: str) -> str:
    return question
```
Parameter mapping:
| airflow-ai-sdk | common-ai provider | Notes |
|---|---|---|
| `Agent("gpt-5", ...)` | `llm_conn_id="pydanticai_default"` | Model from connection |
| Agent's `system_prompt` | `system_prompt` | Now a decorator param |
| Agent's `tools` | `agent_params={"tools": [...]}` | Tools via agent_params dict |
| Agent's | | Now a decorator param |
| (not available) | | pydantic-ai 1.x Toolset objects |
| (not available) | | Step-level caching |
| (not available) | | Iterative human review loop |
Key insight: Everything that was configured on the `Agent` constructor now goes into either a top-level decorator parameter or `agent_params`. The `agent_params` dict is passed directly to pydantic-ai's `Agent` constructor.
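When many agents need migrating, the split described above can be done mechanically. The sketch below is a hypothetical helper: it treats only `system_prompt` as a top-level decorator parameter (the one shown in the example above) and routes everything else into `agent_params`. Extend `TOP_LEVEL` to match the parameters your provider version accepts.

```python
# Assumption: keyword arguments in this set become top-level decorator
# parameters; every other former Agent kwarg goes into agent_params.
TOP_LEVEL = {"system_prompt"}


def split_agent_kwargs(agent_kwargs: dict, llm_conn_id: str = "pydanticai_default") -> dict:
    """Turn the old Agent(...) keyword arguments into @task.agent(...) keyword arguments."""
    decorator_kwargs = {"llm_conn_id": llm_conn_id}
    agent_params = {}
    for key, value in agent_kwargs.items():
        if key in TOP_LEVEL:
            decorator_kwargs[key] = value
        else:
            agent_params[key] = value
    if agent_params:
        decorator_kwargs["agent_params"] = agent_params
    return decorator_kwargs


migrated = split_agent_kwargs(
    {"system_prompt": "You are a research assistant.", "tools": ["search_tool", "lookup_tool"]}
)
```

The positional model argument (`"gpt-5"` in the BEFORE example) is intentionally absent from the input: it moves to the connection rather than to the decorator.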
@task.embed (NO EQUIVALENT)
The new provider does NOT include an embed decorator. Replace with a plain `@task`:
```python
# BEFORE (airflow-ai-sdk)
@task.embed(
    model_name="all-MiniLM-L6-v2",
    encode_kwargs={"normalize_embeddings": True},
    max_active_tis_per_dagrun=1,
)
def embed_text(text: str) -> str:
    return text

# AFTER (plain @task with sentence-transformers)
@task(max_active_tis_per_dagrun=1)
def embed_text(text: str) -> list[float]:
    from sentence_transformers import SentenceTransformer

    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(text, normalize_embeddings=True).tolist()
```
Note: The model is loaded on each task execution. For small workloads this is fine. For large batches, consider embedding all texts in a single task instead of using dynamic task mapping.
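A single-task variant might look like the sketch below, which loads the model once and feeds it the texts in fixed-size batches. The encoder is passed in as a callable so the batching logic stays independent of the library. Inside a real `@task` it could be something like `lambda batch: model.encode(batch, normalize_embeddings=True).tolist()` with `model` created once at the top of the task. `batches`, `embed_all` and the default batch size are hypothetical.

```python
from typing import Callable, Iterator, Sequence


def batches(items: Sequence[str], size: int) -> Iterator[list[str]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


def embed_all(
    texts: Sequence[str],
    encode: Callable[[list[str]], list[list[float]]],
    batch_size: int = 64,
) -> list[list[float]]:
    """Embed every text with one encoder instance, one batch at a time."""
    vectors: list[list[float]] = []
    for batch in batches(texts, batch_size):
        vectors.extend(encode(batch))
    return vectors


# Stand-in encoder for illustration: one-dimensional "embedding" = text length.
def fake_encode(batch: list[str]) -> list[list[float]]:
    return [[float(len(text))] for text in batch]


vectors = embed_all(["a", "bb", "ccc"], fake_encode, batch_size=2)
```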
Step 4: Update imports
| Old import | New import |
|---|---|
| `import airflow_ai_sdk as ai_sdk` | Remove entirely |
| `from airflow_ai_sdk import BaseModel` | `from pydantic import BaseModel` |
| `from airflow_ai_sdk.models.base import BaseModel` | `from pydantic import BaseModel` |
| `class Foo(ai_sdk.BaseModel):` | `class Foo(BaseModel):` |
| `from pydantic_ai import Agent` | Remove if Agent was only used for `@task.agent` |
| `from pydantic_ai.models.openai import OpenAIModel` | Remove (model config in connection now) |
The `@task.llm`, `@task.llm_branch`, and `@task.agent` decorators are auto-registered by the provider. No explicit import is needed beyond `from airflow.sdk import task`.
`pydantic_ai` imports for non-decorator usage (e.g., for multimodal inputs) are still valid since the new provider depends on `pydantic-ai-slim`.
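The `BaseModel` rows of the import table lend themselves to a scripted rewrite. The sketch below is a regex-based approximation, not an AST-aware refactoring tool, and `rewrite_imports` is a hypothetical helper. It swaps the alias import for the pydantic import only when `ai_sdk.BaseModel` is actually used, and otherwise removes it. Review the diff before committing, and handle the `Agent` and `OpenAIModel` imports by hand.

```python
import re

ALIAS_IMPORT = re.compile(r"^import airflow_ai_sdk as ai_sdk[ \t]*\n?", re.M)
DIRECT_IMPORT = re.compile(
    r"^from airflow_ai_sdk(?:\.models\.base)? import BaseModel[ \t]*$", re.M
)
ALIAS_USE = re.compile(r"\bai_sdk\.BaseModel\b")
NEW_IMPORT = "from pydantic import BaseModel"


def rewrite_imports(source: str) -> str:
    """Apply the BaseModel-related rows of the import table to one source string."""
    uses_alias = bool(ALIAS_USE.search(source))
    # Keep a BaseModel import in place of the alias import only if it is needed.
    source = ALIAS_IMPORT.sub(NEW_IMPORT + "\n" if uses_alias else "", source)
    source = DIRECT_IMPORT.sub(NEW_IMPORT, source)
    return ALIAS_USE.sub("BaseModel", source)


before = (
    "import airflow_ai_sdk as ai_sdk\n"
    "\n"
    "class MyOutput(ai_sdk.BaseModel):\n"
    "    field: str\n"
)
after = rewrite_imports(before)
```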
Step 5: Update connections.yaml (if used for local testing)
```yaml
pydanticai_default:
  conn_type: pydanticai
  password: <api-key>
  extra:
    model: "openai:gpt-5"
```
For custom endpoints:
```yaml
pydanticai_cortex:
  conn_type: pydanticai
  password: <api-key>
  host: https://my-endpoint.com/v1
  extra:
    model: "openai:llama3.1-8b"
```
Step 6: Clean up env vars
The new provider reads model config from the `pydanticai` connection, so env vars that previously fed the model in code are usually redundant. Before removing any of them, grep the project (and any sibling scripts/services) to confirm nothing else still references them:
OPENAI_API_KEY|OPENAI_BASE_URL|ANTHROPIC_API_KEY|GOOGLE_API_KEY
Candidates for removal only if no other code references them:
- `OPENAI_API_KEY` (now in the pydanticai connection's password field)
- `OPENAI_BASE_URL` (now in the connection's host field)
- Custom model name vars (now in the connection's extra.model)
If anything outside the migrated DAGs still uses them (other DAGs not yet migrated, helper scripts, non-Airflow services sharing the `.env`), leave them in place.
Keep the `AIRFLOW_CONN_*` env vars for all connections.
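The reference check can also be run as a script. This is a minimal sketch with a hypothetical helper, `still_referenced`, that takes a mapping of file path to file contents and reports which candidate variables are still mentioned and where. A variable missing from the result is a candidate for removal. One that appears should stay until its users are migrated.

```python
import re

CANDIDATES = ("OPENAI_API_KEY", "OPENAI_BASE_URL", "ANTHROPIC_API_KEY", "GOOGLE_API_KEY")


def still_referenced(files: dict[str, str], names=CANDIDATES) -> dict[str, list[str]]:
    """Return, per env var, the sorted list of files that still mention it."""
    result: dict[str, list[str]] = {}
    for name in names:
        pattern = re.compile(rf"\b{re.escape(name)}\b")
        users = sorted(path for path, text in files.items() if pattern.search(text))
        if users:
            result[name] = users
    return result


project = {
    "dags/not_migrated.py": 'key = os.environ["OPENAI_API_KEY"]',
    "dags/migrated.py": 'llm_conn_id="pydanticai_default"',
}
usage = still_referenced(project)
```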
Step 7: Verify
After migration, grep the codebase to confirm no stale references remain:
airflow_ai_sdk|airflow-ai-sdk|ai_sdk\.BaseModel|from pydantic_ai import Agent|from pydantic_ai.models
Verify:
Quick reference: New features in common-ai provider
These features are available after migration but have no airflow-ai-sdk equivalent:
| Feature | Parameter | Description |
|---|---|---|
| HITL approval | on | Pause for human review before returning |
| HITL review loop | on | Iterative review with regeneration |
| Durable execution | on | Step-level caching for resilience |
| Tool logging | on | INFO-level tool call logs (default: on) |
| Model override | | Override connection's model per-task |
| File analysis | | Analyze files/images via ObjectStoragePath |
| NL-to-SQL | | Generate SQL from natural language |