"""
Main Agent class.
The primary interface for interacting with LLM providers.
"""
from typing import TYPE_CHECKING, Any
from pydantic import BaseModel
from agent.execution.runtime import ExecutionRuntime
from agent.messages import AgentRequest, Message
from agent.middleware import Middleware, MiddlewareChain
from agent.providers.base import BaseProvider
from agent.providers.registry import get_provider
from agent.response import AgentResponse
from agent.stream import AsyncStreamResponse, StreamResponse
from agent.tools import Tool
from agent.types.config import AgentConfig, RetryConfig
if TYPE_CHECKING:
from agent.session import Session
[docs]
class Agent:
"""
A provider-agnostic LLM agent.
The Agent class is the main entry point for interacting with LLM providers.
It provides a unified interface for text generation, streaming, structured
outputs, and tool calling across multiple providers.
Example:
```python
from agent import Agent
agent = Agent(provider="openai", model="gpt-4o")
response = agent.run("Hello, world!")
print(response.text)
```
"""
def __init__(
self,
provider: str,
model: str,
*,
api_key: str | None = None,
base_url: str | None = None,
timeout: float = 120.0,
max_retries: int = 2,
temperature: float | None = None,
max_tokens: int | None = None,
top_p: float | None = None,
tools: list[Tool] | None = None,
middleware: list[Middleware] | None = None,
default_system: str | None = None,
**kwargs: Any,
):
"""
Initialize an Agent.
Args:
provider: Provider name (e.g., "openai", "anthropic")
model: Model name (e.g., "gpt-4o", "claude-sonnet")
api_key: API key (defaults to environment variable)
base_url: Custom base URL for the API
timeout: Request timeout in seconds
max_retries: Maximum retry attempts for transient errors
temperature: Sampling temperature
max_tokens: Maximum tokens to generate
top_p: Top-p sampling parameter
tools: List of tools available to the agent
middleware: List of middleware to apply
default_system: Default system prompt
**kwargs: Additional provider-specific options
"""
# Build configuration
self.config = AgentConfig(
provider=provider,
model=model,
api_key=api_key,
base_url=base_url,
timeout=timeout,
max_retries=max_retries,
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
default_system=default_system,
extra=kwargs,
)
# Initialize provider
self._provider: BaseProvider = get_provider(
provider,
api_key=self.config.api_key,
base_url=self.config.base_url,
timeout=self.config.timeout,
max_retries=self.config.max_retries,
**kwargs,
)
# Store tools
self._tools = tools or []
# Build middleware chain
self._middleware = MiddlewareChain(middleware or [])
# Create execution runtime
self._runtime = ExecutionRuntime(
provider=self._provider,
config=self.config,
tools=self._tools,
middleware=self._middleware,
retry_config=RetryConfig(max_retries=max_retries),
)
@property
def provider(self) -> str:
"""Get the provider name."""
return self.config.provider
@property
def model(self) -> str:
"""Get the model name."""
return self.config.model
@property
def tools(self) -> list[Tool]:
"""Get registered tools."""
return self._tools
[docs]
def run(
self,
input: str | None = None,
*,
messages: list[Message] | None = None,
system: str | None = None,
temperature: float | None = None,
max_tokens: int | None = None,
stop: list[str] | None = None,
metadata: dict[str, Any] | None = None,
) -> AgentResponse:
"""
Execute a synchronous request.
Args:
input: User input text
messages: Optional message history
system: System prompt (overrides default)
temperature: Sampling temperature (overrides default)
max_tokens: Max tokens (overrides default)
stop: Stop sequences
metadata: Request metadata
Returns:
AgentResponse with the model's response
"""
request = self._build_request(
input=input,
messages=messages,
system=system,
temperature=temperature,
max_tokens=max_tokens,
stop=stop,
metadata=metadata,
)
return self._runtime.run(request)
[docs]
async def run_async(
self,
input: str | None = None,
*,
messages: list[Message] | None = None,
system: str | None = None,
temperature: float | None = None,
max_tokens: int | None = None,
stop: list[str] | None = None,
metadata: dict[str, Any] | None = None,
) -> AgentResponse:
"""
Execute an asynchronous request.
Args:
input: User input text
messages: Optional message history
system: System prompt (overrides default)
temperature: Sampling temperature (overrides default)
max_tokens: Max tokens (overrides default)
stop: Stop sequences
metadata: Request metadata
Returns:
AgentResponse with the model's response
"""
request = self._build_request(
input=input,
messages=messages,
system=system,
temperature=temperature,
max_tokens=max_tokens,
stop=stop,
metadata=metadata,
)
return await self._runtime.run_async(request)
[docs]
def stream(
self,
input: str | None = None,
*,
messages: list[Message] | None = None,
system: str | None = None,
temperature: float | None = None,
max_tokens: int | None = None,
stop: list[str] | None = None,
metadata: dict[str, Any] | None = None,
) -> StreamResponse:
"""
Execute a streaming request.
Args:
input: User input text
messages: Optional message history
system: System prompt (overrides default)
temperature: Sampling temperature (overrides default)
max_tokens: Max tokens (overrides default)
stop: Stop sequences
metadata: Request metadata
Returns:
StreamResponse iterator yielding events
"""
request = self._build_request(
input=input,
messages=messages,
system=system,
temperature=temperature,
max_tokens=max_tokens,
stop=stop,
metadata=metadata,
)
return self._runtime.stream(request)
[docs]
async def stream_async(
self,
input: str | None = None,
*,
messages: list[Message] | None = None,
system: str | None = None,
temperature: float | None = None,
max_tokens: int | None = None,
stop: list[str] | None = None,
metadata: dict[str, Any] | None = None,
) -> AsyncStreamResponse:
"""
Execute an async streaming request.
Args:
input: User input text
messages: Optional message history
system: System prompt (overrides default)
temperature: Sampling temperature (overrides default)
max_tokens: Max tokens (overrides default)
stop: Stop sequences
metadata: Request metadata
Returns:
AsyncStreamResponse iterator yielding events
"""
request = self._build_request(
input=input,
messages=messages,
system=system,
temperature=temperature,
max_tokens=max_tokens,
stop=stop,
metadata=metadata,
)
return await self._runtime.stream_async(request)
[docs]
def json(
self,
input: str | None = None,
*,
schema: type[BaseModel] | dict[str, Any],
messages: list[Message] | None = None,
system: str | None = None,
temperature: float | None = None,
max_tokens: int | None = None,
metadata: dict[str, Any] | None = None,
) -> AgentResponse:
"""
Execute a request expecting structured JSON output.
Args:
input: User input text
schema: Pydantic model or JSON schema for output
messages: Optional message history
system: System prompt (overrides default)
temperature: Sampling temperature (overrides default)
max_tokens: Max tokens (overrides default)
metadata: Request metadata
Returns:
AgentResponse with parsed output in response.output
"""
request = self._build_request(
input=input,
messages=messages,
system=system,
temperature=temperature,
max_tokens=max_tokens,
metadata=metadata,
)
return self._runtime.run(request, schema=schema)
[docs]
async def json_async(
self,
input: str | None = None,
*,
schema: type[BaseModel] | dict[str, Any],
messages: list[Message] | None = None,
system: str | None = None,
temperature: float | None = None,
max_tokens: int | None = None,
metadata: dict[str, Any] | None = None,
) -> AgentResponse:
"""
Execute an async request expecting structured JSON output.
Args:
input: User input text
schema: Pydantic model or JSON schema for output
messages: Optional message history
system: System prompt (overrides default)
temperature: Sampling temperature (overrides default)
max_tokens: Max tokens (overrides default)
metadata: Request metadata
Returns:
AgentResponse with parsed output in response.output
"""
request = self._build_request(
input=input,
messages=messages,
system=system,
temperature=temperature,
max_tokens=max_tokens,
metadata=metadata,
)
return await self._runtime.run_async(request, schema=schema)
[docs]
def session(
self,
session_id: str | None = None,
system: str | None = None,
) -> "Session":
"""
Create a new session for multi-turn conversation.
Args:
session_id: Optional session identifier
system: System prompt for this session
Returns:
Session instance
"""
from agent.session import Session
return Session(
agent=self,
session_id=session_id,
system=system or self.config.default_system,
)
[docs]
def with_config(self, **kwargs: Any) -> "Agent":
"""
Create a new Agent with modified configuration.
Args:
**kwargs: Configuration overrides
Returns:
New Agent instance with modified config
"""
new_config = self.config.with_overrides(**kwargs)
return Agent(
provider=new_config.provider,
model=new_config.model,
api_key=new_config.api_key,
base_url=new_config.base_url,
timeout=new_config.timeout,
max_retries=new_config.max_retries,
temperature=new_config.temperature,
max_tokens=new_config.max_tokens,
top_p=new_config.top_p,
tools=list(self._tools),
middleware=list(self._middleware.middlewares),
default_system=new_config.default_system,
**new_config.extra,
)
def _build_request(
self,
input: str | None = None,
messages: list[Message] | None = None,
system: str | None = None,
temperature: float | None = None,
max_tokens: int | None = None,
top_p: float | None = None,
stop: list[str] | None = None,
metadata: dict[str, Any] | None = None,
) -> AgentRequest:
"""Build a normalized request from parameters."""
return AgentRequest(
input=input,
messages=messages or [],
system=system if system is not None else self.config.default_system,
temperature=temperature if temperature is not None else self.config.temperature,
max_tokens=max_tokens if max_tokens is not None else self.config.max_tokens,
top_p=top_p if top_p is not None else self.config.top_p,
stop=stop,
metadata=metadata or {},
)
def __repr__(self) -> str:
return f"Agent(provider={self.provider!r}, model={self.model!r})"