Source code for agent.execution.tool_loop
"""
Tool execution loop.
Handles the cycle of LLM response -> tool execution -> continue until done.
"""
import asyncio
from collections.abc import Awaitable, Callable
from agent.errors import ToolExecutionError
from agent.messages import AgentRequest, Message
from agent.response import AgentResponse
from agent.tools import Tool, ToolRegistry
from agent.types.config import ToolLoopConfig
from agent.types.tools import ToolCall, ToolResult
[docs]
class ToolLoop:
"""
Manages the tool calling loop.
Executes tools requested by the LLM and continues the conversation
until the LLM produces a final response without tool calls.
"""
def __init__(
self,
tools: list[Tool],
config: ToolLoopConfig | None = None,
):
self.config = config or ToolLoopConfig()
self.registry = ToolRegistry()
for tool in tools:
self.registry.register(tool)
[docs]
def execute_tool_calls(
self,
tool_calls: list[ToolCall],
) -> list[ToolResult]:
"""
Execute a list of tool calls synchronously.
Args:
tool_calls: Tool calls to execute
Returns:
List of tool results
"""
results: list[ToolResult] = []
for call in tool_calls[: self.config.max_tool_calls_per_iteration]:
tool = self.registry.get(call.name)
if tool is None:
results.append(
ToolResult(
tool_call_id=call.id,
name=call.name,
content=f"Error: Unknown tool '{call.name}'",
is_error=True,
)
)
continue
try:
content = tool.execute_sync(call.arguments)
results.append(
ToolResult(
tool_call_id=call.id,
name=call.name,
content=content,
is_error=False,
)
)
except Exception as e:
if self.config.stop_on_error:
raise ToolExecutionError(
f"Tool '{call.name}' failed: {e}",
tool_name=call.name,
) from e
results.append(
ToolResult(
tool_call_id=call.id,
name=call.name,
content=f"Error: {e}",
is_error=True,
)
)
return results
[docs]
async def execute_tool_calls_async(
self,
tool_calls: list[ToolCall],
) -> list[ToolResult]:
"""
Execute a list of tool calls asynchronously.
Args:
tool_calls: Tool calls to execute
Returns:
List of tool results
"""
calls_to_process = tool_calls[: self.config.max_tool_calls_per_iteration]
if self.config.parallel_tool_execution:
# Execute tools in parallel
tasks = [self._execute_single_tool_async(call) for call in calls_to_process]
return await asyncio.gather(*tasks)
else:
# Execute tools sequentially
results = []
for call in calls_to_process:
result = await self._execute_single_tool_async(call)
results.append(result)
return results
async def _execute_single_tool_async(self, call: ToolCall) -> ToolResult:
"""Execute a single tool call."""
tool = self.registry.get(call.name)
if tool is None:
return ToolResult(
tool_call_id=call.id,
name=call.name,
content=f"Error: Unknown tool '{call.name}'",
is_error=True,
)
try:
if tool.timeout:
content = await asyncio.wait_for(
tool.execute(call.arguments),
timeout=tool.timeout,
)
else:
content = await asyncio.wait_for(
tool.execute(call.arguments),
timeout=self.config.timeout_per_tool,
)
return ToolResult(
tool_call_id=call.id,
name=call.name,
content=content,
is_error=False,
)
except asyncio.TimeoutError:
return ToolResult(
tool_call_id=call.id,
name=call.name,
content=f"Error: Tool '{call.name}' timed out",
is_error=True,
)
except Exception as e:
if self.config.stop_on_error:
raise ToolExecutionError(
f"Tool '{call.name}' failed: {e}",
tool_name=call.name,
) from e
return ToolResult(
tool_call_id=call.id,
name=call.name,
content=f"Error: {e}",
is_error=True,
)
[docs]
def build_tool_messages(
self,
response: AgentResponse,
results: list[ToolResult],
) -> list[Message]:
"""
Build messages to append to conversation after tool execution.
Args:
response: The LLM response containing tool calls
results: Results from tool execution
Returns:
List of messages to append
"""
messages: list[Message] = []
# Add assistant message with tool calls
messages.append(
Message.assistant(
content=response.text or "",
tool_calls=[tc.to_dict() for tc in response.tool_calls],
)
)
# Add tool result messages
for result in results:
messages.append(
Message.tool(
content=result.content,
tool_call_id=result.tool_call_id,
name=result.name,
)
)
return messages
[docs]
def run_loop(
self,
initial_request: AgentRequest,
run_fn: Callable[[AgentRequest], AgentResponse],
) -> AgentResponse:
"""
Run the tool loop until completion.
Args:
initial_request: The initial request
run_fn: Function to call the LLM
Returns:
Final response after tool loop completion
"""
request = initial_request
messages = list(request.messages)
for iteration in range(self.config.max_iterations):
# Build request with current messages
current_request = AgentRequest(
input=request.input if iteration == 0 else None,
messages=messages,
system=request.system,
tools=request.tools,
output_schema=request.output_schema,
temperature=request.temperature,
max_tokens=request.max_tokens,
top_p=request.top_p,
stop=request.stop,
metadata=request.metadata,
session_id=request.session_id,
)
# Get response
response = run_fn(current_request)
# If no tool calls, we're done
if not response.has_tool_calls:
return response
# Execute tools
results = self.execute_tool_calls(response.tool_calls)
# Build messages for next iteration
tool_messages = self.build_tool_messages(response, results)
messages.extend(tool_messages)
# Max iterations reached
return response
[docs]
async def run_loop_async(
self,
initial_request: AgentRequest,
run_fn: Callable[[AgentRequest], Awaitable[AgentResponse]],
) -> AgentResponse:
"""
Run the tool loop asynchronously until completion.
Args:
initial_request: The initial request
run_fn: Async function to call the LLM
Returns:
Final response after tool loop completion
"""
request = initial_request
messages = list(request.messages)
for iteration in range(self.config.max_iterations):
current_request = AgentRequest(
input=request.input if iteration == 0 else None,
messages=messages,
system=request.system,
tools=request.tools,
output_schema=request.output_schema,
temperature=request.temperature,
max_tokens=request.max_tokens,
top_p=request.top_p,
stop=request.stop,
metadata=request.metadata,
session_id=request.session_id,
)
response = await run_fn(current_request)
if not response.has_tool_calls:
return response
results = await self.execute_tool_calls_async(response.tool_calls)
tool_messages = self.build_tool_messages(response, results)
messages.extend(tool_messages)
return response
# Re-export for backwards compatibility
__all__ = ["ToolLoopConfig", "ToolLoop"]