Migrates Airflow projects from airflow-ai-sdk to apache-airflow-providers-common-ai 0.1.0+, replacing LLM decorators (@task.llm, @task.agent, @task.llm_branch, @task.embed), updating imports, requirements, and switching to pydanticai connections.
npx claudepluginhub astronomer/agents --plugin astronomer-data
This skill uses the workspace's default tool permissions.
This skill migrates Airflow projects from `airflow-ai-sdk` to `apache-airflow-providers-common-ai` (0.1.0+), the official Airflow AI provider built on PydanticAI.
CRITICAL: The new provider requires Airflow 3.0+ and pydantic-ai-slim >= 1.34.0. The API surface has changed: LLM configuration moves from code (model strings/objects) to Airflow connections (`pydanticai` type). There is no `@task.embed` in the new provider.
Use the Grep tool with the pattern below to inventory everything that needs to migrate:
airflow_ai_sdk|airflow-ai-sdk|ai_sdk|@task\.llm|@task\.agent|@task\.llm_branch|@task\.embed
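From the results, capture:
- References to `airflow-ai-sdk` / `airflow_ai_sdk` (requirements and imports)
- Decorator usages: `@task.llm`, `@task.agent`, `@task.llm_branch`, `@task.embed`
- Model configuration (strings like `"gpt-5"`, or `OpenAIModel(...)` objects)
- `airflow_ai_sdk.BaseModel` subclasses used as `output_type`

Use this inventory to drive the steps below.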
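Where the Grep tool is not available, the same inventory could be approximated with a short script. This is a minimal sketch, not part of the skill itself: the function names `scan_text` and `scan_project` and the choice of file suffixes are assumptions made for illustration.

```python
import re
from pathlib import Path

# Same pattern as the Grep step above.
PATTERN = re.compile(
    r"airflow_ai_sdk|airflow-ai-sdk|ai_sdk|@task\.llm|@task\.agent|@task\.llm_branch|@task\.embed"
)


def scan_text(text: str) -> list[tuple[int, str]]:
    """Return (line_number, stripped_line) for every line matching the pattern."""
    return [
        (number, line.strip())
        for number, line in enumerate(text.splitlines(), start=1)
        if PATTERN.search(line)
    ]


def scan_project(root: str, suffixes=(".py", ".txt", ".toml")) -> dict[str, list[tuple[int, str]]]:
    """Walk a project tree and collect matches per file (suffix list is an assumption)."""
    hits = {}
    for path in Path(root).rglob("*"):
        # Path(".env").suffix is empty, so the dotenv file is matched by name.
        if path.is_file() and (path.suffix in suffixes or path.name == ".env"):
            found = scan_text(path.read_text(encoding="utf-8", errors="ignore"))
            if found:
                hits[str(path)] = found
    return hits


if __name__ == "__main__":
    for filename, matches in scan_project(".").items():
        for number, line in matches:
            print(f"{filename}:{number}: {line}")
```

Grouping the hits per file makes it easier to tick off requirements, imports, and decorators one file at a time.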
Remove:
airflow-ai-sdk[openai]
# or any variant: airflow-ai-sdk[openai]==0.1.7, airflow-ai-sdk[anthropic], etc.
Add:
apache-airflow-providers-common-ai[openai]>=0.1.0
Use the latest available 0.x version unless the user has pinned a specific one. Available extras match the LLM provider: [openai], [anthropic], [google], [bedrock], [groq], [mistral], [mcp].
Keep sentence-transformers and torch if the project uses embeddings (they now run via plain @task instead of @task.embed).
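The requirements swap can be sketched as a small line rewriter. It is an illustration only: the helper names are hypothetical, it assumes the old extra name (for example `[openai]`) carries over unchanged to the new package, and it drops any old version pin in favor of `>=0.1.0`.

```python
import re

# Matches a requirement line for airflow-ai-sdk, with optional extras and pin.
OLD_REQ = re.compile(r"^\s*airflow-ai-sdk(?![\w-])(\[[^\]]*\])?.*$")


def migrate_requirement(line: str, new_spec: str = ">=0.1.0") -> str:
    """Rewrite one requirements line; lines for other packages pass through."""
    match = OLD_REQ.match(line)
    if not match:
        return line
    extras = match.group(1) or ""  # assumption: extras keep the same name
    return f"apache-airflow-providers-common-ai{extras}{new_spec}"


def migrate_requirements(text: str) -> str:
    """Apply the rewrite to a whole requirements file."""
    return "\n".join(migrate_requirement(line) for line in text.splitlines())


if __name__ == "__main__":
    print(migrate_requirements("airflow-ai-sdk[openai]==0.1.7\nsentence-transformers\ntorch"))
```

If the user has pinned a specific provider version, pass it as `new_spec` instead of relying on the default.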
The new provider uses an Airflow connection instead of model strings or objects in code.
Connection type: pydanticai
Default connection ID: pydanticai_default
AIRFLOW_CONN_PYDANTICAI_DEFAULT='{
"conn_type": "pydanticai",
"password": "<api-key>",
"extra": {
"model": "<provider>:<model-name>"
}
}'
The model field uses provider:model format:
| Provider | Example model value |
|---|---|
| OpenAI | openai:gpt-5 |
| Anthropic | anthropic:claude-sonnet-4-20250514 |
| Google | google:gemini-2.5-pro |
| Groq | groq:llama-3.3-70b-versatile |
| Mistral | mistral:mistral-large-latest |
| Bedrock | bedrock:us.anthropic.claude-sonnet-4-20250514-v1:0 |
For custom or OpenAI-compatible endpoints, set host to the base URL:
AIRFLOW_CONN_PYDANTICAI_CORTEX='{
"conn_type": "pydanticai",
"password": "<api-key>",
"host": "https://my-endpoint.com/v1",
"extra": {
"model": "openai:<model-name>"
}
}'
Use the openai: prefix for any OpenAI-compatible API, regardless of the actual provider.
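Writing the connection JSON by hand is error-prone, so it may help to generate it. The sketch below mirrors the JSON shape shown above; the helper name `build_pydanticai_conn` is hypothetical and not an API of the provider.

```python
import json
from typing import Optional


def build_pydanticai_conn(api_key: str, model: str, host: Optional[str] = None) -> str:
    """Serialize a pydanticai connection in the JSON shape shown above."""
    conn = {
        "conn_type": "pydanticai",
        "password": api_key,
        "extra": {"model": model},  # "<provider>:<model-name>"
    }
    if host:
        conn["host"] = host  # only needed for custom / OpenAI-compatible endpoints
    return json.dumps(conn)


if __name__ == "__main__":
    # Paste the printed value into .env as AIRFLOW_CONN_PYDANTICAI_DEFAULT='...'
    print(build_pydanticai_conn("<api-key>", "openai:gpt-5"))
    print(build_pydanticai_conn("<api-key>", "openai:<model-name>", host="https://my-endpoint.com/v1"))
```

Using `json.dumps` guarantees valid quoting, which matters because a malformed value in the env var would not parse as a connection.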
The env var name determines the connection ID:
- `AIRFLOW_CONN_PYDANTICAI_DEFAULT` creates `pydanticai_default`
- `AIRFLOW_CONN_PYDANTICAI_CORTEX` creates `pydanticai_cortex`

Model resolution priority:
1. `model_id` parameter on the decorator/operator (highest)
2. `model` in connection's extra JSON (fallback)

# BEFORE (airflow-ai-sdk)
import airflow_ai_sdk as ai_sdk
class MyOutput(ai_sdk.BaseModel):
    field: str
@task.llm(
    model="gpt-5",  # or model=OpenAIModel(...)
    system_prompt="You are helpful.",
    output_type=MyOutput,
)
def my_task(text: str) -> str:
    return text
# AFTER (apache-airflow-providers-common-ai)
from pydantic import BaseModel
class MyOutput(BaseModel):
    field: str
@task.llm(
    llm_conn_id="pydanticai_default",  # Airflow connection ID
    system_prompt="You are helpful.",
    output_type=MyOutput,
)
def my_task(text: str) -> str:
    return text
Parameter mapping:
| airflow-ai-sdk | common-ai provider | Notes |
|---|---|---|
| model="gpt-5" | llm_conn_id="pydanticai_default" | Model specified in connection |
| model=OpenAIModel(...) | llm_conn_id="pydanticai_default" | Model + endpoint in connection |
| system_prompt="..." | system_prompt="..." | Unchanged |
| output_type=MyModel | output_type=MyModel | Unchanged |
| result_type=MyModel | output_type=MyModel | result_type was already deprecated |
| (not available) | model_id="openai:gpt-5" | Override connection's model |
| (not available) | require_approval=True | Built-in HITL review |
| (not available) | agent_params={...} | Extra kwargs for pydantic-ai Agent |
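The precedence between `model_id` and the connection's `model` can be pictured with a few lines of Python. This is only an illustration of the documented order, not the provider's actual code; the function name `resolve_model` and the error raised when neither value is set are assumptions.

```python
from typing import Optional


def resolve_model(model_id: Optional[str], conn_extra: dict) -> str:
    """Pick the model the way the precedence rule describes."""
    if model_id:
        return model_id  # per-task override wins
    model = conn_extra.get("model")
    if not model:
        # Assumption: the real provider's behavior in this case is not covered here.
        raise ValueError("No model_id given and the connection has no 'model' in extra")
    return model


if __name__ == "__main__":
    extra = {"model": "openai:gpt-5"}
    print(resolve_model(None, extra))
    print(resolve_model("anthropic:claude-sonnet-4-20250514", extra))
```

In practice this means one connection can serve many tasks, with `model_id` reserved for the few tasks that need a different model.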
# BEFORE
@task.llm_branch(
    model="gpt-5",
    system_prompt="Choose a team...",
    allow_multiple_branches=False,
)
def route(text: str) -> str:
    return text
# AFTER
@task.llm_branch(
    llm_conn_id="pydanticai_default",
    system_prompt="Choose a team...",
    allow_multiple_branches=False,  # same parameter, unchanged
)
def route(text: str) -> str:
    return text
Only change: model= becomes llm_conn_id=.
`@task.agent` has the biggest API change: the `Agent` is no longer pre-built in user code.
# BEFORE (airflow-ai-sdk) - Agent built at module level
from pydantic_ai import Agent
my_agent = Agent(
    "gpt-5",
    system_prompt="You are a research assistant.",
    tools=[search_tool, lookup_tool],
)
@task.agent(agent=my_agent)
def research(question: str) -> str:
    return question
# AFTER (common-ai provider) - No Agent object, config via parameters
@task.agent(
    llm_conn_id="pydanticai_default",
    system_prompt="You are a research assistant.",
    agent_params={"tools": [search_tool, lookup_tool]},
)
def research(question: str) -> str:
    return question
Parameter mapping:
| airflow-ai-sdk | common-ai provider | Notes |
|---|---|---|
| agent=Agent(model, ...) | llm_conn_id="..." | Model from connection |
| Agent's system_prompt | system_prompt="..." | Now a decorator param |
| Agent's tools=[...] | agent_params={"tools": [...]} | Tools via agent_params dict |
| Agent's output_type | output_type=MyModel | Now a decorator param |
| (not available) | toolsets=[...] | pydantic-ai 1.x Toolset objects |
| (not available) | durable=True | Step-level caching |
| (not available) | enable_hitl_review=True | Iterative human review loop |
Key insight: Everything that was configured on the Agent() constructor now goes into either a top-level decorator parameter or agent_params. The agent_params dict is passed directly to pydantic-ai's Agent constructor.
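The split described above can be expressed as a small helper that takes the keyword arguments formerly passed to `Agent(...)` and sorts them into decorator arguments. It is a sketch under assumptions: the name `split_agent_kwargs` is hypothetical, only `system_prompt` and `output_type` are treated as top-level (following the mapping table), and everything else except the model is routed to `agent_params`.

```python
# Agent() arguments that the mapping table promotes to decorator parameters.
TOP_LEVEL = {"system_prompt", "output_type"}


def split_agent_kwargs(agent_kwargs: dict, llm_conn_id: str = "pydanticai_default") -> dict:
    """Turn former Agent(...) keyword arguments into @task.agent(...) arguments."""
    decorator_kwargs = {"llm_conn_id": llm_conn_id}
    agent_params = {}
    for key, value in agent_kwargs.items():
        if key == "model":
            continue  # the model now lives in the Airflow connection
        if key in TOP_LEVEL:
            decorator_kwargs[key] = value
        else:
            agent_params[key] = value  # e.g. tools
    if agent_params:
        decorator_kwargs["agent_params"] = agent_params
    return decorator_kwargs


if __name__ == "__main__":
    old = {
        "model": "gpt-5",
        "system_prompt": "You are a research assistant.",
        "tools": ["search_tool", "lookup_tool"],  # placeholders for real tool callables
    }
    print(split_agent_kwargs(old))
```

The returned dict shows which values belong directly on the decorator and which go inside `agent_params`, which is handy when an agent was built with many options.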
The new provider does NOT include an embed decorator. Replace with a plain @task:
# BEFORE (airflow-ai-sdk)
@task.embed(
    model_name="all-MiniLM-L6-v2",
    encode_kwargs={"normalize_embeddings": True},
    max_active_tis_per_dagrun=1,
)
def embed_text(text: str) -> str:
    return text
# AFTER (plain @task with sentence-transformers)
@task(max_active_tis_per_dagrun=1)
def embed_text(text: str) -> list[float]:
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(text, normalize_embeddings=True).tolist()
Note: The model is loaded on each task execution. For small workloads this is fine. For large batches, consider embedding all texts in a single task instead of using .expand().
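For the large-batch case, the single-task variant might look like the sketch below. It is an illustration rather than the skill's prescribed code: the function name `embed_texts` and the optional `model` argument (added so the model can be injected or reused) are assumptions, and in a DAG the function body would sit inside a plain `@task`.

```python
from typing import Optional, Sequence


def embed_texts(texts: Sequence[str], model: Optional[object] = None) -> list[list[float]]:
    """Embed all texts in one call so the model is loaded once per task run."""
    if not texts:
        return []
    if model is None:
        # Lazy import keeps DAG parsing light; the model loads at execution time.
        from sentence_transformers import SentenceTransformer

        model = SentenceTransformer("all-MiniLM-L6-v2")
    # encode() accepts a list of sentences and returns one vector per sentence.
    vectors = model.encode(list(texts), normalize_embeddings=True)
    # Convert to plain floats so the result is JSON-serializable.
    return [[float(value) for value in vector] for vector in vectors]
```

Returning nested lists of floats keeps the output serializable, at the cost of a larger payload than per-text mapped tasks would produce.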
| Old import | New import |
|---|---|
| import airflow_ai_sdk as ai_sdk | Remove entirely |
| from airflow_ai_sdk import BaseModel | from pydantic import BaseModel |
| from airflow_ai_sdk.models.base import BaseModel | from pydantic import BaseModel |
| class Foo(ai_sdk.BaseModel): | class Foo(BaseModel): |
| from pydantic_ai import Agent | Remove if Agent was only used for @task.agent |
| from pydantic_ai.models.openai import OpenAIModel | Remove (model config in connection now) |
The @task.llm, @task.agent, @task.llm_branch decorators are auto-registered by the provider. No explicit import needed beyond from airflow.sdk import task.
pydantic_ai imports for non-decorator usage (e.g., BinaryContent for multimodal) are still valid since the new provider depends on pydantic-ai-slim>=1.34.0.
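The mechanical part of the import table could be automated with a line-based rewrite such as the sketch below. The helper name `rewrite_imports` is hypothetical, and the approach is deliberately conservative: it leaves `from pydantic_ai import Agent` alone because removing it depends on whether `Agent` is used elsewhere, and it does not deduplicate an existing `from pydantic import BaseModel`.

```python
import re

# Matches ai_sdk.BaseModel and airflow_ai_sdk.BaseModel as whole names.
MODEL_REF = re.compile(r"\b(?:airflow_ai_sdk|ai_sdk)\.BaseModel\b")


def rewrite_imports(source: str) -> str:
    """Apply the import rewrites from the table above to one module's source."""
    uses_sdk_model = bool(MODEL_REF.search(source))
    out = []
    for line in source.splitlines():
        stripped = line.strip()
        if re.fullmatch(r"import airflow_ai_sdk( as \w+)?", stripped):
            # Dropped entirely unless the module needs BaseModel from pydantic.
            if uses_sdk_model:
                out.append("from pydantic import BaseModel")
            continue
        if re.fullmatch(r"from airflow_ai_sdk(\.models\.base)? import BaseModel", stripped):
            out.append("from pydantic import BaseModel")
            continue
        if stripped == "from pydantic_ai.models.openai import OpenAIModel":
            continue  # model config lives in the connection now
        out.append(MODEL_REF.sub("BaseModel", line))
    return "\n".join(out)


if __name__ == "__main__":
    print(rewrite_imports("import airflow_ai_sdk as ai_sdk\n\nclass Foo(ai_sdk.BaseModel):\n    field: str"))
```

Review the result by hand afterwards, since a regex pass cannot tell whether a removed name is still referenced further down the file.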
pydanticai_default:
  conn_type: pydanticai
  password: <api-key>
  extra:
    model: "openai:gpt-5"
For custom endpoints:
pydanticai_cortex:
  conn_type: pydanticai
  password: <api-key>
  host: https://my-endpoint.com/v1
  extra:
    model: "openai:llama3.1-8b"
The new provider reads model config from the pydanticai connection, so env vars that previously fed the model in code are usually redundant. Before removing any of them, grep the project (and any sibling scripts/services) to confirm nothing else still references them:
OPENAI_API_KEY|OPENAI_BASE_URL|ANTHROPIC_API_KEY|GOOGLE_API_KEY
Candidates for removal only if no other code references them:
- `OPENAI_API_KEY` (now in the `pydanticai` connection's `password` field)
- `OPENAI_BASE_URL` (now in the connection's `host` field)

If anything outside the migrated DAGs still uses them (other DAGs not yet migrated, helper scripts, non-Airflow services sharing the `.env`), leave them in place.
Keep AIRFLOW_CONN_* env vars for all connections.
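The "confirm nothing else references them" check can be sketched as a function over file contents. The name `env_var_references` and the in-memory `{filename: text}` input are assumptions chosen to keep the example self-contained; a real run would read the files from the project and its sibling services.

```python
import re

# Candidates named above; extend with ANTHROPIC_API_KEY or GOOGLE_API_KEY as needed.
CANDIDATES = ("OPENAI_API_KEY", "OPENAI_BASE_URL")


def env_var_references(sources: dict[str, str], candidates=CANDIDATES) -> dict[str, list[str]]:
    """Map each env var to the files that still mention it.

    An empty list means no remaining reference was found in the given sources.
    """
    return {
        var: sorted(
            name
            for name, text in sources.items()
            if re.search(rf"\b{re.escape(var)}\b", text)
        )
        for var in candidates
    }


if __name__ == "__main__":
    sources = {
        "scripts/backfill.py": 'key = os.environ["OPENAI_API_KEY"]',
        "dags/etl.py": "x = 1",
    }
    for var, files in env_var_references(sources).items():
        status = "keep" if files else "candidate for removal"
        print(f"{var}: {status} {files}")
```

A variable is only a removal candidate when its list is empty across every codebase that shares the same environment file.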
After migration, grep the codebase to confirm no stale references remain:
airflow_ai_sdk|airflow-ai-sdk|ai_sdk\.BaseModel|from pydantic_ai import Agent|from pydantic_ai.models
Verify:
- No remaining imports of `airflow_ai_sdk`
- No `Agent()` objects created for `@task.agent` (unless used outside decorators)
- No `model=` parameter on LLM decorators (should be `llm_conn_id=`)
- Every `@task.embed` replaced with plain `@task`
- `pydanticai` connection configured in `.env` or `connections.yaml`
- `requirements.txt` has `apache-airflow-providers-common-ai[...]` instead of `airflow-ai-sdk[...]`

These features are available after migration but have no `airflow-ai-sdk` equivalent:
| Feature | Parameter | Description |
|---|---|---|
| HITL approval | require_approval=True on @task.llm | Pause for human review before returning |
| HITL review loop | enable_hitl_review=True on @task.agent | Iterative review with regeneration |
| Durable execution | durable=True on @task.agent | Step-level caching for resilience |
| Tool logging | enable_tool_logging=True on @task.agent | INFO-level tool call logs (default: on) |
| Model override | model_id="openai:gpt-5" | Override connection's model per-task |
| File analysis | @task.llm_file_analysis | Analyze files/images via ObjectStoragePath |
| NL-to-SQL | @task.llm_sql | Generate SQL from natural language |