Streaming

Agent provides a normalized streaming interface that works consistently across all providers. Stream responses for real-time output and a better user experience.

Basic Streaming

Synchronous

from agent import Agent

agent = Agent(provider="anthropic", model="claude-sonnet")

# Echo each text fragment as soon as it arrives; every other event type is
# skipped.
story_stream = agent.stream("Write a short story about a robot")
for event in story_stream:
    if event.type != "text_delta":
        continue
    print(event.text, end="", flush=True)
print()  # Final newline

Asynchronous

import asyncio

async def main():
    """Stream one prompt asynchronously and print the text as it arrives."""
    agent = Agent(provider="openai", model="gpt-4o")

    # Await the call to obtain the stream, then iterate it asynchronously.
    stream = await agent.stream_async("Explain quantum physics")
    async for event in stream:
        if event.type != "text_delta":
            continue
        print(event.text, end="", flush=True)
    print()

asyncio.run(main())

Stream Events

Agent normalizes events across all providers:

Event Types

from agent import StreamEvent

# Branch on the normalized event type; each branch reads only the attribute
# that goes with that kind of event.
for event in agent.stream("Hello"):
    kind = event.type

    if kind == "message_start":
        print("Response started")
    elif kind == "text_delta":
        print(event.text, end="")
    elif kind == "tool_call_start":
        print(f"Calling tool: {event.tool_call.name}")
    elif kind == "tool_call_delta":
        print(f"Tool args: {event.tool_call_delta}")
    elif kind == "usage":
        print(f"Tokens: {event.usage.total_tokens}")
    elif kind == "message_end":
        print("Response complete")
    elif kind == "error":
        print(f"Error: {event.error}")

Event Properties

event.type          # StreamEventType
event.text          # str | None - for text_delta
event.tool_call     # ToolCall | None - for tool_call_start
event.tool_call_delta  # dict | None - for tool_call_delta
event.usage         # Usage | None - for usage/message_end
event.error         # str | None - for error
event.raw           # Any - provider's raw event

StreamResponse

The stream response accumulates data as you iterate:

stream = agent.stream("Write a poem")

# Print text fragments while iterating; the stream object is kept so it can
# be inspected afterwards.
for event in stream:
    if event.type != "text_delta":
        continue
    print(event.text, end="")

# Once the loop has finished, read the accumulated data from the stream
print(f"\nFull text: {stream.text}")
print(f"Tool calls: {stream.tool_calls}")
print(f"Usage: {stream.usage}")

Collect All Events

# Consume stream and get final state
stream = agent.stream("Hello").collect()
print(stream.text)  # Full response text

Streaming with Tools

Tool calls are streamed as events:

from agent import Agent, tool

@tool
def calculate(expression: str) -> str:
    # Evaluates `expression` as Python source and returns the result as a string.
    #
    # WARNING: eval() executes arbitrary Python code. When this function is
    # exposed as a tool, `expression` is not written by you, so treat it as
    # untrusted input. This is acceptable for a local demo only; for real use,
    # replace eval() with a restricted arithmetic parser (for example, one
    # built on the `ast` module that allows only numeric literals and
    # arithmetic operators).
    return str(eval(expression))

agent = Agent(provider="openai", model="gpt-4o", tools=[calculate])

# Show text as it arrives and announce tool calls and the end of the message.
for event in agent.stream("What's 25 * 4 + 10?"):
    kind = event.type
    if kind == "text_delta":
        print(event.text, end="")
    elif kind == "tool_call_start":
        print(f"\n[Calling {event.tool_call.name}...]")
    elif kind == "message_end":
        print("\n[Done]")

Session Streaming

Stream within multi-turn conversations:

session = agent.session()

# A first, non-streamed turn.
session.run("My name is Alice")

# Stream response, history updated after completion
story_stream = session.stream("Tell me a story about someone with my name")
for event in story_stream:
    if event.type != "text_delta":
        continue
    print(event.text, end="")

# History now includes the streamed response
print(f"\nMessages: {len(session.messages)}")

Router Streaming

Stream with fallback support:

from agent import Agent, AgentRouter

# Agents are listed in the order they are passed to the router.
primary = Agent(provider="anthropic", model="claude-sonnet")
backup = Agent(provider="openai", model="gpt-4o")
router = AgentRouter(agents=[primary, backup], strategy="fallback")

# Falls back if first provider fails to connect
for event in router.stream("Write a haiku"):
    if event.type != "text_delta":
        continue
    print(event.text, end="")

Web Streaming (FastAPI)

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from agent import Agent

app = FastAPI()
agent = Agent(provider="openai", model="gpt-4o")

@app.post("/chat")
async def chat(prompt: str):
    """Stream the agent's reply to ``prompt`` as Server-Sent Events.

    Each text delta is sent as one SSE event, and a final ``[DONE]`` event
    marks the end of the stream.
    """
    async def generate():
        async for event in await agent.stream_async(prompt):
            if event.type == "text_delta":
                # In the SSE wire format a blank line terminates the event and
                # every payload line needs its own "data:" prefix. Text deltas
                # can contain newlines, so emit one "data:" line per payload
                # line; the client joins them back together with "\n".
                text = event.text.replace("\r\n", "\n").replace("\r", "\n")
                payload = "".join(f"data: {line}\n" for line in text.split("\n"))
                yield payload + "\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
    )

WebSocket Streaming

from fastapi import FastAPI, WebSocket
from agent import Agent

app = FastAPI()
agent = Agent(provider="anthropic", model="claude-sonnet")

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Answer each text message from the client with a streamed reply.

    Every received message is used as a prompt. Text deltas are forwarded as
    they arrive and ``[END]`` is sent when the response is complete. The
    handler returns once the client disconnects.
    """
    # Imported here so the handler stays self-contained.
    from fastapi import WebSocketDisconnect

    await websocket.accept()

    try:
        while True:
            prompt = await websocket.receive_text()

            async for event in await agent.stream_async(prompt):
                if event.type == "text_delta":
                    await websocket.send_text(event.text)
                elif event.type == "message_end":
                    await websocket.send_text("[END]")
    except WebSocketDisconnect:
        # receive_text() raises this when the client closes the connection;
        # without the handler every disconnect is an unhandled exception.
        return

Progress Indicators

Show progress during streaming:

import sys

stream = agent.stream("Write a long analysis")
char_count = 0

for event in stream:
    if event.type == "text_delta":
        char_count += len(event.text)
        sys.stdout.write(event.text)
        sys.stdout.flush()
    elif event.type == "message_end":
        print(f"\n\n[{char_count} characters, {stream.usage.total_tokens} tokens]")

Error Handling

from agent.errors import AgentError

try:
    for event in agent.stream("Hello"):
        if event.type == "error":
            print(f"Stream error: {event.error}")
            break
        if event.type == "text_delta":
            print(event.text, end="")
except AgentError as e:
    print(f"Connection error: {e}")

Configuration

response = agent.stream(
    "Write something",
    temperature=0.9,
    max_tokens=500,
    system="Be creative and verbose.",
)

Best Practices

1. Always Handle Events

# Good - handle relevant events
for event in agent.stream(prompt):
    if event.type == "text_delta":
        process_text(event.text)
    elif event.type == "error":
        handle_error(event.error)

# Bad - assume only text
for event in agent.stream(prompt):
    print(event.text)  # May be None for non-text events

2. Use Flush for Real-time Output

# Good - immediate output
print(event.text, end="", flush=True)

# Bad - may buffer
print(event.text, end="")

3. Clean Up Resources

# Async context manager for cleanup
# NOTE(review): the other async examples in this guide iterate
# `await agent.stream_async(...)`, while here the call is used with
# `async with` without being awaited first. Confirm which form the API
# supports and make the examples agree.
async with agent.stream_async(prompt) as stream:
    async for event in stream:
        process(event)

4. Handle Connection Failures

import time
from agent.errors import ProviderError

def stream_with_retry(agent, prompt, max_retries=3):
    """Yield events from ``agent.stream(prompt)``, retrying failed connections.

    A ``ProviderError`` raised before any event has been yielded is retried
    with exponential backoff (1s, 2s, 4s, ...). Once events have been handed
    to the caller the error is re-raised instead, because starting the stream
    again would replay the response from the beginning and duplicate output.

    Args:
        agent: Object exposing ``stream(prompt)`` that returns an iterable of
            events.
        prompt: Prompt passed to ``agent.stream``.
        max_retries: Maximum number of attempts in total. With a value of 0
            or less no attempt is made and nothing is yielded.

    Raises:
        ProviderError: If the last attempt fails, or if the stream fails
            after events were already yielded.
    """
    for attempt in range(max_retries):
        delivered = False
        try:
            for event in agent.stream(prompt):
                delivered = True
                yield event
            return
        except ProviderError:
            # Retry only when the caller has seen nothing from this attempt
            # and there are attempts left.
            if delivered or attempt >= max_retries - 1:
                raise
            time.sleep(2 ** attempt)

Next Steps

  • Sessions - Streaming in conversations

  • Tools - Streaming tool calls

  • Routing - Streaming with fallback