Streaming¶
Agent provides a normalized streaming interface that works consistently across all providers. Stream responses for real-time output and better user experience.
Basic Streaming¶
Synchronous¶
from agent import Agent
agent = Agent(provider="anthropic", model="claude-sonnet")
for event in agent.stream("Write a short story about a robot"):
if event.type == "text_delta":
print(event.text, end="", flush=True)
print() # Final newline
Asynchronous¶
import asyncio
async def main():
agent = Agent(provider="openai", model="gpt-4o")
async for event in await agent.stream_async("Explain quantum physics"):
if event.type == "text_delta":
print(event.text, end="", flush=True)
print()
asyncio.run(main())
Stream Events¶
Agent normalizes events across all providers:
Event Types¶
from agent import StreamEvent
for event in agent.stream("Hello"):
match event.type:
case "message_start":
print("Response started")
case "text_delta":
print(event.text, end="")
case "tool_call_start":
print(f"Calling tool: {event.tool_call.name}")
case "tool_call_delta":
print(f"Tool args: {event.tool_call_delta}")
case "usage":
print(f"Tokens: {event.usage.total_tokens}")
case "message_end":
print("Response complete")
case "error":
print(f"Error: {event.error}")
Event Properties¶
event.type # StreamEventType
event.text # str | None - for text_delta
event.tool_call # ToolCall | None - for tool_call_start
event.tool_call_delta # dict | None - for tool_call_delta
event.usage # Usage | None - for usage/message_end
event.error # str | None - for error
event.raw # Any - provider's raw event
StreamResponse¶
The stream response accumulates data as you iterate:
stream = agent.stream("Write a poem")
# Iterate through events
for event in stream:
if event.type == "text_delta":
print(event.text, end="")
# After iteration, access accumulated data
print(f"\nFull text: {stream.text}")
print(f"Tool calls: {stream.tool_calls}")
print(f"Usage: {stream.usage}")
Collect All Events¶
# Consume stream and get final state
stream = agent.stream("Hello").collect()
print(stream.text) # Full response text
Streaming with Tools¶
Tool calls are streamed as events:
from agent import Agent, tool
@tool
def calculate(expression: str) -> str:
return str(eval(expression))
agent = Agent(
provider="openai",
model="gpt-4o",
tools=[calculate],
)
for event in agent.stream("What's 25 * 4 + 10?"):
match event.type:
case "text_delta":
print(event.text, end="")
case "tool_call_start":
print(f"\n[Calling {event.tool_call.name}...]")
case "message_end":
print("\n[Done]")
Session Streaming¶
Stream within multi-turn conversations:
session = agent.session()
session.run("My name is Alice")
# Stream response, history updated after completion
for event in session.stream("Tell me a story about someone with my name"):
if event.type == "text_delta":
print(event.text, end="")
# History now includes the streamed response
print(f"\nMessages: {len(session.messages)}")
Router Streaming¶
Stream with fallback support:
from agent import Agent, AgentRouter
router = AgentRouter(
agents=[
Agent(provider="anthropic", model="claude-sonnet"),
Agent(provider="openai", model="gpt-4o"),
],
strategy="fallback",
)
# Falls back if first provider fails to connect
for event in router.stream("Write a haiku"):
if event.type == "text_delta":
print(event.text, end="")
Web Streaming (FastAPI)¶
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from agent import Agent
app = FastAPI()
agent = Agent(provider="openai", model="gpt-4o")
@app.post("/chat")
async def chat(prompt: str):
async def generate():
async for event in await agent.stream_async(prompt):
if event.type == "text_delta":
yield f"data: {event.text}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
WebSocket Streaming¶
from fastapi import FastAPI, WebSocket
from agent import Agent
app = FastAPI()
agent = Agent(provider="anthropic", model="claude-sonnet")
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
prompt = await websocket.receive_text()
async for event in await agent.stream_async(prompt):
if event.type == "text_delta":
await websocket.send_text(event.text)
elif event.type == "message_end":
await websocket.send_text("[END]")
Progress Indicators¶
Show progress during streaming:
import sys
stream = agent.stream("Write a long analysis")
char_count = 0
for event in stream:
if event.type == "text_delta":
char_count += len(event.text)
sys.stdout.write(event.text)
sys.stdout.flush()
elif event.type == "message_end":
print(f"\n\n[{char_count} characters, {stream.usage.total_tokens} tokens]")
Error Handling¶
from agent.errors import AgentError
try:
for event in agent.stream("Hello"):
if event.type == "error":
print(f"Stream error: {event.error}")
break
if event.type == "text_delta":
print(event.text, end="")
except AgentError as e:
print(f"Connection error: {e}")
Configuration¶
response = agent.stream(
"Write something",
temperature=0.9,
max_tokens=500,
system="Be creative and verbose.",
)
Best Practices¶
1. Always Handle Events¶
# Good - handle relevant events
for event in agent.stream(prompt):
if event.type == "text_delta":
process_text(event.text)
elif event.type == "error":
handle_error(event.error)
# Bad - assume only text
for event in agent.stream(prompt):
print(event.text) # May be None for non-text events
2. Use Flush for Real-time Output¶
# Good - immediate output
print(event.text, end="", flush=True)
# Bad - may buffer
print(event.text, end="")
3. Clean Up Resources¶
# Async context manager for cleanup
async with agent.stream_async(prompt) as stream:
async for event in stream:
process(event)
4. Handle Connection Failures¶
import time
from agent.errors import ProviderError
def stream_with_retry(agent, prompt, max_retries=3):
for attempt in range(max_retries):
try:
for event in agent.stream(prompt):
yield event
return
except ProviderError:
if attempt < max_retries - 1:
time.sleep(2 ** attempt)
else:
raise