• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 18595249452

17 Oct 2025 02:08PM UTC coverage: 92.22% (+0.02%) from 92.2%
18595249452

Pull #9886

github

web-flow
Merge ad30d1879 into cc4f024af
Pull Request #9886: feat: Update tools param to Optional[Union[list[Union[Tool, Toolset]], Toolset]]

13382 of 14511 relevant lines covered (92.22%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.4
haystack/components/agents/agent.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import inspect
1✔
6
from dataclasses import dataclass
1✔
7
from typing import Any, Optional, Union, cast
1✔
8

9
from haystack import logging, tracing
1✔
10
from haystack.components.generators.chat.types import ChatGenerator
1✔
11
from haystack.components.tools import ToolInvoker
1✔
12
from haystack.core.component.component import component
1✔
13
from haystack.core.errors import PipelineRuntimeError
1✔
14
from haystack.core.pipeline.async_pipeline import AsyncPipeline
1✔
15
from haystack.core.pipeline.breakpoint import (
1✔
16
    _create_pipeline_snapshot_from_chat_generator,
17
    _create_pipeline_snapshot_from_tool_invoker,
18
    _trigger_chat_generator_breakpoint,
19
    _trigger_tool_invoker_breakpoint,
20
    _validate_tool_breakpoint_is_valid,
21
)
22
from haystack.core.pipeline.pipeline import Pipeline
1✔
23
from haystack.core.pipeline.utils import _deepcopy_with_exceptions
1✔
24
from haystack.core.serialization import component_to_dict, default_from_dict, default_to_dict
1✔
25
from haystack.dataclasses import ChatMessage, ChatRole
1✔
26
from haystack.dataclasses.breakpoints import AgentBreakpoint, AgentSnapshot, PipelineSnapshot, ToolBreakpoint
1✔
27
from haystack.dataclasses.streaming_chunk import StreamingCallbackT, select_streaming_callback
1✔
28
from haystack.tools import (
1✔
29
    Tool,
30
    Toolset,
31
    ToolsType,
32
    deserialize_tools_or_toolset_inplace,
33
    flatten_tools_or_toolsets,
34
    serialize_tools_or_toolset,
35
)
36
from haystack.utils import _deserialize_value_with_schema
1✔
37
from haystack.utils.callable_serialization import deserialize_callable, serialize_callable
1✔
38
from haystack.utils.deserialization import deserialize_chatgenerator_inplace
1✔
39

40
from .state.state import State, _schema_from_dict, _schema_to_dict, _validate_schema
1✔
41
from .state.state_utils import merge_lists
1✔
42

43
# Module-level logger, namespaced to this module via Haystack's logging wrapper.
logger = logging.getLogger(__name__)
44

45

46
@dataclass
class _ExecutionContext:
    """
    Context for executing the agent.

    :param state: The current state of the agent, including messages and any additional data.
    :param component_visits: A dictionary tracking how many times each component has been visited.
    :param chat_generator_inputs: Runtime inputs to be passed to the chat generator.
    :param tool_invoker_inputs: Runtime inputs to be passed to the tool invoker.
    :param counter: A counter to track the number of steps taken in the agent's run.
    :param skip_chat_generator: A flag to indicate whether to skip the chat generator in the next iteration.
        This is useful when resuming from a ToolBreakpoint where the ToolInvoker needs to be called first.
    """

    state: State
    # Keyed by component name ("chat_generator" / "tool_invoker"); values are visit counts.
    component_visits: dict[str, int]
    chat_generator_inputs: dict[str, Any]
    tool_invoker_inputs: dict[str, Any]
    counter: int = 0
    skip_chat_generator: bool = False
66

67

68
@component
1✔
69
class Agent:
1✔
70
    """
71
    A Haystack component that implements a tool-using agent with provider-agnostic chat model support.
72

73
    The component processes messages and executes tools until an exit condition is met.
74
    The exit condition can be triggered either by a direct text response or by invoking a specific designated tool.
75
    Multiple exit conditions can be specified.
76

77
    When you call an Agent without tools, it acts as a ChatGenerator, produces one response, then exits.
78

79
    ### Usage example
80
    ```python
81
    from haystack.components.agents import Agent
82
    from haystack.components.generators.chat import OpenAIChatGenerator
83
    from haystack.dataclasses import ChatMessage
84
    from haystack.tools.tool import Tool
85

86
    tools = [Tool(name="calculator", description="..."), Tool(name="search", description="...")]
87

88
    agent = Agent(
89
        chat_generator=OpenAIChatGenerator(),
90
        tools=tools,
91
        exit_conditions=["search"],
92
    )
93

94
    # Run the agent
95
    result = agent.run(
96
        messages=[ChatMessage.from_user("Find information about Haystack")]
97
    )
98

99
    assert "messages" in result  # Contains conversation history
100
    ```
101
    """
102

103
    def __init__(
        self,
        *,
        chat_generator: ChatGenerator,
        tools: Optional[ToolsType] = None,
        system_prompt: Optional[str] = None,
        exit_conditions: Optional[list[str]] = None,
        state_schema: Optional[dict[str, Any]] = None,
        max_agent_steps: int = 100,
        streaming_callback: Optional[StreamingCallbackT] = None,
        raise_on_tool_invocation_failure: bool = False,
        tool_invoker_kwargs: Optional[dict[str, Any]] = None,
    ) -> None:
        """
        Initialize the agent component.

        :param chat_generator: An instance of the chat generator that your agent should use. It must support tools.
        :param tools: A list of Tool and/or Toolset objects, or a single Toolset that the agent can use.
        :param system_prompt: System prompt for the agent.
        :param exit_conditions: List of conditions that will cause the agent to return.
            Can include "text" if the agent should return when it generates a message without tool calls,
            or tool names that will cause the agent to return once the tool was executed. Defaults to ["text"].
        :param state_schema: The schema for the runtime state used by the tools.
        :param max_agent_steps: Maximum number of steps the agent will run before stopping. Defaults to 100.
            If the agent exceeds this number of steps, it will stop and return the current state.
        :param streaming_callback: A callback that will be invoked when a response is streamed from the LLM.
            The same callback can be configured to emit tool results when a tool is called.
        :param raise_on_tool_invocation_failure: Should the agent raise an exception when a tool invocation fails?
            If set to False, the exception will be turned into a chat message and passed to the LLM.
        :param tool_invoker_kwargs: Additional keyword arguments to pass to the ToolInvoker.
        :raises TypeError: If the chat_generator does not support tools parameter in its run method.
        :raises ValueError: If the exit_conditions are not valid.
        """
        # Check if chat_generator supports tools parameter
        chat_generator_run_method = inspect.signature(chat_generator.run)
        if "tools" not in chat_generator_run_method.parameters:
            raise TypeError(
                f"{type(chat_generator).__name__} does not accept tools parameter in its run method. "
                "The Agent component requires a chat generator that supports tools."
            )

        # Exit conditions may be "text" or the name of any configured tool.
        valid_exits = ["text"] + [tool.name for tool in flatten_tools_or_toolsets(tools)]
        if exit_conditions is None:
            exit_conditions = ["text"]
        if not all(condition in valid_exits for condition in exit_conditions):
            raise ValueError(
                f"Invalid exit conditions provided: {exit_conditions}. "
                f"Valid exit conditions must be a subset of {valid_exits}. "
                "Ensure that each exit condition corresponds to either 'text' or a valid tool name."
            )

        # Validate state schema if provided
        if state_schema is not None:
            _validate_schema(state_schema)
        # The raw user-supplied schema is kept separately so serialization reflects the original input.
        self._state_schema = state_schema or {}

        # Initialize state schema: a "messages" entry merging via merge_lists is always present.
        resolved_state_schema = _deepcopy_with_exceptions(self._state_schema)
        if resolved_state_schema.get("messages") is None:
            resolved_state_schema["messages"] = {"type": list[ChatMessage], "handler": merge_lists}
        self.state_schema = resolved_state_schema

        self.chat_generator = chat_generator
        self.tools = tools or []
        self.system_prompt = system_prompt
        self.exit_conditions = exit_conditions
        self.max_agent_steps = max_agent_steps
        self.raise_on_tool_invocation_failure = raise_on_tool_invocation_failure
        self.streaming_callback = streaming_callback

        # Register one output socket per schema entry, plus "last_message";
        # every schema entry except messages/streaming_callback also becomes an optional input.
        output_types = {"last_message": ChatMessage}
        for param, config in self.state_schema.items():
            output_types[param] = config["type"]
            # Skip setting input types for parameters that are already in the run method
            if param in ["messages", "streaming_callback"]:
                continue
            component.set_input_type(self, name=param, type=config["type"], default=None)
        component.set_output_types(self, **output_types)

        self.tool_invoker_kwargs = tool_invoker_kwargs
        self._tool_invoker = None
        if self.tools:
            # Explicit kwargs may override the defaults derived from the Agent's own configuration.
            resolved_tool_invoker_kwargs = {
                "tools": self.tools,
                "raise_on_failure": self.raise_on_tool_invocation_failure,
                **(tool_invoker_kwargs or {}),
            }
            self._tool_invoker = ToolInvoker(**resolved_tool_invoker_kwargs)
        else:
            logger.warning(
                "No tools provided to the Agent. The Agent will behave like a ChatGenerator and only return text "
                "responses. To enable tool usage, pass tools directly to the Agent, not to the chat_generator."
            )

        self._is_warmed_up = False
198

199
    def warm_up(self) -> None:
1✔
200
        """
201
        Warm up the Agent.
202
        """
203
        if not self._is_warmed_up:
1✔
204
            if hasattr(self.chat_generator, "warm_up"):
1✔
205
                self.chat_generator.warm_up()
×
206
            self._is_warmed_up = True
1✔
207

208
    def to_dict(self) -> dict[str, Any]:
        """
        Serialize the component to a dictionary.

        :return: Dictionary with serialized data
        """
        serialized_callback = serialize_callable(self.streaming_callback) if self.streaming_callback else None
        init_parameters = {
            "chat_generator": component_to_dict(obj=self.chat_generator, name="chat_generator"),
            "tools": serialize_tools_or_toolset(self.tools),
            "system_prompt": self.system_prompt,
            "exit_conditions": self.exit_conditions,
            # Serialize the original (unresolved) state schema to reflect the user's input.
            "state_schema": _schema_to_dict(self._state_schema),
            "max_agent_steps": self.max_agent_steps,
            "streaming_callback": serialized_callback,
            "raise_on_tool_invocation_failure": self.raise_on_tool_invocation_failure,
            "tool_invoker_kwargs": self.tool_invoker_kwargs,
        }
        return default_to_dict(self, **init_parameters)
227

228
    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Agent":
        """
        Deserialize the agent from a dictionary.

        :param data: Dictionary to deserialize from
        :return: Deserialized agent
        """
        init_params = data.get("init_parameters", {})

        # Rebuild nested objects in place before delegating to the default deserializer.
        deserialize_chatgenerator_inplace(init_params, key="chat_generator")

        # Keyed on presence (not truthiness): a serialized schema is always rebuilt.
        if "state_schema" in init_params:
            init_params["state_schema"] = _schema_from_dict(init_params["state_schema"])

        if (serialized_callback := init_params.get("streaming_callback")) is not None:
            init_params["streaming_callback"] = deserialize_callable(serialized_callback)

        deserialize_tools_or_toolset_inplace(init_params, key="tools")

        return default_from_dict(cls, data)
249

250
    def _create_agent_span(self) -> Any:
        """Open a tracing span carrying this agent's static run configuration as tags."""
        span_tags = {
            "haystack.agent.max_steps": self.max_agent_steps,
            "haystack.agent.tools": self.tools,
            "haystack.agent.exit_conditions": self.exit_conditions,
            "haystack.agent.state_schema": _schema_to_dict(self.state_schema),
        }
        return tracing.tracer.trace("haystack.agent.run", tags=span_tags)
261

262
    def _initialize_fresh_execution(
        self,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT],
        requires_async: bool,
        *,
        system_prompt: Optional[str] = None,
        tools: Optional[Union[ToolsType, list[str]]] = None,
        **kwargs,
    ) -> _ExecutionContext:
        """
        Initialize execution context for a fresh run of the agent.

        :param messages: List of ChatMessage objects to start the agent with.
        :param streaming_callback: Optional callback for streaming responses.
        :param requires_async: Whether the agent run requires asynchronous execution.
        :param system_prompt: System prompt for the agent. If provided, it overrides the default system prompt.
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
            When passing tool names, tools are selected from the Agent's originally configured tools.
        :param kwargs: Additional data to pass to the State used by the Agent.
        """
        # A runtime system_prompt takes precedence over the one configured at init time.
        system_prompt = system_prompt or self.system_prompt
        if system_prompt is not None:
            messages = [ChatMessage.from_system(system_prompt)] + messages

        # NOTE(review): all() is vacuously True for an empty messages list, so this also warns
        # when no messages were provided — confirm that is intended.
        if all(m.is_from(ChatRole.SYSTEM) for m in messages):
            logger.warning("All messages provided to the Agent component are system messages. This is not recommended.")

        state = State(schema=self.state_schema, data=kwargs)
        state.set("messages", messages)

        # Runtime callback wins over the init-time one; validity for sync/async is checked here.
        streaming_callback = select_streaming_callback(  # type: ignore[call-overload]
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=requires_async
        )

        selected_tools = self._select_tools(tools)
        tool_invoker_inputs: dict[str, Any] = {"tools": selected_tools}
        generator_inputs: dict[str, Any] = {"tools": selected_tools}
        if streaming_callback is not None:
            tool_invoker_inputs["streaming_callback"] = streaming_callback
            generator_inputs["streaming_callback"] = streaming_callback

        return _ExecutionContext(
            state=state,
            component_visits=dict.fromkeys(["chat_generator", "tool_invoker"], 0),
            chat_generator_inputs=generator_inputs,
            tool_invoker_inputs=tool_invoker_inputs,
        )
310

311
    def _select_tools(self, tools: Optional[Union[ToolsType, list[str]]] = None) -> ToolsType:
1✔
312
        """
313
        Select tools for the current run based on the provided tools parameter.
314

315
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
316
            When passing tool names, tools are selected from the Agent's originally configured tools.
317
        :returns: Selected tools for the current run.
318
        :raises ValueError: If tool names are provided but no tools were configured at initialization,
319
            or if any provided tool name is not valid.
320
        :raises TypeError: If tools is not a list of Tool objects, a Toolset, or a list of tool names (strings).
321
        """
322
        if tools is None:
1✔
323
            return self.tools
1✔
324

325
        if isinstance(tools, list) and all(isinstance(t, str) for t in tools):
1✔
326
            if not self.tools:
1✔
327
                raise ValueError("No tools were configured for the Agent at initialization.")
1✔
328
            available_tools = flatten_tools_or_toolsets(self.tools)
1✔
329
            selected_tool_names = cast(list[str], tools)  # mypy thinks this could still be list[Tool] or Toolset
1✔
330
            valid_tool_names = {tool.name for tool in available_tools}
1✔
331
            invalid_tool_names = {name for name in selected_tool_names if name not in valid_tool_names}
1✔
332
            if invalid_tool_names:
1✔
333
                raise ValueError(
1✔
334
                    f"The following tool names are not valid: {invalid_tool_names}. "
335
                    f"Valid tool names are: {valid_tool_names}."
336
                )
337
            return [tool for tool in available_tools if tool.name in selected_tool_names]
1✔
338

339
        if isinstance(tools, Toolset):
1✔
340
            return tools
×
341

342
        if isinstance(tools, list):
1✔
343
            return cast(list[Union[Tool, Toolset]], tools)  # mypy can't narrow the Union type from isinstance check
1✔
344

345
        raise TypeError(
1✔
346
            "tools must be a list of Tool and/or Toolset objects, a Toolset, or a list of tool names (strings)."
347
        )
348

349
    def _initialize_from_snapshot(
        self,
        snapshot: AgentSnapshot,
        streaming_callback: Optional[StreamingCallbackT],
        requires_async: bool,
        *,
        tools: Optional[Union[ToolsType, list[str]]] = None,
    ) -> _ExecutionContext:
        """
        Initialize execution context from an AgentSnapshot.

        :param snapshot: An AgentSnapshot containing the state of a previously saved agent execution.
        :param streaming_callback: Optional callback for streaming responses.
        :param requires_async: Whether the agent run requires asynchronous execution.
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
            When passing tool names, tools are selected from the Agent's originally configured tools.
        """
        component_visits = snapshot.component_visits
        # Component inputs are stored serialized in the snapshot; rebuild them from their schemas.
        current_inputs = {
            "chat_generator": _deserialize_value_with_schema(snapshot.component_inputs["chat_generator"]),
            "tool_invoker": _deserialize_value_with_schema(snapshot.component_inputs["tool_invoker"]),
        }

        # The tool invoker's saved State carries the conversation data; re-wrap it with the current schema.
        state_data = current_inputs["tool_invoker"]["state"].data
        state = State(schema=self.state_schema, data=state_data)

        # Resuming from a ToolBreakpoint means the LLM already replied: run the tool invoker first.
        skip_chat_generator = isinstance(snapshot.break_point.break_point, ToolBreakpoint)
        # Prefer the callback saved with the snapshot over the one passed at resume time.
        streaming_callback = current_inputs["chat_generator"].get("streaming_callback", streaming_callback)
        streaming_callback = select_streaming_callback(  # type: ignore[call-overload]
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=requires_async
        )

        selected_tools = self._select_tools(tools)
        tool_invoker_inputs: dict[str, Any] = {"tools": selected_tools}
        generator_inputs: dict[str, Any] = {"tools": selected_tools}
        if streaming_callback is not None:
            tool_invoker_inputs["streaming_callback"] = streaming_callback
            generator_inputs["streaming_callback"] = streaming_callback

        return _ExecutionContext(
            state=state,
            component_visits=component_visits,
            chat_generator_inputs=generator_inputs,
            tool_invoker_inputs=tool_invoker_inputs,
            # Resume the step counter from the breakpoint's recorded visit count.
            counter=snapshot.break_point.break_point.visit_count,
            skip_chat_generator=skip_chat_generator,
        )
396

397
    def _runtime_checks(self, break_point: Optional[AgentBreakpoint], snapshot: Optional[AgentSnapshot]) -> None:
1✔
398
        """
399
        Perform runtime checks before running the agent.
400

401
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
402
            for "tool_invoker".
403
        :param snapshot: An AgentSnapshot containing the state of a previously saved agent execution.
404
        :raises RuntimeError: If the Agent component wasn't warmed up before calling `run()`.
405
        :raises ValueError: If the break_point is invalid.
406
        """
407
        if not self._is_warmed_up and hasattr(self.chat_generator, "warm_up"):
1✔
408
            raise RuntimeError("The component Agent wasn't warmed up. Run 'warm_up()' before calling 'run()'.")
1✔
409

410
        if break_point and isinstance(break_point.break_point, ToolBreakpoint):
1✔
411
            _validate_tool_breakpoint_is_valid(agent_breakpoint=break_point, tools=self.tools)
1✔
412

413
    @staticmethod
1✔
414
    def _check_chat_generator_breakpoint(
1✔
415
        execution_context: _ExecutionContext,
416
        break_point: Optional[AgentBreakpoint],
417
        parent_snapshot: Optional[PipelineSnapshot],
418
    ) -> None:
419
        """
420
        Check if the chat generator breakpoint should be triggered.
421

422
        If the breakpoint should be triggered, create an agent snapshot and trigger the chat generator breakpoint.
423

424
        :param execution_context: The current execution context of the agent.
425
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
426
            for "tool_invoker".
427
        :param parent_snapshot: An optional parent snapshot for the agent execution.
428
        """
429
        if (
1✔
430
            break_point
431
            and break_point.break_point.component_name == "chat_generator"
432
            and execution_context.component_visits["chat_generator"] == break_point.break_point.visit_count
433
        ):
434
            pipeline_snapshot = _create_pipeline_snapshot_from_chat_generator(
1✔
435
                execution_context=execution_context, break_point=break_point, parent_snapshot=parent_snapshot
436
            )
437
            _trigger_chat_generator_breakpoint(pipeline_snapshot=pipeline_snapshot)
1✔
438

439
    @staticmethod
1✔
440
    def _check_tool_invoker_breakpoint(
1✔
441
        execution_context: _ExecutionContext,
442
        break_point: Optional[AgentBreakpoint],
443
        parent_snapshot: Optional[PipelineSnapshot],
444
    ) -> None:
445
        """
446
        Check if the tool invoker breakpoint should be triggered.
447

448
        If the breakpoint should be triggered, create an agent snapshot and trigger the tool invoker breakpoint.
449

450
        :param execution_context: The current execution context of the agent.
451
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
452
            for "tool_invoker".
453
        :param parent_snapshot: An optional parent snapshot for the agent execution.
454
        """
455
        if (
1✔
456
            break_point
457
            and break_point.break_point.component_name == "tool_invoker"
458
            and break_point.break_point.visit_count == execution_context.component_visits["tool_invoker"]
459
        ):
460
            pipeline_snapshot = _create_pipeline_snapshot_from_tool_invoker(
1✔
461
                execution_context=execution_context, break_point=break_point, parent_snapshot=parent_snapshot
462
            )
463
            _trigger_tool_invoker_breakpoint(
1✔
464
                llm_messages=execution_context.state.data["messages"][-1:], pipeline_snapshot=pipeline_snapshot
465
            )
466

467
    def run(  # noqa: PLR0915
        self,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        *,
        break_point: Optional[AgentBreakpoint] = None,
        snapshot: Optional[AgentSnapshot] = None,
        system_prompt: Optional[str] = None,
        tools: Optional[Union[ToolsType, list[str]]] = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        Process messages and execute tools until an exit condition is met.

        :param messages: List of Haystack ChatMessage objects to process.
        :param streaming_callback: A callback that will be invoked when a response is streamed from the LLM.
            The same callback can be configured to emit tool results when a tool is called.
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :param snapshot: A dictionary containing a snapshot of a previously saved agent execution. The snapshot contains
            the relevant information to restart the Agent execution from where it left off.
        :param system_prompt: System prompt for the agent. If provided, it overrides the default system prompt.
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
            When passing tool names, tools are selected from the Agent's originally configured tools.
        :param kwargs: Additional data to pass to the State schema used by the Agent.
            The keys must match the schema defined in the Agent's `state_schema`.
        :returns:
            A dictionary with the following keys:
            - "messages": List of all messages exchanged during the agent's run.
            - "last_message": The last message exchanged during the agent's run.
            - Any additional keys defined in the `state_schema`.
        :raises RuntimeError: If the Agent component wasn't warmed up before calling `run()`.
        :raises BreakpointException: If an agent breakpoint is triggered.
        """
        # We pop parent_snapshot from kwargs to avoid passing it into State.
        parent_snapshot = kwargs.pop("parent_snapshot", None)
        # Captured for tracing only (recorded on the span below).
        agent_inputs = {
            "messages": messages,
            "streaming_callback": streaming_callback,
            "break_point": break_point,
            "snapshot": snapshot,
            **kwargs,
        }
        self._runtime_checks(break_point=break_point, snapshot=snapshot)

        # A snapshot resumes a previous run; otherwise start fresh from the given messages.
        if snapshot:
            exe_context = self._initialize_from_snapshot(
                snapshot=snapshot, streaming_callback=streaming_callback, requires_async=False, tools=tools
            )
        else:
            exe_context = self._initialize_fresh_execution(
                messages=messages,
                streaming_callback=streaming_callback,
                requires_async=False,
                system_prompt=system_prompt,
                tools=tools,
                **kwargs,
            )

        with self._create_agent_span() as span:
            span.set_content_tag("haystack.agent.input", _deepcopy_with_exceptions(agent_inputs))

            # Main agent loop: alternate chat generator and tool invoker until an exit
            # condition is met or max_agent_steps is reached.
            while exe_context.counter < self.max_agent_steps:
                # Handle breakpoint and ChatGenerator call
                Agent._check_chat_generator_breakpoint(
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
                )
                # We skip the chat generator when restarting from a snapshot from a ToolBreakpoint
                if exe_context.skip_chat_generator:
                    # The last stored message is the LLM reply saved before the breakpoint fired.
                    llm_messages = exe_context.state.get("messages", [])[-1:]
                    # Set to False so the next iteration will call the chat generator
                    exe_context.skip_chat_generator = False
                else:
                    try:
                        result = Pipeline._run_component(
                            component_name="chat_generator",
                            component={"instance": self.chat_generator},
                            inputs={
                                "messages": exe_context.state.data["messages"],
                                **exe_context.chat_generator_inputs,
                            },
                            component_visits=exe_context.component_visits,
                            parent_span=span,
                        )
                    except PipelineRuntimeError as e:
                        # Attach a snapshot so the failed run can be resumed from this point.
                        pipeline_snapshot = _create_pipeline_snapshot_from_chat_generator(
                            agent_name=getattr(self, "__component_name__", None),
                            execution_context=exe_context,
                            parent_snapshot=parent_snapshot,
                        )
                        e.pipeline_snapshot = pipeline_snapshot
                        raise e

                    llm_messages = result["replies"]
                    exe_context.state.set("messages", llm_messages)

                # Check if any of the LLM responses contain a tool call or if the LLM is not using tools
                if not any(msg.tool_call for msg in llm_messages) or self._tool_invoker is None:
                    exe_context.counter += 1
                    break

                # Handle breakpoint and ToolInvoker call
                Agent._check_tool_invoker_breakpoint(
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
                )
                try:
                    # We only send the messages from the LLM to the tool invoker
                    tool_invoker_result = Pipeline._run_component(
                        component_name="tool_invoker",
                        component={"instance": self._tool_invoker},
                        inputs={
                            "messages": llm_messages,
                            "state": exe_context.state,
                            **exe_context.tool_invoker_inputs,
                        },
                        component_visits=exe_context.component_visits,
                        parent_span=span,
                    )
                except PipelineRuntimeError as e:
                    # Access the original Tool Invoker exception
                    original_error = e.__cause__
                    tool_name = getattr(original_error, "tool_name", None)

                    # Attach a snapshot so the failed run can be resumed from this point.
                    pipeline_snapshot = _create_pipeline_snapshot_from_tool_invoker(
                        tool_name=tool_name,
                        agent_name=getattr(self, "__component_name__", None),
                        execution_context=exe_context,
                        parent_snapshot=parent_snapshot,
                    )
                    e.pipeline_snapshot = pipeline_snapshot
                    raise e

                tool_messages = tool_invoker_result["tool_messages"]
                # The tool invoker may have mutated state; adopt its returned State object.
                exe_context.state = tool_invoker_result["state"]
                exe_context.state.set("messages", tool_messages)

                # Check if any LLM message's tool call name matches an exit condition
                if self.exit_conditions != ["text"] and self._check_exit_conditions(llm_messages, tool_messages):
                    exe_context.counter += 1
                    break

                # Increment the step counter
                exe_context.counter += 1

            if exe_context.counter >= self.max_agent_steps:
                logger.warning(
                    "Agent reached maximum agent steps of {max_agent_steps}, stopping.",
                    max_agent_steps=self.max_agent_steps,
                )
            span.set_content_tag("haystack.agent.output", exe_context.state.data)
            span.set_tag("haystack.agent.steps_taken", exe_context.counter)

        # Expose the full state plus a convenience "last_message" entry when messages exist.
        result = {**exe_context.state.data}
        if msgs := result.get("messages"):
            result["last_message"] = msgs[-1]
        return result
623

624
    async def run_async(
        self,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        *,
        break_point: Optional[AgentBreakpoint] = None,
        snapshot: Optional[AgentSnapshot] = None,
        system_prompt: Optional[str] = None,
        tools: Optional[Union[ToolsType, list[str]]] = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        Asynchronously process messages and execute tools until the exit condition is met.

        This is the asynchronous version of the `run` method. It follows the same logic but uses
        asynchronous operations where possible, such as calling the `run_async` method of the ChatGenerator
        if available.

        :param messages: List of Haystack ChatMessage objects to process.
        :param streaming_callback: An asynchronous callback that will be invoked when a response is streamed from the
            LLM. The same callback can be configured to emit tool results when a tool is called.
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :param snapshot: A dictionary containing a snapshot of a previously saved agent execution. The snapshot contains
            the relevant information to restart the Agent execution from where it left off.
        :param system_prompt: System prompt for the agent. If provided, it overrides the default system prompt.
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
        :param kwargs: Additional data to pass to the State schema used by the Agent.
            The keys must match the schema defined in the Agent's `state_schema`.
        :returns:
            A dictionary with the following keys:
            - "messages": List of all messages exchanged during the agent's run.
            - "last_message": The last message exchanged during the agent's run.
            - Any additional keys defined in the `state_schema`.
        :raises RuntimeError: If the Agent component wasn't warmed up before calling `run_async()`.
        :raises BreakpointException: If an agent breakpoint is triggered.
        """
        # We pop parent_snapshot from kwargs to avoid passing it into State.
        parent_snapshot = kwargs.pop("parent_snapshot", None)
        agent_inputs = {
            "messages": messages,
            "streaming_callback": streaming_callback,
            "break_point": break_point,
            "snapshot": snapshot,
            **kwargs,
        }
        self._runtime_checks(break_point=break_point, snapshot=snapshot)

        if snapshot:
            exe_context = self._initialize_from_snapshot(
                snapshot=snapshot, streaming_callback=streaming_callback, requires_async=True, tools=tools
            )
        else:
            exe_context = self._initialize_fresh_execution(
                messages=messages,
                streaming_callback=streaming_callback,
                requires_async=True,
                system_prompt=system_prompt,
                tools=tools,
                **kwargs,
            )

        with self._create_agent_span() as span:
            span.set_content_tag("haystack.agent.input", _deepcopy_with_exceptions(agent_inputs))

            while exe_context.counter < self.max_agent_steps:
                # Handle breakpoint and ChatGenerator call
                self._check_chat_generator_breakpoint(
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
                )
                # We skip the chat generator when restarting from a snapshot from a ToolBreakpoint
                if exe_context.skip_chat_generator:
                    llm_messages = exe_context.state.get("messages", [])[-1:]
                    # Set to False so the next iteration will call the chat generator
                    exe_context.skip_chat_generator = False
                else:
                    result = await AsyncPipeline._run_component_async(
                        component_name="chat_generator",
                        component={"instance": self.chat_generator},
                        component_inputs={
                            "messages": exe_context.state.data["messages"],
                            **exe_context.chat_generator_inputs,
                        },
                        component_visits=exe_context.component_visits,
                        parent_span=span,
                    )
                    llm_messages = result["replies"]
                    exe_context.state.set("messages", llm_messages)

                # Check if any of the LLM responses contain a tool call or if the LLM is not using tools
                if not any(msg.tool_call for msg in llm_messages) or self._tool_invoker is None:
                    exe_context.counter += 1
                    break

                # Handle breakpoint and ToolInvoker call
                self._check_tool_invoker_breakpoint(
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
                )
                try:
                    # We only send the messages from the LLM to the tool invoker
                    tool_invoker_result = await AsyncPipeline._run_component_async(
                        component_name="tool_invoker",
                        component={"instance": self._tool_invoker},
                        component_inputs={
                            "messages": llm_messages,
                            "state": exe_context.state,
                            **exe_context.tool_invoker_inputs,
                        },
                        component_visits=exe_context.component_visits,
                        parent_span=span,
                    )
                except PipelineRuntimeError as e:
                    # Mirror the sync `run` path: attach a pipeline snapshot to the error so the
                    # Agent execution can be resumed from where it failed.
                    # Access the original Tool Invoker exception
                    original_error = e.__cause__
                    tool_name = getattr(original_error, "tool_name", None)

                    pipeline_snapshot = _create_pipeline_snapshot_from_tool_invoker(
                        tool_name=tool_name,
                        agent_name=getattr(self, "__component_name__", None),
                        execution_context=exe_context,
                        parent_snapshot=parent_snapshot,
                    )
                    e.pipeline_snapshot = pipeline_snapshot
                    raise e

                tool_messages = tool_invoker_result["tool_messages"]
                exe_context.state = tool_invoker_result["state"]
                exe_context.state.set("messages", tool_messages)

                # Check if any LLM message's tool call name matches an exit condition
                if self.exit_conditions != ["text"] and self._check_exit_conditions(llm_messages, tool_messages):
                    exe_context.counter += 1
                    break

                # Increment the step counter
                exe_context.counter += 1

            if exe_context.counter >= self.max_agent_steps:
                logger.warning(
                    "Agent reached maximum agent steps of {max_agent_steps}, stopping.",
                    max_agent_steps=self.max_agent_steps,
                )
            span.set_content_tag("haystack.agent.output", exe_context.state.data)
            span.set_tag("haystack.agent.steps_taken", exe_context.counter)

        result = {**exe_context.state.data}
        if msgs := result.get("messages"):
            result["last_message"] = msgs[-1]
        return result
758

759
    def _check_exit_conditions(self, llm_messages: list[ChatMessage], tool_messages: list[ChatMessage]) -> bool:
        """
        Check if any of the LLM messages' tool calls match an exit condition and if there are no errors.

        :param llm_messages: List of messages from the LLM
        :param tool_messages: List of messages from the tool invoker
        :return: True if an exit condition is met and there are no errors, False otherwise
        """
        matched_any = False

        for message in llm_messages:
            call = message.tool_call
            if not call or call.tool_name not in self.exit_conditions:
                continue
            matched_any = True

            # If the tool behind this exit condition reported an error, do not exit.
            for tool_msg in tool_messages:
                result = tool_msg.tool_call_result
                if result is not None and result.origin.tool_name == call.tool_name and result.error:
                    return False

        # True only when at least one exit condition matched and none of them errored
        return matched_any
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc