• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 18647136217

20 Oct 2025 08:52AM UTC coverage: 92.179% (-0.04%) from 92.22%
18647136217

Pull #9856

github

web-flow
Merge dc9eda57a into 1de94413c
Pull Request #9856: Add Tools warm_up

13425 of 14564 relevant lines covered (92.18%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.83
haystack/components/agents/agent.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import inspect
1✔
6
from dataclasses import dataclass
1✔
7
from typing import Any, Optional, Union, cast
1✔
8

9
from haystack import logging, tracing
1✔
10
from haystack.components.generators.chat.types import ChatGenerator
1✔
11
from haystack.components.tools import ToolInvoker
1✔
12
from haystack.core.component.component import component
1✔
13
from haystack.core.errors import PipelineRuntimeError
1✔
14
from haystack.core.pipeline.async_pipeline import AsyncPipeline
1✔
15
from haystack.core.pipeline.breakpoint import (
1✔
16
    _create_pipeline_snapshot_from_chat_generator,
17
    _create_pipeline_snapshot_from_tool_invoker,
18
    _trigger_chat_generator_breakpoint,
19
    _trigger_tool_invoker_breakpoint,
20
    _validate_tool_breakpoint_is_valid,
21
)
22
from haystack.core.pipeline.pipeline import Pipeline
1✔
23
from haystack.core.pipeline.utils import _deepcopy_with_exceptions
1✔
24
from haystack.core.serialization import component_to_dict, default_from_dict, default_to_dict
1✔
25
from haystack.dataclasses import ChatMessage, ChatRole
1✔
26
from haystack.dataclasses.breakpoints import AgentBreakpoint, AgentSnapshot, PipelineSnapshot, ToolBreakpoint
1✔
27
from haystack.dataclasses.streaming_chunk import StreamingCallbackT, select_streaming_callback
1✔
28
from haystack.tools import (
1✔
29
    Tool,
30
    Toolset,
31
    ToolsType,
32
    deserialize_tools_or_toolset_inplace,
33
    flatten_tools_or_toolsets,
34
    serialize_tools_or_toolset,
35
)
36
from haystack.utils import _deserialize_value_with_schema
1✔
37
from haystack.utils.callable_serialization import deserialize_callable, serialize_callable
1✔
38
from haystack.utils.deserialization import deserialize_chatgenerator_inplace
1✔
39

40
from .state.state import State, _schema_from_dict, _schema_to_dict, _validate_schema
1✔
41
from .state.state_utils import merge_lists
1✔
42

43
# Module-level logger namespaced to this module's import path (haystack.logging wrapper).
logger = logging.getLogger(__name__)
44

45

46
@dataclass
class _ExecutionContext:
    """
    Context for executing the agent.

    :param state: The current state of the agent, including messages and any additional data.
    :param component_visits: A dictionary tracking how many times each component has been visited.
    :param chat_generator_inputs: Runtime inputs to be passed to the chat generator.
    :param tool_invoker_inputs: Runtime inputs to be passed to the tool invoker.
    :param counter: A counter to track the number of steps taken in the agent's run.
    :param skip_chat_generator: A flag to indicate whether to skip the chat generator in the next iteration.
        This is useful when resuming from a ToolBreakpoint where the ToolInvoker needs to be called first.
    """

    state: State
    # Keys are "chat_generator" / "tool_invoker"; values are visit counts (initialized to 0).
    component_visits: dict[str, int]
    chat_generator_inputs: dict[str, Any]
    tool_invoker_inputs: dict[str, Any]
    counter: int = 0
    skip_chat_generator: bool = False
66

67

68
@component
1✔
69
class Agent:
1✔
70
    """
71
    A Haystack component that implements a tool-using agent with provider-agnostic chat model support.
72

73
    The component processes messages and executes tools until an exit condition is met.
74
    The exit condition can be triggered either by a direct text response or by invoking a specific designated tool.
75
    Multiple exit conditions can be specified.
76

77
    When you call an Agent without tools, it acts as a ChatGenerator, produces one response, then exits.
78

79
    ### Usage example
80
    ```python
81
    from haystack.components.agents import Agent
82
    from haystack.components.generators.chat import OpenAIChatGenerator
83
    from haystack.dataclasses import ChatMessage
84
    from haystack.tools.tool import Tool
85

86
    tools = [Tool(name="calculator", description="..."), Tool(name="search", description="...")]
87

88
    agent = Agent(
89
        chat_generator=OpenAIChatGenerator(),
90
        tools=tools,
91
        exit_conditions=["search"],
92
    )
93

94
    # Run the agent
95
    result = agent.run(
96
        messages=[ChatMessage.from_user("Find information about Haystack")]
97
    )
98

99
    assert "messages" in result  # Contains conversation history
100
    ```
101
    """
102

103
    def __init__(
        self,
        *,
        chat_generator: ChatGenerator,
        tools: Optional[ToolsType] = None,
        system_prompt: Optional[str] = None,
        exit_conditions: Optional[list[str]] = None,
        state_schema: Optional[dict[str, Any]] = None,
        max_agent_steps: int = 100,
        streaming_callback: Optional[StreamingCallbackT] = None,
        raise_on_tool_invocation_failure: bool = False,
        tool_invoker_kwargs: Optional[dict[str, Any]] = None,
    ) -> None:
        """
        Initialize the agent component.

        :param chat_generator: An instance of the chat generator that your agent should use. It must support tools.
        :param tools: A list of Tool and/or Toolset objects, or a single Toolset that the agent can use.
        :param system_prompt: System prompt for the agent.
        :param exit_conditions: List of conditions that will cause the agent to return.
            Can include "text" if the agent should return when it generates a message without tool calls,
            or tool names that will cause the agent to return once the tool was executed. Defaults to ["text"].
        :param state_schema: The schema for the runtime state used by the tools.
        :param max_agent_steps: Maximum number of steps the agent will run before stopping. Defaults to 100.
            If the agent exceeds this number of steps, it will stop and return the current state.
        :param streaming_callback: A callback that will be invoked when a response is streamed from the LLM.
            The same callback can be configured to emit tool results when a tool is called.
        :param raise_on_tool_invocation_failure: Should the agent raise an exception when a tool invocation fails?
            If set to False, the exception will be turned into a chat message and passed to the LLM.
        :param tool_invoker_kwargs: Additional keyword arguments to pass to the ToolInvoker.
        :raises TypeError: If the chat_generator does not support tools parameter in its run method.
        :raises ValueError: If the exit_conditions are not valid.
        """
        # Check if chat_generator supports tools parameter: inspect its run() signature rather than
        # relying on a marker attribute, so any generator with a `tools` parameter qualifies.
        chat_generator_run_method = inspect.signature(chat_generator.run)
        if "tools" not in chat_generator_run_method.parameters:
            raise TypeError(
                f"{type(chat_generator).__name__} does not accept tools parameter in its run method. "
                "The Agent component requires a chat generator that supports tools."
            )

        # Every exit condition must be either "text" or the name of a configured tool.
        valid_exits = ["text"] + [tool.name for tool in flatten_tools_or_toolsets(tools)]
        if exit_conditions is None:
            exit_conditions = ["text"]
        if not all(condition in valid_exits for condition in exit_conditions):
            raise ValueError(
                f"Invalid exit conditions provided: {exit_conditions}. "
                f"Valid exit conditions must be a subset of {valid_exits}. "
                "Ensure that each exit condition corresponds to either 'text' or a valid tool name."
            )

        # Validate state schema if provided
        if state_schema is not None:
            _validate_schema(state_schema)
        # Keep the user-supplied schema verbatim for serialization (see to_dict()).
        self._state_schema = state_schema or {}

        # Initialize state schema: the resolved schema always contains a "messages" entry so the
        # conversation history is tracked even when the user did not declare it.
        resolved_state_schema = _deepcopy_with_exceptions(self._state_schema)
        if resolved_state_schema.get("messages") is None:
            resolved_state_schema["messages"] = {"type": list[ChatMessage], "handler": merge_lists}
        self.state_schema = resolved_state_schema

        self.chat_generator = chat_generator
        self.tools = tools or []
        self.system_prompt = system_prompt
        self.exit_conditions = exit_conditions
        self.max_agent_steps = max_agent_steps
        self.raise_on_tool_invocation_failure = raise_on_tool_invocation_failure
        self.streaming_callback = streaming_callback

        # Register each schema entry as a component input/output so pipelines can connect to them.
        output_types = {"last_message": ChatMessage}
        for param, config in self.state_schema.items():
            output_types[param] = config["type"]
            # Skip setting input types for parameters that are already in the run method
            if param in ["messages", "streaming_callback"]:
                continue
            component.set_input_type(self, name=param, type=config["type"], default=None)
        component.set_output_types(self, **output_types)

        self.tool_invoker_kwargs = tool_invoker_kwargs
        self._tool_invoker = None
        if self.tools:
            # Explicit kwargs take precedence over the defaults derived from the Agent's own config.
            resolved_tool_invoker_kwargs = {
                "tools": self.tools,
                "raise_on_failure": self.raise_on_tool_invocation_failure,
                **(tool_invoker_kwargs or {}),
            }
            self._tool_invoker = ToolInvoker(**resolved_tool_invoker_kwargs)
        else:
            logger.warning(
                "No tools provided to the Agent. The Agent will behave like a ChatGenerator and only return text "
                "responses. To enable tool usage, pass tools directly to the Agent, not to the chat_generator."
            )

        self._is_warmed_up = False
198

199
    def warm_up(self) -> None:
        """
        Warm up the Agent.

        Delegates to ``warm_up()`` on the chat generator and on the tool invoker when they expose
        such a method. The call is idempotent: after the first invocation it becomes a no-op.
        """
        if not self._is_warmed_up:
            if hasattr(self.chat_generator, "warm_up"):
                self.chat_generator.warm_up()
            # Check for None *before* calling hasattr. The previous ordering only worked because
            # hasattr(None, "warm_up") happens to be False; the None guard is the real precondition.
            if self._tool_invoker is not None and hasattr(self._tool_invoker, "warm_up"):
                self._tool_invoker.warm_up()
            self._is_warmed_up = True
209

210
    def to_dict(self) -> dict[str, Any]:
        """
        Serialize the component to a dictionary.

        :return: Dictionary with serialized data
        """
        serialized_generator = component_to_dict(obj=self.chat_generator, name="chat_generator")
        serialized_tools = serialize_tools_or_toolset(self.tools)
        serialized_callback = serialize_callable(self.streaming_callback) if self.streaming_callback else None
        return default_to_dict(
            self,
            chat_generator=serialized_generator,
            tools=serialized_tools,
            system_prompt=self.system_prompt,
            exit_conditions=self.exit_conditions,
            # We serialize the original state schema, not the resolved one to reflect the original user input
            state_schema=_schema_to_dict(self._state_schema),
            max_agent_steps=self.max_agent_steps,
            streaming_callback=serialized_callback,
            raise_on_tool_invocation_failure=self.raise_on_tool_invocation_failure,
            tool_invoker_kwargs=self.tool_invoker_kwargs,
        )
229

230
    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Agent":
        """
        Deserialize the agent from a dictionary.

        :param data: Dictionary to deserialize from
        :return: Deserialized agent
        """
        params = data.get("init_parameters", {})

        # Rehydrate the nested pieces in place before handing off to the generic deserializer.
        deserialize_chatgenerator_inplace(params, key="chat_generator")

        if "state_schema" in params:
            params["state_schema"] = _schema_from_dict(params["state_schema"])

        callback = params.get("streaming_callback")
        if callback is not None:
            params["streaming_callback"] = deserialize_callable(callback)

        deserialize_tools_or_toolset_inplace(params, key="tools")

        return default_from_dict(cls, data)
251

252
    def _create_agent_span(self) -> Any:
        """Create a tracing span describing this agent run's configuration."""
        span_tags = {
            "haystack.agent.max_steps": self.max_agent_steps,
            "haystack.agent.tools": self.tools,
            "haystack.agent.exit_conditions": self.exit_conditions,
            "haystack.agent.state_schema": _schema_to_dict(self.state_schema),
        }
        return tracing.tracer.trace("haystack.agent.run", tags=span_tags)
263

264
    def _initialize_fresh_execution(
        self,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT],
        requires_async: bool,
        *,
        system_prompt: Optional[str] = None,
        tools: Optional[Union[ToolsType, list[str]]] = None,
        **kwargs,
    ) -> _ExecutionContext:
        """
        Build the execution context for a run that starts from scratch (no snapshot).

        :param messages: List of ChatMessage objects to start the agent with.
        :param streaming_callback: Optional callback for streaming responses.
        :param requires_async: Whether the agent run requires asynchronous execution.
        :param system_prompt: System prompt for the agent. If provided, it overrides the default system prompt.
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
            When passing tool names, tools are selected from the Agent's originally configured tools.
        :param kwargs: Additional data to pass to the State used by the Agent.
        """
        # A runtime system prompt overrides the one configured at init time.
        effective_prompt = system_prompt or self.system_prompt
        if effective_prompt is not None:
            messages = [ChatMessage.from_system(effective_prompt), *messages]

        if all(m.is_from(ChatRole.SYSTEM) for m in messages):
            logger.warning("All messages provided to the Agent component are system messages. This is not recommended.")

        state = State(schema=self.state_schema, data=kwargs)
        state.set("messages", messages)

        resolved_callback = select_streaming_callback(  # type: ignore[call-overload]
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=requires_async
        )

        run_tools = self._select_tools(tools)
        invoker_inputs: dict[str, Any] = {"tools": run_tools}
        generator_inputs: dict[str, Any] = {"tools": run_tools}
        if resolved_callback is not None:
            invoker_inputs["streaming_callback"] = resolved_callback
            generator_inputs["streaming_callback"] = resolved_callback

        return _ExecutionContext(
            state=state,
            component_visits={"chat_generator": 0, "tool_invoker": 0},
            chat_generator_inputs=generator_inputs,
            tool_invoker_inputs=invoker_inputs,
        )
312

313
    def _select_tools(self, tools: Optional[Union[ToolsType, list[str]]] = None) -> ToolsType:
        """
        Resolve the tools to use for the current run.

        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
            When passing tool names, tools are selected from the Agent's originally configured tools.
        :returns: Selected tools for the current run.
        :raises ValueError: If tool names are provided but no tools were configured at initialization,
            or if any provided tool name is not valid.
        :raises TypeError: If tools is not a list of Tool objects, a Toolset, or a list of tool names (strings).
        """
        if tools is None:
            # No runtime override: use the tools configured at init time.
            return self.tools

        # NOTE: this branch must stay first among the list checks so an empty list keeps matching it.
        if isinstance(tools, list) and all(isinstance(t, str) for t in tools):
            # A list of names: pick the matching tools from the configured set.
            if not self.tools:
                raise ValueError("No tools were configured for the Agent at initialization.")
            configured = flatten_tools_or_toolsets(self.tools)
            requested_names = cast(list[str], tools)  # mypy thinks this could still be list[Tool] or Toolset
            known_names = {t.name for t in configured}
            unknown_names = {name for name in requested_names if name not in known_names}
            if unknown_names:
                raise ValueError(
                    f"The following tool names are not valid: {unknown_names}. "
                    f"Valid tool names are: {known_names}."
                )
            return [t for t in configured if t.name in requested_names]

        if isinstance(tools, Toolset):
            return tools

        if isinstance(tools, list):
            return cast(list[Union[Tool, Toolset]], tools)  # mypy can't narrow the Union type from isinstance check

        raise TypeError(
            "tools must be a list of Tool and/or Toolset objects, a Toolset, or a list of tool names (strings)."
        )
350

351
    def _initialize_from_snapshot(
        self,
        snapshot: AgentSnapshot,
        streaming_callback: Optional[StreamingCallbackT],
        requires_async: bool,
        *,
        tools: Optional[Union[ToolsType, list[str]]] = None,
    ) -> _ExecutionContext:
        """
        Initialize execution context from an AgentSnapshot.

        :param snapshot: An AgentSnapshot containing the state of a previously saved agent execution.
        :param streaming_callback: Optional callback for streaming responses.
        :param requires_async: Whether the agent run requires asynchronous execution.
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
            When passing tool names, tools are selected from the Agent's originally configured tools.
        """
        component_visits = snapshot.component_visits
        # Rebuild the serialized component inputs that were captured when the snapshot was taken.
        current_inputs = {
            "chat_generator": _deserialize_value_with_schema(snapshot.component_inputs["chat_generator"]),
            "tool_invoker": _deserialize_value_with_schema(snapshot.component_inputs["tool_invoker"]),
        }

        # The tool invoker's saved input carries the State object; re-wrap its data in a fresh State.
        state_data = current_inputs["tool_invoker"]["state"].data
        state = State(schema=self.state_schema, data=state_data)

        # Resuming from a ToolBreakpoint means the LLM already replied: run the tool invoker first.
        skip_chat_generator = isinstance(snapshot.break_point.break_point, ToolBreakpoint)
        # Prefer the callback stored in the snapshot over the one passed at resume time.
        streaming_callback = current_inputs["chat_generator"].get("streaming_callback", streaming_callback)
        streaming_callback = select_streaming_callback(  # type: ignore[call-overload]
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=requires_async
        )

        selected_tools = self._select_tools(tools)
        tool_invoker_inputs: dict[str, Any] = {"tools": selected_tools}
        generator_inputs: dict[str, Any] = {"tools": selected_tools}
        if streaming_callback is not None:
            tool_invoker_inputs["streaming_callback"] = streaming_callback
            generator_inputs["streaming_callback"] = streaming_callback

        return _ExecutionContext(
            state=state,
            component_visits=component_visits,
            chat_generator_inputs=generator_inputs,
            tool_invoker_inputs=tool_invoker_inputs,
            # Resume the step counter from the visit count recorded in the breakpoint.
            counter=snapshot.break_point.break_point.visit_count,
            skip_chat_generator=skip_chat_generator,
        )
398

399
    def _runtime_checks(self, break_point: Optional[AgentBreakpoint], snapshot: Optional[AgentSnapshot]) -> None:
        """
        Validate preconditions before the agent starts executing.

        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :param snapshot: An AgentSnapshot containing the state of a previously saved agent execution.
        :raises RuntimeError: If the Agent component wasn't warmed up before calling `run()`.
        :raises ValueError: If the break_point is invalid.
        """
        # Only generators that actually expose warm_up() require the warm-up step.
        generator_needs_warm_up = hasattr(self.chat_generator, "warm_up")
        if generator_needs_warm_up and not self._is_warmed_up:
            raise RuntimeError("The component Agent wasn't warmed up. Run 'warm_up()' before calling 'run()'.")

        if break_point and isinstance(break_point.break_point, ToolBreakpoint):
            _validate_tool_breakpoint_is_valid(agent_breakpoint=break_point, tools=self.tools)
414

415
    @staticmethod
    def _check_chat_generator_breakpoint(
        execution_context: _ExecutionContext,
        break_point: Optional[AgentBreakpoint],
        parent_snapshot: Optional[PipelineSnapshot],
    ) -> None:
        """
        Trigger the chat generator breakpoint when its conditions are met.

        Builds an agent snapshot and fires the breakpoint if `break_point` targets the
        "chat_generator" component and its visit count matches the current one.

        :param execution_context: The current execution context of the agent.
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :param parent_snapshot: An optional parent snapshot for the agent execution.
        """
        if not break_point:
            return
        target = break_point.break_point
        if target.component_name != "chat_generator":
            return
        if execution_context.component_visits["chat_generator"] != target.visit_count:
            return
        pipeline_snapshot = _create_pipeline_snapshot_from_chat_generator(
            execution_context=execution_context, break_point=break_point, parent_snapshot=parent_snapshot
        )
        _trigger_chat_generator_breakpoint(pipeline_snapshot=pipeline_snapshot)
440

441
    @staticmethod
    def _check_tool_invoker_breakpoint(
        execution_context: _ExecutionContext,
        break_point: Optional[AgentBreakpoint],
        parent_snapshot: Optional[PipelineSnapshot],
    ) -> None:
        """
        Trigger the tool invoker breakpoint when its conditions are met.

        Builds an agent snapshot and fires the breakpoint if `break_point` targets the
        "tool_invoker" component and its visit count matches the current one.

        :param execution_context: The current execution context of the agent.
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :param parent_snapshot: An optional parent snapshot for the agent execution.
        """
        if not break_point:
            return
        target = break_point.break_point
        if target.component_name != "tool_invoker":
            return
        if target.visit_count != execution_context.component_visits["tool_invoker"]:
            return
        pipeline_snapshot = _create_pipeline_snapshot_from_tool_invoker(
            execution_context=execution_context, break_point=break_point, parent_snapshot=parent_snapshot
        )
        # Only the most recent LLM message is relevant for the tool invoker breakpoint.
        _trigger_tool_invoker_breakpoint(
            llm_messages=execution_context.state.data["messages"][-1:], pipeline_snapshot=pipeline_snapshot
        )
468

469
    def run(  # noqa: PLR0915
        self,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        *,
        break_point: Optional[AgentBreakpoint] = None,
        snapshot: Optional[AgentSnapshot] = None,
        system_prompt: Optional[str] = None,
        tools: Optional[Union[ToolsType, list[str]]] = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        Process messages and execute tools until an exit condition is met.

        :param messages: List of Haystack ChatMessage objects to process.
        :param streaming_callback: A callback that will be invoked when a response is streamed from the LLM.
            The same callback can be configured to emit tool results when a tool is called.
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :param snapshot: A dictionary containing a snapshot of a previously saved agent execution. The snapshot contains
            the relevant information to restart the Agent execution from where it left off.
        :param system_prompt: System prompt for the agent. If provided, it overrides the default system prompt.
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
            When passing tool names, tools are selected from the Agent's originally configured tools.
        :param kwargs: Additional data to pass to the State schema used by the Agent.
            The keys must match the schema defined in the Agent's `state_schema`.
        :returns:
            A dictionary with the following keys:
            - "messages": List of all messages exchanged during the agent's run.
            - "last_message": The last message exchanged during the agent's run.
            - Any additional keys defined in the `state_schema`.
        :raises RuntimeError: If the Agent component wasn't warmed up before calling `run()`.
        :raises BreakpointException: If an agent breakpoint is triggered.
        """
        # We pop parent_snapshot from kwargs to avoid passing it into State.
        parent_snapshot = kwargs.pop("parent_snapshot", None)
        # Captured for tracing only: recorded on the span as the run's input.
        agent_inputs = {
            "messages": messages,
            "streaming_callback": streaming_callback,
            "break_point": break_point,
            "snapshot": snapshot,
            **kwargs,
        }
        self._runtime_checks(break_point=break_point, snapshot=snapshot)

        # Either resume from a saved snapshot or start a fresh execution context.
        if snapshot:
            exe_context = self._initialize_from_snapshot(
                snapshot=snapshot, streaming_callback=streaming_callback, requires_async=False, tools=tools
            )
        else:
            exe_context = self._initialize_fresh_execution(
                messages=messages,
                streaming_callback=streaming_callback,
                requires_async=False,
                system_prompt=system_prompt,
                tools=tools,
                **kwargs,
            )

        with self._create_agent_span() as span:
            span.set_content_tag("haystack.agent.input", _deepcopy_with_exceptions(agent_inputs))

            # Main agent loop: alternate chat generator and tool invoker until an exit
            # condition fires or max_agent_steps is reached.
            while exe_context.counter < self.max_agent_steps:
                # Handle breakpoint and ChatGenerator call
                Agent._check_chat_generator_breakpoint(
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
                )
                # We skip the chat generator when restarting from a snapshot from a ToolBreakpoint
                if exe_context.skip_chat_generator:
                    llm_messages = exe_context.state.get("messages", [])[-1:]
                    # Set to False so the next iteration will call the chat generator
                    exe_context.skip_chat_generator = False
                else:
                    try:
                        result = Pipeline._run_component(
                            component_name="chat_generator",
                            component={"instance": self.chat_generator},
                            inputs={
                                "messages": exe_context.state.data["messages"],
                                **exe_context.chat_generator_inputs,
                            },
                            component_visits=exe_context.component_visits,
                            parent_span=span,
                        )
                    except PipelineRuntimeError as e:
                        # Attach a snapshot to the error so the run can be resumed later.
                        pipeline_snapshot = _create_pipeline_snapshot_from_chat_generator(
                            agent_name=getattr(self, "__component_name__", None),
                            execution_context=exe_context,
                            parent_snapshot=parent_snapshot,
                        )
                        e.pipeline_snapshot = pipeline_snapshot
                        raise e

                    llm_messages = result["replies"]
                    exe_context.state.set("messages", llm_messages)

                # Check if any of the LLM responses contain a tool call or if the LLM is not using tools
                if not any(msg.tool_call for msg in llm_messages) or self._tool_invoker is None:
                    exe_context.counter += 1
                    break

                # Handle breakpoint and ToolInvoker call
                Agent._check_tool_invoker_breakpoint(
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
                )
                try:
                    # We only send the messages from the LLM to the tool invoker
                    tool_invoker_result = Pipeline._run_component(
                        component_name="tool_invoker",
                        component={"instance": self._tool_invoker},
                        inputs={
                            "messages": llm_messages,
                            "state": exe_context.state,
                            **exe_context.tool_invoker_inputs,
                        },
                        component_visits=exe_context.component_visits,
                        parent_span=span,
                    )
                except PipelineRuntimeError as e:
                    # Access the original Tool Invoker exception
                    original_error = e.__cause__
                    tool_name = getattr(original_error, "tool_name", None)

                    pipeline_snapshot = _create_pipeline_snapshot_from_tool_invoker(
                        tool_name=tool_name,
                        agent_name=getattr(self, "__component_name__", None),
                        execution_context=exe_context,
                        parent_snapshot=parent_snapshot,
                    )
                    e.pipeline_snapshot = pipeline_snapshot
                    raise e

                tool_messages = tool_invoker_result["tool_messages"]
                # The tool invoker may have mutated/replaced the state; adopt its version.
                exe_context.state = tool_invoker_result["state"]
                exe_context.state.set("messages", tool_messages)

                # Check if any LLM message's tool call name matches an exit condition
                if self.exit_conditions != ["text"] and self._check_exit_conditions(llm_messages, tool_messages):
                    exe_context.counter += 1
                    break

                # Increment the step counter
                exe_context.counter += 1

            if exe_context.counter >= self.max_agent_steps:
                logger.warning(
                    "Agent reached maximum agent steps of {max_agent_steps}, stopping.",
                    max_agent_steps=self.max_agent_steps,
                )
            span.set_content_tag("haystack.agent.output", exe_context.state.data)
            span.set_tag("haystack.agent.steps_taken", exe_context.counter)

        # Expose the final state plus a convenience "last_message" key.
        result = {**exe_context.state.data}
        if msgs := result.get("messages"):
            result["last_message"] = msgs[-1]
        return result
625

626
    async def run_async(
        self,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        *,
        break_point: Optional[AgentBreakpoint] = None,
        snapshot: Optional[AgentSnapshot] = None,
        system_prompt: Optional[str] = None,
        tools: Optional[Union[ToolsType, list[str]]] = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        Asynchronously process messages and execute tools until the exit condition is met.

        This is the asynchronous version of the `run` method. It follows the same logic but uses
        asynchronous operations where possible, such as calling the `run_async` method of the ChatGenerator
        if available.

        :param messages: List of Haystack ChatMessage objects to process.
        :param streaming_callback: An asynchronous callback that will be invoked when a response is streamed from the
            LLM. The same callback can be configured to emit tool results when a tool is called.
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :param snapshot: A dictionary containing a snapshot of a previously saved agent execution. The snapshot contains
            the relevant information to restart the Agent execution from where it left off.
        :param system_prompt: System prompt for the agent. If provided, it overrides the default system prompt.
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
        :param kwargs: Additional data to pass to the State schema used by the Agent.
            The keys must match the schema defined in the Agent's `state_schema`.
        :returns:
            A dictionary with the following keys:
            - "messages": List of all messages exchanged during the agent's run.
            - "last_message": The last message exchanged during the agent's run.
            - Any additional keys defined in the `state_schema`.
        :raises RuntimeError: If the Agent component wasn't warmed up before calling `run_async()`.
        :raises BreakpointException: If an agent breakpoint is triggered.
        :raises PipelineRuntimeError: If the ChatGenerator or the ToolInvoker fails; a pipeline snapshot is
            attached to the exception so the run can be resumed from the point of failure.
        """
        # We pop parent_snapshot from kwargs to avoid passing it into State.
        parent_snapshot = kwargs.pop("parent_snapshot", None)
        agent_inputs = {
            "messages": messages,
            "streaming_callback": streaming_callback,
            "break_point": break_point,
            "snapshot": snapshot,
            **kwargs,
        }
        self._runtime_checks(break_point=break_point, snapshot=snapshot)

        if snapshot:
            exe_context = self._initialize_from_snapshot(
                snapshot=snapshot, streaming_callback=streaming_callback, requires_async=True, tools=tools
            )
        else:
            exe_context = self._initialize_fresh_execution(
                messages=messages,
                streaming_callback=streaming_callback,
                requires_async=True,
                system_prompt=system_prompt,
                tools=tools,
                **kwargs,
            )

        with self._create_agent_span() as span:
            span.set_content_tag("haystack.agent.input", _deepcopy_with_exceptions(agent_inputs))

            while exe_context.counter < self.max_agent_steps:
                # Handle breakpoint and ChatGenerator call
                self._check_chat_generator_breakpoint(
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
                )
                # We skip the chat generator when restarting from a snapshot from a ToolBreakpoint
                if exe_context.skip_chat_generator:
                    llm_messages = exe_context.state.get("messages", [])[-1:]
                    # Set to False so the next iteration will call the chat generator
                    exe_context.skip_chat_generator = False
                else:
                    try:
                        result = await AsyncPipeline._run_component_async(
                            component_name="chat_generator",
                            component={"instance": self.chat_generator},
                            component_inputs={
                                "messages": exe_context.state.data["messages"],
                                **exe_context.chat_generator_inputs,
                            },
                            component_visits=exe_context.component_visits,
                            parent_span=span,
                        )
                    except PipelineRuntimeError as e:
                        # Mirror the synchronous `run`: attach a snapshot of the current execution
                        # state to the exception so the run can be resumed instead of being lost.
                        pipeline_snapshot = _create_pipeline_snapshot_from_chat_generator(
                            agent_name=getattr(self, "__component_name__", None),
                            execution_context=exe_context,
                            parent_snapshot=parent_snapshot,
                        )
                        e.pipeline_snapshot = pipeline_snapshot
                        raise e

                    llm_messages = result["replies"]
                    exe_context.state.set("messages", llm_messages)

                # Check if any of the LLM responses contain a tool call or if the LLM is not using tools
                if not any(msg.tool_call for msg in llm_messages) or self._tool_invoker is None:
                    exe_context.counter += 1
                    break

                # Handle breakpoint and ToolInvoker call
                self._check_tool_invoker_breakpoint(
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
                )
                try:
                    # We only send the messages from the LLM to the tool invoker
                    tool_invoker_result = await AsyncPipeline._run_component_async(
                        component_name="tool_invoker",
                        component={"instance": self._tool_invoker},
                        component_inputs={
                            "messages": llm_messages,
                            "state": exe_context.state,
                            **exe_context.tool_invoker_inputs,
                        },
                        component_visits=exe_context.component_visits,
                        parent_span=span,
                    )
                except PipelineRuntimeError as e:
                    # Access the original Tool Invoker exception
                    original_error = e.__cause__
                    tool_name = getattr(original_error, "tool_name", None)

                    pipeline_snapshot = _create_pipeline_snapshot_from_tool_invoker(
                        tool_name=tool_name,
                        agent_name=getattr(self, "__component_name__", None),
                        execution_context=exe_context,
                        parent_snapshot=parent_snapshot,
                    )
                    e.pipeline_snapshot = pipeline_snapshot
                    raise e

                tool_messages = tool_invoker_result["tool_messages"]
                exe_context.state = tool_invoker_result["state"]
                exe_context.state.set("messages", tool_messages)

                # Check if any LLM message's tool call name matches an exit condition
                if self.exit_conditions != ["text"] and self._check_exit_conditions(llm_messages, tool_messages):
                    exe_context.counter += 1
                    break

                # Increment the step counter
                exe_context.counter += 1

            if exe_context.counter >= self.max_agent_steps:
                logger.warning(
                    "Agent reached maximum agent steps of {max_agent_steps}, stopping.",
                    max_agent_steps=self.max_agent_steps,
                )
            span.set_content_tag("haystack.agent.output", exe_context.state.data)
            span.set_tag("haystack.agent.steps_taken", exe_context.counter)

        result = {**exe_context.state.data}
        if msgs := result.get("messages"):
            result["last_message"] = msgs[-1]
        return result
761
    def _check_exit_conditions(self, llm_messages: list[ChatMessage], tool_messages: list[ChatMessage]) -> bool:
        """
        Determine whether the agent should stop based on the tool calls in the LLM messages.

        An exit condition is considered met when at least one LLM message calls a tool listed in
        `self.exit_conditions` and none of the matching tool results reported an error.

        :param llm_messages: Messages produced by the LLM in this step.
        :param tool_messages: Messages produced by the tool invoker in this step.
        :return: True if at least one exit condition matched and its tool ran without errors, False otherwise.
        """

        def _exit_tool_errored(exit_tool_name: str) -> bool:
            # True when any result originating from the matching tool carries an error.
            return any(
                tool_msg.tool_call_result.error
                for tool_msg in tool_messages
                if tool_msg.tool_call_result is not None
                and tool_msg.tool_call_result.origin.tool_name == exit_tool_name
            )

        matched_any = False
        for llm_msg in llm_messages:
            tool_call = llm_msg.tool_call
            if not tool_call or tool_call.tool_name not in self.exit_conditions:
                continue
            if _exit_tool_errored(tool_call.tool_name):
                # An errored exit tool vetoes stopping, regardless of any other matches.
                return False
            matched_any = True
        return matched_any
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc