diff --git a/skills/ai/SKILL.md b/skills/ai/SKILL.md index 0e3ba95..5922a36 100644 --- a/skills/ai/SKILL.md +++ b/skills/ai/SKILL.md @@ -1,10 +1,12 @@ --- name: ai -description: Python `ai` module — models, agents, hooks, MCP, structured output +description: Python `ai` SDK — models, providers, streams, events, tools, agents, hooks, MCP, AI SDK UI, structured output, and media generation --- # ai +Use this skill when working with the Python `ai` SDK. + ```bash uv add ai ``` @@ -13,269 +15,485 @@ uv add ai import ai ``` -## Quick reference +## Quick start ```python +import asyncio +import ai + + @ai.tool async def get_weather(city: str) -> str: - """Get the current weather for a city.""" + """Get current weather for a city.""" return f"Sunny, 72F in {city}" -model = ai.get_model("anthropic/claude-sonnet-4") -agent = ai.agent(tools=[get_weather]) -messages = [ - ai.system_message("You are a helpful weather assistant."), - ai.user_message("What's the weather in Tokyo?"), -] +async def main() -> None: + model = ai.get_model("gateway:anthropic/claude-sonnet-4") + agent = ai.agent(tools=[get_weather]) + + messages = [ + ai.system_message("You are a helpful weather assistant."), + ai.user_message("What's the weather in Tokyo?"), + ] + + async with agent.run(model, messages) as stream: + async for event in stream: + if isinstance(event, ai.events.TextDelta): + print(event.chunk, end="", flush=True) + + print(stream.output) -async for msg in agent.run(model, messages): - print(msg.text_delta, end="") + +if __name__ == "__main__": + asyncio.run(main()) ``` -`ai.get_model("model")` — resolve a model ID to a `Model`; omitted providers default to AI Gateway, e.g. `ai.get_model("anthropic/claude-sonnet-4")`. `ai.get_model()` reads `AI_SDK_DEFAULT_MODEL`. Use `provider:model` to target a provider directly, e.g. `ai.get_model("openai:gpt-5.4")`. Clients auto-created from env vars (`AI_GATEWAY_API_KEY`, `ANTHROPIC_API_KEY`, `OPENAI_API_KEY`). Pass `ai.Client(base_url=, api_key=)` for custom endpoints: `ai.get_model("openai:gpt-5.4", client=c)`. Provider factories remain available from `ai.providers` for custom provider setup and `provider.list_models()`. +`ai.stream(...)` and `agent.run(...)` are async context managers. Iterate events +inside the context. After iteration, read final state from the stream object. + +## Models and providers -`ai.stream(model, messages, ...)` — streaming without an agent loop. Returns `StreamResult` with `.text`, `.tool_calls`, `.output`, `.usage` after iteration. +```python +model = ai.get_model() # reads AI_SDK_DEFAULT_MODEL +model = ai.get_model("anthropic/claude-sonnet-4") # unprefixed: gateway route +model = ai.get_model("gateway:anthropic/claude-sonnet-4") +model = ai.get_model("openai:gpt-5.4") # direct provider route +model = ai.get_model("anthropic:claude-sonnet-4-6") +``` -`ai.generate(model, messages, params)` — non-streaming generation. `params` is `ImageParams` or `VideoParams`. +- Gateway credentials use `AI_GATEWAY_API_KEY`. +- Direct providers use provider-specific env vars such as `OPENAI_API_KEY` and + `ANTHROPIC_API_KEY`. +- Use `ai.get_provider(...)` when you need a custom base URL, API key, headers, + or client. +- Use `await ai.probe(model)` to check credentials and model availability. -`ai.probe(model)` — verify credentials and model availability. +```python +provider = ai.get_provider( + "openai", + base_url="http://localhost:1234/v1", + api_key="your_access_token_here", +) +model = ai.Model("local-model", provider=provider) -## Messages +models = await ai.get_provider("anthropic").list_models() +``` -Immutable Pydantic models. Use builders: +Request-scoped provider options go through `params`: + +```python +params = { + "providerOptions": { + "gateway": {"sort": "cost"}, + "anthropic": {"speed": "fast"}, + } +} + +async with ai.stream(model, messages, params=params) as stream: + async for event in stream: + ... +``` + +## Messages and events + +Messages are Pydantic models with typed parts. Use builders for common roles +and parts: ```python ai.system_message("Be concise.") -ai.user_message("Describe this image:", ai.file_part(url)) -ai.assistant_message(...) -ai.tool_message(...) # merge one or more tool-result messages/parts -ai.tool_result(...) # single ToolResultPart -ai.thinking(...) +ai.user_message("Describe this image:", ai.file_part(image_bytes, media_type="image/png")) +ai.assistant_message(ai.thinking("scratchpad"), "Final answer") +ai.tool_result_part("tc-1", result={"temp": 72}, tool_name="get_weather") +ai.tool_message(tool_call_id="tc-1", result=72, tool_name="get_weather") ``` -Key properties on streamed messages: +Common message properties: -- `msg.text_delta` — current text chunk (for live display) -- `msg.text` — full accumulated text -- `msg.tool_calls` — list of `ToolCallPart` on assistant messages -- `msg.output` — validated Pydantic instance (when using `output_type`) -- `msg.get_hook_part()` — find a hook suspension part +- `message.text`, `message.reasoning`. +- `message.tool_calls`, `message.tool_results`. +- `message.builtin_tool_calls`, `message.builtin_tool_returns`. +- `message.files`, `message.images`, `message.videos`. +- `message.get_output()` or `message.get_output(MyModel)`. -Serialize: `msg.model_dump()`. Restore: `ai.messages.Message.model_validate(data)`. +Streams and agents yield event objects from `ai.events`: -## Streaming tools +```python +async with ai.stream(model, messages, tools=tools) as stream: + async for event in stream: + if isinstance(event, ai.events.TextDelta): + print(event.chunk, end="", flush=True) + elif isinstance(event, ai.events.ToolEnd): + print(event.tool_call.tool_name, event.tool_call.tool_args) + elif isinstance(event, ai.events.ToolCallResult): + for result in event.results: + print(result.tool_name, result.result) + elif isinstance(event, ai.events.HookEvent): + print(event.hook.hook_id, event.hook.status) + elif isinstance(event, ai.events.PartialToolCallResult): + print(event.label, event.value) +``` -A regular `@ai.tool` returns the tool result directly. An async-generator tool yields intermediate values that stream to the consumer as `ai.events.PartialToolCallResult` events; an _aggregator_ decides what the model sees as the final result. +After iteration: -Declare the aggregator via the return-type annotation: +```python +stream.message # final assistant message for ai.stream +stream.messages # updated agent history for agent.run +stream.text # text output for ai.stream +stream.output # text or parsed Pydantic output +stream.tool_calls # function tool calls from ai.stream +stream.usage # latest reported usage +``` + +Serialize and restore history with Pydantic JSON: ```python -@ai.tool -async def fetch(url: str) -> ai.StreamingStatusTool[str]: - yield "connecting..." - yield "downloading..." - yield body # last yield = tool result; earlier yields = status +encoded = [message.model_dump(mode="json") for message in stream.messages] +restored = [ai.messages.Message.model_validate(item) for item in encoded] +``` -@ai.tool -async def render(prompt: str) -> ai.StreamingTextTool: - async for chunk in stream: - yield chunk # all chunks concatenated into the tool result +## Direct streaming -@ai.tool -async def research(topic: str) -> ai.SubAgentTool: - sub = ai.agent(tools=[...]) - async for event in sub.run(model, msgs): - yield event # final assistant text becomes the tool result +Use `ai.stream` when you want one model response and will handle any function +tool calls yourself: + +```python +async with ai.stream(model, messages, tools=[get_weather.tool]) as stream: + async for event in stream: + if isinstance(event, ai.events.TextDelta): + print(event.chunk, end="", flush=True) + +for call in stream.tool_calls: + print(call.tool_name, call.tool_args) ``` -These aliases expand to `Annotated[AsyncGenerator[T], ai.agents.Aggregate(Aggregator)]`. For a parameterized aggregator, write the marker directly: +Use structured output with a Pydantic model: ```python -type Joined = Annotated[AsyncGenerator[str], ai.agents.Aggregate(ai.agents.ConcatAggregator, delim="\n")] +import pydantic + + +class Forecast(pydantic.BaseModel): + city: str + temperature: float + + +async with ai.stream(model, messages, output_type=Forecast) as stream: + async for event in stream: + ... + +forecast = stream.output ``` -Or pass the factory as a keyword argument: `@ai.tool(aggregator=ai.agents.LastAggregator)`. Specifying both an annotation marker and the kwarg raises `TypeError`. +## Tools -Built-in aggregators: `ai.agents.ConcatAggregator`, `ai.agents.LastAggregator`, `ai.agents.MessageAggregator`. Subclass `ai.agents.SimpleAggregator[Item, Result]` (in/out same type) or `ai.events.Aggregator[Item, Result, ModelInput]` (separate model input) for custom ones. +A function tool is an async Python function decorated with `@ai.tool`. The +function name becomes the tool name, the docstring becomes the description, and +the signature becomes a Pydantic-validated JSON schema. -## Custom agent loops +```python +@ai.tool +async def scan_sector(sector: str, depth: int = 1) -> str: + """Scan a sector at the requested depth.""" + return f"{sector}: clear at depth {depth}" +``` -Override the default loop when you need approval gates, routing, or batching. +Use schema-only tools with `ai.stream` when the SDK should not execute them: -The loop yields `AgentEvent`s for the consumer and updates history with `context.add(message)`. Anything not added to `context.messages` won't be visible on the next turn. +```python +tool = ai.Tool( + kind="function", + name="get_weather", + args=ai.tools.FunctionToolArgs( + description="Get current weather for a city.", + params={ + "type": "object", + "properties": {"city": {"type": "string"}}, + "required": ["city"], + }, + ), +) +``` -**Concurrent form** — fan tools out as the model emits them, mirrors the default loop. +Provider-executed tools run outside your process: ```python -my_agent = ai.agent(tools=[get_weather, get_population]) +tools = [ai.providers.anthropic.tools.web_search(max_uses=3)] -@my_agent.loop -async def custom(context: ai.Context): - while context.keep_running(): - async with ( - ai.stream( - context.model, context.messages, tools=context.tools - ) as s, - ai.ToolRunner(s) as tr, - ): - async for event in ai.util.merge(s, tr.events()): - yield event - if isinstance(event, ai.events.ToolEnd): - tr.schedule(context.resolve(event.tool_call)) +async with ai.stream(model, messages, tools=tools) as stream: + async for event in stream: + if isinstance(event, ai.events.BuiltinToolResult): + print(event.result.tool_name, event.result.result) +``` + +Tool validation failures and exceptions become `ToolCallResult` events with +error result parts. The original exception is on `event.exception` for logging. - context.add(s.message) - context.add(tr.get_tool_message()) +```python +if isinstance(event, ai.events.ToolCallResult) and event.exception: + log_exception(event.exception) ``` -**Sequential form** — drain the stream, then run tools. Simpler when you need to gate tool calls before executing them: +## Streaming tools + +Async-generator tools yield partial values while they run. An aggregator turns +those values into the final tool result the model sees. + +```python +@ai.tool +async def draft_reply(topic: str) -> ai.StreamingTextTool: + """Draft a reply.""" + yield "Checking " + yield f"records for {topic}." +``` + +```python +@ai.tool +async def fetch(url: str) -> ai.StreamingStatusTool[str]: + """Fetch a URL with status updates.""" + yield "connecting" + yield "downloading" + yield body # last yield is the tool result +``` ```python -@my_agent.loop -async def custom(context: ai.Context): - while True: - s = ai.stream( - context.model, context.messages, tools=context.tools - ) - async for event in s: +@ai.tool +async def research(topic: str) -> ai.SubAgentTool: + """Research a topic with a subagent.""" + subagent = ai.agent(tools=[...]) + async with subagent.run(model, [ai.user_message(topic)]) as stream: + async for event in stream: yield event - context.add(s.message) +``` + +For custom aggregation, annotate an async-generator return type with +`Annotated[AsyncGenerator[T], ai.agents.Aggregate(...)]`. Built-in +aggregators: `ai.agents.ConcatAggregator`, `ai.agents.LastAggregator`, and +`ai.agents.MessageAggregator`. + +## Agents - tool_calls = context.resolve(s.tool_calls) - if not tool_calls: - return +Use an agent when the SDK should execute Python tools, append tool results, and +continue until the assistant returns a final answer. - async with asyncio.TaskGroup() as tg: - tasks = [tg.create_task(tc()) for tc in tool_calls] +```python +agent = ai.agent(tools=[get_weather]) - context.add(ai.tool_message(*(t.result() for t in tasks))) +async with agent.run(model, messages) as stream: + async for event in stream: + if isinstance(event, ai.events.TextDelta): + print(event.chunk, end="", flush=True) + +history = stream.messages +answer = stream.output ``` -Loop helpers: `context.model`, `context.messages`, `context.tools`, `context.resolve(s.tool_calls)`, `context.keep_running()`, `context.add(msg)`. `await tc()` executes a tool call and returns a `ToolCallResult`; pass those (or `Message`s) to `ai.tool_message(...)` to merge into one tool-result message. +Pass structured output and provider params through `agent.run`: -## Multi-agent +```python +async with agent.run( + model, + [ai.user_message("Return a JSON forecast.")], + output_type=Forecast, + params={"temperature": 0}, +) as stream: + async for event in stream: + ... + +forecast = stream.output +``` + +## Custom agent loops -Use `asyncio.gather` with `ai.yield_from(...)` and labels to run agents in parallel: +Subclass `ai.Agent` and override `loop` for custom scheduling, routing, +logging, persistence, or approval logic. ```python -async def multi(model: ai.Model, query: str) -> str: - researcher = ai.agent(tools=[t1]) - analyst = ai.agent(tools=[t2]) +from collections.abc import AsyncGenerator - r1, r2 = await asyncio.gather( + +class CustomAgent(ai.Agent): + async def loop(self, context: ai.Context) -> AsyncGenerator[ai.events.AgentEvent]: + while context.keep_running(): + async with ( + ai.stream(context=context) as stream, + ai.ToolRunner() as tool_runner, + ): + async for event in ai.util.merge(stream, tool_runner.events()): + yield event + + if isinstance(event, ai.events.ToolEnd): + tool_call = context.resolve(event.tool_call) + tool_runner.schedule(tool_call) + + context.add(stream.message) + context.add(tool_runner.get_tool_message()) +``` + +Loop helpers: `context.model`, `context.messages`, `context.tools`, +`context.output_type`, `context.params`, `context.resolve(...)`, +`context.keep_running()`, and `context.add(...)`. + +## Multi-agent + +Use `ai.SubAgentTool` for agent-as-tool workflows. Use `ai.yield_from(...)` +inside custom loops to fan out streams and forward nested events as +`PartialToolCallResult` values with labels. + +```python +async with ( + researcher.run(model, research_messages) as research_stream, + analyst.run(model, analyst_messages) as analyst_stream, +): + research_text, analyst_text = await asyncio.gather( ai.yield_from( - researcher.run(model, msgs1), + research_stream, label="researcher", aggregator=ai.agents.MessageAggregator, ), ai.yield_from( - analyst.run(model, msgs2), + analyst_stream, label="analyst", aggregator=ai.agents.MessageAggregator, ), ) - return f"{r1}\n{r2}" ``` -Each forwarded event is wrapped in `ai.events.PartialToolCallResult` carrying the `label`, so the consumer can route by source. +Route labels in the consumer: -## Hooks +```python +if isinstance(event, ai.events.PartialToolCallResult): + if event.label == "researcher": + route_research(event.value) +``` -Typed suspension points for human-in-the-loop. +## Hooks -**Tool approval (built-in shortcut).** Flag a tool with `require_approval=True` and the default loop gates each call behind an `ai.tools.ToolApproval` hook (label `approve_{tool_call_id}`, payload carries `granted` + `reason`): +Hooks are runtime suspension points. Tool approvals are the built-in workflow. ```python @ai.tool(require_approval=True) async def delete_file(path: str) -> str: + """Delete a file.""" ... +``` -# consumer-side resolve: -ai.resolve_hook(hook.hook_id, ai.tools.ToolApproval(granted=True)) +The default loop gates each call behind an approval hook with label +`approve_{tool_call_id}` and payload `ai.tools.ToolApproval`. + +```python +async with agent.run(model, messages) as stream: + async for event in stream: + if isinstance(event, ai.events.HookEvent) and event.hook.status == "pending": + ai.resolve_hook( + event.hook.hook_id, + ai.tools.ToolApproval(granted=True, reason="approved"), + ) ``` -Denial returns an error `ToolCallResult` with `result=f"Rejected: {reason}"`. For custom payloads, label schemes, or per-call gating, write a custom loop using the primitives below. +Resolve with `granted=False` to deny the call and return an error tool result. -**Manual hooks.** Inside agent code (blocks until resolved): +Manual hooks block until resolved in live flows: ```python -class Approval(pydantic.BaseModel): - granted: bool - reason: str - approval = await ai.hook( "approve_send_email", - payload=Approval, + payload=ai.tools.ToolApproval, metadata={"tool": "send_email"}, ) -if approval.granted: - await tc() ``` -From outside: +Resolve or cancel from another task, request handler, or UI callback: ```python -ai.resolve_hook("approve_send_email", {"granted": True, "reason": "User approved"}) -await ai.cancel_hook("approve_send_email") +ai.resolve_hook("approve_send_email", {"granted": True, "reason": "approved"}) +await ai.cancel_hook("approve_send_email", reason="client disconnected") ``` -Hook messages have `role="signal"` with a `HookPart`. - -**Long-running mode** (`interrupt_loop=False`, default): await blocks until resolved. Use for websocket/interactive UIs. +Hooks emit `HookEvent` objects. Their messages use `role="internal"` and contain +`HookPart` values. -**Serverless mode** (`interrupt_loop=True`): unresolved hooks cancel the run. Pre-register a resolution with `ai.resolve_hook(...)` before rerunning. - -Consuming hooks in the iterator: +Serverless resume flow: ```python -async for msg in my_agent.run(model, messages): - if msg.role == "signal" and (hook := msg.get_hook_part()): - answer = input(f"Approve {hook.hook_id}? [y/n] ") - ai.resolve_hook( - hook.hook_id, - Approval(granted=answer == "y", reason="operator"), - ) - continue - print(msg.text_delta, end="") -``` +async with agent.run(model, messages) as stream: + async for event in stream: + if isinstance(event, ai.events.HookEvent) and event.hook.status == "pending": + ai.abort_pending_hook(event.hook) + yield event -## Structured output +persist(stream.messages) -```python -class Forecast(pydantic.BaseModel): - city: str - temperature: float - -stream = await ai.stream(model, messages, output_type=Forecast) -async for msg in stream: - ... -stream.output.city +# Later, restore messages, pre-register the resolution, and rerun. +ai.resolve_hook(hook_id, ai.tools.ToolApproval(granted=True, reason="approved")) ``` ## MCP +MCP adapters return `AgentTool` objects usable in `ai.agent(...)`. + ```python tools = await ai.mcp.get_http_tools( "https://mcp.example.com/mcp", - headers={...}, + headers={"Authorization": "Bearer token"}, tool_prefix="docs", ) + tools = await ai.mcp.get_stdio_tools( - "npx", "-y", "@anthropic/mcp-server-filesystem", "/tmp", + "npx", + "-y", + "@anthropic/mcp-server-filesystem", + "/tmp", tool_prefix="fs", ) -``` -Returns `Tool` objects usable in `ai.stream(...)` or `ai.agent(...)`. +agent = ai.agent(tools=tools) +``` ## AI SDK UI adapter +Use `ai.agents.ui.ai_sdk` to convert between AI SDK UI messages and Python +runtime messages/events. + ```python -from ai.ai_sdk_ui import UI_MESSAGE_STREAM_HEADERS, to_messages, to_sse_stream +class ChatRequest(pydantic.BaseModel): + messages: list[ai.agents.ui.ai_sdk.UIMessage] + + +@app.post("/chat") +async def chat(request: ChatRequest): + messages, approvals = ai.agents.ui.ai_sdk.to_messages(request.messages) + ai.agents.ui.ai_sdk.apply_approvals(approvals) + + async def stream_response(): + async with chat_agent.run(model, messages) as stream: + async for chunk in ai.agents.ui.ai_sdk.to_sse(stream): + yield chunk + + return fastapi.responses.StreamingResponse( + stream_response(), + headers=ai.agents.ui.ai_sdk.UI_MESSAGE_STREAM_HEADERS, + ) +``` -messages = to_messages(request.messages) -return StreamingResponse( - to_sse_stream(agent.run(model, messages)), - headers=UI_MESSAGE_STREAM_HEADERS, +Use `ai.agents.ui.ai_sdk.to_ui_messages(messages)` to rebuild UI history from +stored runtime messages. + +For serverless approvals, monitor `HookEvent` before passing events to `to_sse` +and call `ai.abort_pending_hook(event.hook)` on pending hooks. + +## Media generation + +Use `ai.generate` for dedicated image and video models: + +```python +image_message = await ai.generate( + ai.get_model("gateway:google/imagen-4.0-generate-001"), + [ai.user_message("A watercolor mothership over a quiet city.")], + ai.ImageParams(n=1, aspect_ratio="16:9"), ) + +image = image_message.images[0] ``` + +For video generation, pass `ai.VideoParams(...)` and read `message.videos`.