• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 18592817487

17 Oct 2025 12:33PM UTC coverage: 92.2% (+0.1%) from 92.062%
18592817487

Pull #9859

github

web-flow
Merge f20ff2b98 into a43c47b63
Pull Request #9859: feat: Add FallbackChatGenerator

13346 of 14475 relevant lines covered (92.2%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.77
haystack/components/agents/agent.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import inspect
1✔
6
from dataclasses import dataclass
1✔
7
from typing import Any, Optional, Union
1✔
8

9
from haystack import logging, tracing
1✔
10
from haystack.components.generators.chat.types import ChatGenerator
1✔
11
from haystack.components.tools import ToolInvoker
1✔
12
from haystack.core.component.component import component
1✔
13
from haystack.core.errors import PipelineRuntimeError
1✔
14
from haystack.core.pipeline.async_pipeline import AsyncPipeline
1✔
15
from haystack.core.pipeline.breakpoint import (
1✔
16
    _create_pipeline_snapshot_from_chat_generator,
17
    _create_pipeline_snapshot_from_tool_invoker,
18
    _trigger_chat_generator_breakpoint,
19
    _trigger_tool_invoker_breakpoint,
20
    _validate_tool_breakpoint_is_valid,
21
)
22
from haystack.core.pipeline.pipeline import Pipeline
1✔
23
from haystack.core.pipeline.utils import _deepcopy_with_exceptions
1✔
24
from haystack.core.serialization import component_to_dict, default_from_dict, default_to_dict
1✔
25
from haystack.dataclasses import ChatMessage, ChatRole
1✔
26
from haystack.dataclasses.breakpoints import AgentBreakpoint, AgentSnapshot, PipelineSnapshot, ToolBreakpoint
1✔
27
from haystack.dataclasses.streaming_chunk import StreamingCallbackT, select_streaming_callback
1✔
28
from haystack.tools import Tool, Toolset, deserialize_tools_or_toolset_inplace, serialize_tools_or_toolset
1✔
29
from haystack.utils import _deserialize_value_with_schema
1✔
30
from haystack.utils.callable_serialization import deserialize_callable, serialize_callable
1✔
31
from haystack.utils.deserialization import deserialize_chatgenerator_inplace
1✔
32

33
from .state.state import State, _schema_from_dict, _schema_to_dict, _validate_schema
1✔
34
from .state.state_utils import merge_lists
1✔
35

36
# Module-level logger; haystack.logging supports keyword-interpolated messages (see logger.warning calls below).
logger = logging.getLogger(__name__)
1✔
37

38

39
@dataclass
class _ExecutionContext:
    """
    Mutable bookkeeping carried through a single Agent run.

    :param state: Current agent State, holding the message history and any additional run data.
    :param component_visits: Visit counters keyed by component name ("chat_generator" / "tool_invoker").
    :param chat_generator_inputs: Extra runtime inputs forwarded to the chat generator on each call.
    :param tool_invoker_inputs: Extra runtime inputs forwarded to the tool invoker on each call.
    :param counter: Number of agent steps executed so far.
    :param skip_chat_generator: When True, the next loop iteration bypasses the chat generator.
        Used when resuming from a ToolBreakpoint, where the ToolInvoker must run first.
    """

    # Required fields (no defaults) first, then the two defaulted flags.
    state: State
    component_visits: dict
    chat_generator_inputs: dict
    tool_invoker_inputs: dict
    counter: int = 0
    skip_chat_generator: bool = False
1✔
59

60

61
@component
1✔
62
class Agent:
1✔
63
    """
64
    A Haystack component that implements a tool-using agent with provider-agnostic chat model support.
65

66
    The component processes messages and executes tools until an exit condition is met.
67
    The exit condition can be triggered either by a direct text response or by invoking a specific designated tool.
68
    Multiple exit conditions can be specified.
69

70
    When you call an Agent without tools, it acts as a ChatGenerator, produces one response, then exits.
71

72
    ### Usage example
73
    ```python
74
    from haystack.components.agents import Agent
75
    from haystack.components.generators.chat import OpenAIChatGenerator
76
    from haystack.dataclasses import ChatMessage
77
    from haystack.tools.tool import Tool
78

79
    tools = [Tool(name="calculator", description="..."), Tool(name="search", description="...")]
80

81
    agent = Agent(
82
        chat_generator=OpenAIChatGenerator(),
83
        tools=tools,
84
        exit_conditions=["search"],
85
    )
86

87
    # Run the agent
88
    result = agent.run(
89
        messages=[ChatMessage.from_user("Find information about Haystack")]
90
    )
91

92
    assert "messages" in result  # Contains conversation history
93
    ```
94
    """
95

96
    def __init__(
        self,
        *,
        chat_generator: ChatGenerator,
        tools: Optional[Union[list[Tool], Toolset]] = None,
        system_prompt: Optional[str] = None,
        exit_conditions: Optional[list[str]] = None,
        state_schema: Optional[dict[str, Any]] = None,
        max_agent_steps: int = 100,
        streaming_callback: Optional[StreamingCallbackT] = None,
        raise_on_tool_invocation_failure: bool = False,
        tool_invoker_kwargs: Optional[dict[str, Any]] = None,
    ) -> None:
        """
        Create the Agent.

        :param chat_generator: Chat generator instance the agent uses; its ``run`` method must accept ``tools``.
        :param tools: List of Tool objects or a Toolset the agent may invoke.
        :param system_prompt: Optional system prompt prepended at the start of each fresh run.
        :param exit_conditions: Conditions that end the agent loop. "text" stops when the model replies without
            tool calls; a tool name stops after that tool has executed. Defaults to ["text"].
        :param state_schema: Schema of the runtime State shared with the tools.
        :param max_agent_steps: Hard cap on agent iterations; once reached, the agent stops and returns the
            current state. Defaults to 100.
        :param streaming_callback: Callback invoked with chunks streamed from the LLM. The same callback can be
            configured to emit tool results when a tool is called.
        :param raise_on_tool_invocation_failure: If True, tool errors raise; if False they are turned into chat
            messages and handed back to the LLM.
        :param tool_invoker_kwargs: Extra keyword arguments forwarded to the internal ToolInvoker.
        :raises TypeError: If the chat_generator does not support a tools parameter in its run method.
        :raises ValueError: If any exit condition is neither "text" nor a configured tool name.
        """
        # The generator must expose a `tools` parameter on run(); otherwise the Agent cannot hand tools along.
        run_signature = inspect.signature(chat_generator.run)
        if "tools" not in run_signature.parameters:
            raise TypeError(
                f"{type(chat_generator).__name__} does not accept tools parameter in its run method. "
                "The Agent component requires a chat generator that supports tools."
            )

        valid_exits = ["text"] + [tool.name for tool in tools or []]
        if exit_conditions is None:
            exit_conditions = ["text"]
        unknown_conditions = [condition for condition in exit_conditions if condition not in valid_exits]
        if unknown_conditions:
            raise ValueError(
                f"Invalid exit conditions provided: {exit_conditions}. "
                f"Valid exit conditions must be a subset of {valid_exits}. "
                "Ensure that each exit condition corresponds to either 'text' or a valid tool name."
            )

        # Validate the user-provided schema before storing it.
        if state_schema is not None:
            _validate_schema(state_schema)
        self._state_schema = state_schema or {}

        # The resolved schema always carries a "messages" entry so the conversation history is tracked.
        resolved_state_schema = _deepcopy_with_exceptions(self._state_schema)
        if resolved_state_schema.get("messages") is None:
            resolved_state_schema["messages"] = {"type": list[ChatMessage], "handler": merge_lists}
        self.state_schema = resolved_state_schema

        self.chat_generator = chat_generator
        self.tools = tools or []
        self.system_prompt = system_prompt
        self.exit_conditions = exit_conditions
        self.max_agent_steps = max_agent_steps
        self.raise_on_tool_invocation_failure = raise_on_tool_invocation_failure
        self.streaming_callback = streaming_callback

        # Expose every schema entry as a component output; non-reserved entries also become optional inputs.
        output_types: dict[str, Any] = {"last_message": ChatMessage}
        for param, config in self.state_schema.items():
            param_type = config["type"]
            output_types[param] = param_type
            # "messages" and "streaming_callback" are already declared by the run() signature.
            if param not in ("messages", "streaming_callback"):
                component.set_input_type(self, name=param, type=param_type, default=None)
        component.set_output_types(self, **output_types)

        self.tool_invoker_kwargs = tool_invoker_kwargs
        self._tool_invoker = None
        if self.tools:
            # The kwargs merge lets callers override "tools" / "raise_on_failure" via tool_invoker_kwargs.
            invoker_config = {
                "tools": self.tools,
                "raise_on_failure": self.raise_on_tool_invocation_failure,
                **(tool_invoker_kwargs or {}),
            }
            self._tool_invoker = ToolInvoker(**invoker_config)
        else:
            logger.warning(
                "No tools provided to the Agent. The Agent will behave like a ChatGenerator and only return text "
                "responses. To enable tool usage, pass tools directly to the Agent, not to the chat_generator."
            )

        self._is_warmed_up = False
1✔
191

192
    def warm_up(self) -> None:
        """
        Warm up the Agent.

        Delegates to the chat generator's own ``warm_up`` when it defines one; repeated calls are no-ops.
        """
        if self._is_warmed_up:
            return
        if hasattr(self.chat_generator, "warm_up"):
            self.chat_generator.warm_up()
        self._is_warmed_up = True
1✔
200

201
    def to_dict(self) -> dict[str, Any]:
        """
        Serialize the component to a dictionary.

        :return: Dictionary with serialized data
        """
        serialized_callback = serialize_callable(self.streaming_callback) if self.streaming_callback else None
        return default_to_dict(
            self,
            chat_generator=component_to_dict(obj=self.chat_generator, name="chat_generator"),
            tools=serialize_tools_or_toolset(self.tools),
            system_prompt=self.system_prompt,
            exit_conditions=self.exit_conditions,
            # Serialize the schema as the user provided it (not the resolved one) to round-trip the original input.
            state_schema=_schema_to_dict(self._state_schema),
            max_agent_steps=self.max_agent_steps,
            streaming_callback=serialized_callback,
            raise_on_tool_invocation_failure=self.raise_on_tool_invocation_failure,
            tool_invoker_kwargs=self.tool_invoker_kwargs,
        )
220

221
    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Agent":
        """
        Deserialize the agent from a dictionary.

        :param data: Dictionary to deserialize from
        :return: Deserialized agent
        """
        init_params = data.get("init_parameters", {})

        # Rehydrate nested objects in place before the generic component deserialization runs.
        deserialize_chatgenerator_inplace(init_params, key="chat_generator")

        if "state_schema" in init_params:
            init_params["state_schema"] = _schema_from_dict(init_params["state_schema"])

        callback_spec = init_params.get("streaming_callback")
        if callback_spec is not None:
            init_params["streaming_callback"] = deserialize_callable(callback_spec)

        deserialize_tools_or_toolset_inplace(init_params, key="tools")

        return default_from_dict(cls, data)
1✔
242

243
    def _create_agent_span(self) -> Any:
        """Open a tracing span that records this agent's configuration for one run."""
        span_tags = {
            "haystack.agent.max_steps": self.max_agent_steps,
            "haystack.agent.tools": self.tools,
            "haystack.agent.exit_conditions": self.exit_conditions,
            "haystack.agent.state_schema": _schema_to_dict(self.state_schema),
        }
        return tracing.tracer.trace("haystack.agent.run", tags=span_tags)
254

255
    def _initialize_fresh_execution(
        self,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT],
        requires_async: bool,
        *,
        system_prompt: Optional[str] = None,
        tools: Optional[Union[list[Tool], Toolset, list[str]]] = None,
        **kwargs,
    ) -> _ExecutionContext:
        """
        Build the execution context for a run that does not resume from a snapshot.

        :param messages: List of ChatMessage objects to start the agent with.
        :param streaming_callback: Optional callback for streaming responses.
        :param requires_async: Whether the agent run requires asynchronous execution.
        :param system_prompt: System prompt for the agent. If provided, it overrides the default system prompt.
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
            When passing tool names, tools are selected from the Agent's originally configured tools.
        :param kwargs: Additional data to pass to the State used by the Agent.
        """
        # Run-time prompt wins over the configured one; falsy values fall back (original `or` semantics).
        effective_prompt = system_prompt or self.system_prompt
        if effective_prompt is not None:
            messages = [ChatMessage.from_system(effective_prompt), *messages]

        if all(m.is_from(ChatRole.SYSTEM) for m in messages):
            logger.warning("All messages provided to the Agent component are system messages. This is not recommended.")

        state = State(schema=self.state_schema, data=kwargs)
        state.set("messages", messages)

        resolved_callback = select_streaming_callback(  # type: ignore[call-overload]
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=requires_async
        )

        selected_tools = self._select_tools(tools)
        tool_invoker_inputs: dict[str, Any] = {"tools": selected_tools}
        generator_inputs: dict[str, Any] = {"tools": selected_tools}
        if resolved_callback is not None:
            tool_invoker_inputs["streaming_callback"] = resolved_callback
            generator_inputs["streaming_callback"] = resolved_callback

        return _ExecutionContext(
            state=state,
            component_visits=dict.fromkeys(["chat_generator", "tool_invoker"], 0),
            chat_generator_inputs=generator_inputs,
            tool_invoker_inputs=tool_invoker_inputs,
        )
303

304
    def _select_tools(
        self, tools: Optional[Union[list[Tool], Toolset, list[str]]] = None
    ) -> Union[list[Tool], Toolset]:
        """
        Resolve which tools apply to the current run.

        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
            When passing tool names, tools are selected from the Agent's originally configured tools.
        :returns: Selected tools for the current run.
        :raises ValueError: If tool names are provided but no tools were configured at initialization,
            or if any provided tool name is not valid.
        :raises TypeError: If tools is not a list of Tool objects, a Toolset, or a list of tool names (strings).
        """
        if tools is None:
            return self.tools

        # A Toolset, or a homogeneous list of Tool objects, replaces the configured tools wholesale.
        if isinstance(tools, Toolset):
            return tools
        if isinstance(tools, list) and all(isinstance(t, Tool) for t in tools):
            return tools  # type: ignore[return-value] # mypy thinks this could still be list[str]

        # A list of names picks a subset of the originally configured tools.
        if isinstance(tools, list) and all(isinstance(t, str) for t in tools):
            if not self.tools:
                raise ValueError("No tools were configured for the Agent at initialization.")
            requested_names: list[str] = tools  # type: ignore[assignment] # narrowed to list[str] by the all() check
            configured_names = {tool.name for tool in self.tools}
            unknown_names = {name for name in requested_names if name not in configured_names}
            if unknown_names:
                raise ValueError(
                    f"The following tool names are not valid: {unknown_names}. "
                    f"Valid tool names are: {configured_names}."
                )
            return [tool for tool in self.tools if tool.name in requested_names]

        raise TypeError("tools must be a list of Tool objects, a Toolset, or a list of tool names (strings).")
1✔
335

336
    def _initialize_from_snapshot(
        self,
        snapshot: AgentSnapshot,
        streaming_callback: Optional[StreamingCallbackT],
        requires_async: bool,
        *,
        tools: Optional[Union[list[Tool], Toolset, list[str]]] = None,
    ) -> _ExecutionContext:
        """
        Rebuild the execution context from a previously captured AgentSnapshot.

        :param snapshot: An AgentSnapshot containing the state of a previously saved agent execution.
        :param streaming_callback: Optional callback for streaming responses.
        :param requires_async: Whether the agent run requires asynchronous execution.
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
            When passing tool names, tools are selected from the Agent's originally configured tools.
        """
        restored_inputs = {
            name: _deserialize_value_with_schema(snapshot.component_inputs[name])
            for name in ("chat_generator", "tool_invoker")
        }

        state = State(schema=self.state_schema, data=restored_inputs["tool_invoker"]["state"].data)

        # A ToolBreakpoint snapshot means the tool invoker must run before the next LLM call.
        skip_chat_generator = isinstance(snapshot.break_point.break_point, ToolBreakpoint)

        # The callback saved in the snapshot takes precedence over the one passed at resume time.
        resolved_callback = restored_inputs["chat_generator"].get("streaming_callback", streaming_callback)
        resolved_callback = select_streaming_callback(  # type: ignore[call-overload]
            init_callback=self.streaming_callback, runtime_callback=resolved_callback, requires_async=requires_async
        )

        selected_tools = self._select_tools(tools)
        tool_invoker_inputs: dict[str, Any] = {"tools": selected_tools}
        generator_inputs: dict[str, Any] = {"tools": selected_tools}
        if resolved_callback is not None:
            tool_invoker_inputs["streaming_callback"] = resolved_callback
            generator_inputs["streaming_callback"] = resolved_callback

        return _ExecutionContext(
            state=state,
            component_visits=snapshot.component_visits,
            chat_generator_inputs=generator_inputs,
            tool_invoker_inputs=tool_invoker_inputs,
            counter=snapshot.break_point.break_point.visit_count,
            skip_chat_generator=skip_chat_generator,
        )
383

384
    def _runtime_checks(self, break_point: Optional[AgentBreakpoint], snapshot: Optional[AgentSnapshot]) -> None:
        """
        Validate preconditions before the agent loop starts.

        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :param snapshot: An AgentSnapshot containing the state of a previously saved agent execution.
        :raises RuntimeError: If the Agent component wasn't warmed up before calling `run()`.
        :raises ValueError: If the break_point is invalid.
        """
        # Warm-up is only mandatory when the generator actually defines warm_up().
        if hasattr(self.chat_generator, "warm_up") and not self._is_warmed_up:
            raise RuntimeError("The component Agent wasn't warmed up. Run 'warm_up()' before calling 'run()'.")

        if break_point and isinstance(break_point.break_point, ToolBreakpoint):
            _validate_tool_breakpoint_is_valid(agent_breakpoint=break_point, tools=self.tools)
1✔
399

400
    @staticmethod
    def _check_chat_generator_breakpoint(
        execution_context: _ExecutionContext,
        break_point: Optional[AgentBreakpoint],
        parent_snapshot: Optional[PipelineSnapshot],
    ) -> None:
        """
        Trigger the chat generator breakpoint once its configured visit count is reached.

        A pipeline snapshot is captured first so the run can later be resumed from this exact point.

        :param execution_context: The current execution context of the agent.
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :param parent_snapshot: An optional parent snapshot for the agent execution.
        """
        if not break_point:
            return
        inner = break_point.break_point
        if inner.component_name != "chat_generator":
            return
        if execution_context.component_visits["chat_generator"] != inner.visit_count:
            return

        pipeline_snapshot = _create_pipeline_snapshot_from_chat_generator(
            execution_context=execution_context, break_point=break_point, parent_snapshot=parent_snapshot
        )
        _trigger_chat_generator_breakpoint(pipeline_snapshot=pipeline_snapshot)
1✔
425

426
    @staticmethod
    def _check_tool_invoker_breakpoint(
        execution_context: _ExecutionContext,
        break_point: Optional[AgentBreakpoint],
        parent_snapshot: Optional[PipelineSnapshot],
    ) -> None:
        """
        Trigger the tool invoker breakpoint once its configured visit count is reached.

        A pipeline snapshot is captured first so the run can later be resumed from this exact point.

        :param execution_context: The current execution context of the agent.
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :param parent_snapshot: An optional parent snapshot for the agent execution.
        """
        if not break_point:
            return
        inner = break_point.break_point
        if inner.component_name != "tool_invoker":
            return
        if inner.visit_count != execution_context.component_visits["tool_invoker"]:
            return

        pipeline_snapshot = _create_pipeline_snapshot_from_tool_invoker(
            execution_context=execution_context, break_point=break_point, parent_snapshot=parent_snapshot
        )
        # Only the latest LLM message is relevant to the breakpoint payload.
        _trigger_tool_invoker_breakpoint(
            llm_messages=execution_context.state.data["messages"][-1:], pipeline_snapshot=pipeline_snapshot
        )
453

454
    def run(  # noqa: PLR0915
        self,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        *,
        break_point: Optional[AgentBreakpoint] = None,
        snapshot: Optional[AgentSnapshot] = None,
        system_prompt: Optional[str] = None,
        tools: Optional[Union[list[Tool], Toolset, list[str]]] = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        Process messages and execute tools until an exit condition is met.

        :param messages: List of Haystack ChatMessage objects to process.
        :param streaming_callback: A callback that will be invoked when a response is streamed from the LLM.
            The same callback can be configured to emit tool results when a tool is called.
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :param snapshot: A dictionary containing a snapshot of a previously saved agent execution. The snapshot contains
            the relevant information to restart the Agent execution from where it left off.
        :param system_prompt: System prompt for the agent. If provided, it overrides the default system prompt.
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
            When passing tool names, tools are selected from the Agent's originally configured tools.
        :param kwargs: Additional data to pass to the State schema used by the Agent.
            The keys must match the schema defined in the Agent's `state_schema`.
        :returns:
            A dictionary with the following keys:
            - "messages": List of all messages exchanged during the agent's run.
            - "last_message": The last message exchanged during the agent's run.
            - Any additional keys defined in the `state_schema`.
        :raises RuntimeError: If the Agent component wasn't warmed up before calling `run()`.
        :raises BreakpointException: If an agent breakpoint is triggered.
        """
        # We pop parent_snapshot from kwargs to avoid passing it into State.
        parent_snapshot = kwargs.pop("parent_snapshot", None)
        # Captured only for the tracing span's input tag below.
        agent_inputs = {
            "messages": messages,
            "streaming_callback": streaming_callback,
            "break_point": break_point,
            "snapshot": snapshot,
            **kwargs,
        }
        self._runtime_checks(break_point=break_point, snapshot=snapshot)

        # Resume from a snapshot when given one; otherwise start a fresh execution context.
        if snapshot:
            exe_context = self._initialize_from_snapshot(
                snapshot=snapshot, streaming_callback=streaming_callback, requires_async=False, tools=tools
            )
        else:
            exe_context = self._initialize_fresh_execution(
                messages=messages,
                streaming_callback=streaming_callback,
                requires_async=False,
                system_prompt=system_prompt,
                tools=tools,
                **kwargs,
            )

        with self._create_agent_span() as span:
            span.set_content_tag("haystack.agent.input", _deepcopy_with_exceptions(agent_inputs))

            while exe_context.counter < self.max_agent_steps:
                # Handle breakpoint and ChatGenerator call
                Agent._check_chat_generator_breakpoint(
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
                )
                # We skip the chat generator when restarting from a snapshot from a ToolBreakpoint
                if exe_context.skip_chat_generator:
                    llm_messages = exe_context.state.get("messages", [])[-1:]
                    # Set to False so the next iteration will call the chat generator
                    exe_context.skip_chat_generator = False
                else:
                    try:
                        result = Pipeline._run_component(
                            component_name="chat_generator",
                            component={"instance": self.chat_generator},
                            inputs={
                                "messages": exe_context.state.data["messages"],
                                **exe_context.chat_generator_inputs,
                            },
                            component_visits=exe_context.component_visits,
                            parent_span=span,
                        )
                    except PipelineRuntimeError as e:
                        # Attach a resumable snapshot to the error so callers can restart from this point.
                        pipeline_snapshot = _create_pipeline_snapshot_from_chat_generator(
                            agent_name=getattr(self, "__component_name__", None),
                            execution_context=exe_context,
                            parent_snapshot=parent_snapshot,
                        )
                        e.pipeline_snapshot = pipeline_snapshot
                        raise e

                    llm_messages = result["replies"]
                    exe_context.state.set("messages", llm_messages)

                # Check if any of the LLM responses contain a tool call or if the LLM is not using tools
                if not any(msg.tool_call for msg in llm_messages) or self._tool_invoker is None:
                    exe_context.counter += 1
                    break

                # Handle breakpoint and ToolInvoker call
                Agent._check_tool_invoker_breakpoint(
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
                )
                try:
                    # We only send the messages from the LLM to the tool invoker
                    tool_invoker_result = Pipeline._run_component(
                        component_name="tool_invoker",
                        component={"instance": self._tool_invoker},
                        inputs={
                            "messages": llm_messages,
                            "state": exe_context.state,
                            **exe_context.tool_invoker_inputs,
                        },
                        component_visits=exe_context.component_visits,
                        parent_span=span,
                    )
                except PipelineRuntimeError as e:
                    # Access the original Tool Invoker exception
                    original_error = e.__cause__
                    tool_name = getattr(original_error, "tool_name", None)

                    pipeline_snapshot = _create_pipeline_snapshot_from_tool_invoker(
                        tool_name=tool_name,
                        agent_name=getattr(self, "__component_name__", None),
                        execution_context=exe_context,
                        parent_snapshot=parent_snapshot,
                    )
                    e.pipeline_snapshot = pipeline_snapshot
                    raise e

                tool_messages = tool_invoker_result["tool_messages"]
                # The invoker may have mutated/replaced the state; rebind before appending tool messages.
                exe_context.state = tool_invoker_result["state"]
                exe_context.state.set("messages", tool_messages)

                # Check if any LLM message's tool call name matches an exit condition
                if self.exit_conditions != ["text"] and self._check_exit_conditions(llm_messages, tool_messages):
                    exe_context.counter += 1
                    break

                # Increment the step counter
                exe_context.counter += 1

            if exe_context.counter >= self.max_agent_steps:
                logger.warning(
                    "Agent reached maximum agent steps of {max_agent_steps}, stopping.",
                    max_agent_steps=self.max_agent_steps,
                )
            span.set_content_tag("haystack.agent.output", exe_context.state.data)
            span.set_tag("haystack.agent.steps_taken", exe_context.counter)

        # Expose the full state, plus a convenience "last_message" shortcut when any messages exist.
        result = {**exe_context.state.data}
        if msgs := result.get("messages"):
            result["last_message"] = msgs[-1]
        return result
1✔
610

611
    async def run_async(
        self,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        *,
        break_point: Optional[AgentBreakpoint] = None,
        snapshot: Optional[AgentSnapshot] = None,
        system_prompt: Optional[str] = None,
        tools: Optional[Union[list[Tool], Toolset, list[str]]] = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        Asynchronously process messages and execute tools until the exit condition is met.

        This is the asynchronous version of the `run` method. It follows the same logic but uses
        asynchronous operations where possible, such as calling the `run_async` method of the ChatGenerator
        if available.

        :param messages: List of Haystack ChatMessage objects to process.
        :param streaming_callback: An asynchronous callback that will be invoked when a response is streamed from the
            LLM. The same callback can be configured to emit tool results when a tool is called.
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :param snapshot: A dictionary containing a snapshot of a previously saved agent execution. The snapshot contains
            the relevant information to restart the Agent execution from where it left off.
        :param system_prompt: System prompt for the agent. If provided, it overrides the default system prompt.
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
        :param kwargs: Additional data to pass to the State schema used by the Agent.
            The keys must match the schema defined in the Agent's `state_schema`.
        :returns:
            A dictionary with the following keys:
            - "messages": List of all messages exchanged during the agent's run.
            - "last_message": The last message exchanged during the agent's run.
            - Any additional keys defined in the `state_schema`.
        :raises RuntimeError: If the Agent component wasn't warmed up before calling `run_async()`.
        :raises BreakpointException: If an agent breakpoint is triggered.
        """
        # We pop parent_snapshot from kwargs to avoid passing it into State.
        parent_snapshot = kwargs.pop("parent_snapshot", None)
        agent_inputs = {
            "messages": messages,
            "streaming_callback": streaming_callback,
            "break_point": break_point,
            "snapshot": snapshot,
            **kwargs,
        }
        self._runtime_checks(break_point=break_point, snapshot=snapshot)

        if snapshot:
            exe_context = self._initialize_from_snapshot(
                snapshot=snapshot, streaming_callback=streaming_callback, requires_async=True, tools=tools
            )
        else:
            exe_context = self._initialize_fresh_execution(
                messages=messages,
                streaming_callback=streaming_callback,
                requires_async=True,
                system_prompt=system_prompt,
                tools=tools,
                **kwargs,
            )

        with self._create_agent_span() as span:
            span.set_content_tag("haystack.agent.input", _deepcopy_with_exceptions(agent_inputs))

            while exe_context.counter < self.max_agent_steps:
                # Handle breakpoint and ChatGenerator call
                self._check_chat_generator_breakpoint(
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
                )
                # We skip the chat generator when restarting from a snapshot from a ToolBreakpoint
                if exe_context.skip_chat_generator:
                    llm_messages = exe_context.state.get("messages", [])[-1:]
                    # Set to False so the next iteration will call the chat generator
                    exe_context.skip_chat_generator = False
                else:
                    result = await AsyncPipeline._run_component_async(
                        component_name="chat_generator",
                        component={"instance": self.chat_generator},
                        component_inputs={
                            "messages": exe_context.state.data["messages"],
                            **exe_context.chat_generator_inputs,
                        },
                        component_visits=exe_context.component_visits,
                        parent_span=span,
                    )
                    llm_messages = result["replies"]
                    exe_context.state.set("messages", llm_messages)

                # Check if any of the LLM responses contain a tool call or if the LLM is not using tools
                if not any(msg.tool_call for msg in llm_messages) or self._tool_invoker is None:
                    exe_context.counter += 1
                    break

                # Handle breakpoint and ToolInvoker call
                self._check_tool_invoker_breakpoint(
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
                )
                try:
                    # We only send the messages from the LLM to the tool invoker
                    tool_invoker_result = await AsyncPipeline._run_component_async(
                        component_name="tool_invoker",
                        component={"instance": self._tool_invoker},
                        component_inputs={
                            "messages": llm_messages,
                            "state": exe_context.state,
                            **exe_context.tool_invoker_inputs,
                        },
                        component_visits=exe_context.component_visits,
                        parent_span=span,
                    )
                except PipelineRuntimeError as e:
                    # Mirror the sync `run` path: attach a pipeline snapshot to the error so
                    # the Agent execution can be resumed from where the tool invocation failed.
                    # Access the original Tool Invoker exception
                    original_error = e.__cause__
                    tool_name = getattr(original_error, "tool_name", None)

                    pipeline_snapshot = _create_pipeline_snapshot_from_tool_invoker(
                        tool_name=tool_name,
                        agent_name=getattr(self, "__component_name__", None),
                        execution_context=exe_context,
                        parent_snapshot=parent_snapshot,
                    )
                    e.pipeline_snapshot = pipeline_snapshot
                    raise e

                tool_messages = tool_invoker_result["tool_messages"]
                exe_context.state = tool_invoker_result["state"]
                exe_context.state.set("messages", tool_messages)

                # Check if any LLM message's tool call name matches an exit condition
                if self.exit_conditions != ["text"] and self._check_exit_conditions(llm_messages, tool_messages):
                    exe_context.counter += 1
                    break

                # Increment the step counter
                exe_context.counter += 1

            if exe_context.counter >= self.max_agent_steps:
                logger.warning(
                    "Agent reached maximum agent steps of {max_agent_steps}, stopping.",
                    max_agent_steps=self.max_agent_steps,
                )
            span.set_content_tag("haystack.agent.output", exe_context.state.data)
            span.set_tag("haystack.agent.steps_taken", exe_context.counter)

        result = {**exe_context.state.data}
        if msgs := result.get("messages"):
            result["last_message"] = msgs[-1]
        return result

746
    def _check_exit_conditions(self, llm_messages: list[ChatMessage], tool_messages: list[ChatMessage]) -> bool:
1✔
747
        """
748
        Check if any of the LLM messages' tool calls match an exit condition and if there are no errors.
749

750
        :param llm_messages: List of messages from the LLM
751
        :param tool_messages: List of messages from the tool invoker
752
        :return: True if an exit condition is met and there are no errors, False otherwise
753
        """
754
        matched_exit_conditions = set()
1✔
755
        has_errors = False
1✔
756

757
        for msg in llm_messages:
1✔
758
            if msg.tool_call and msg.tool_call.tool_name in self.exit_conditions:
1✔
759
                matched_exit_conditions.add(msg.tool_call.tool_name)
1✔
760

761
                # Check if any error is specifically from the tool matching the exit condition
762
                tool_errors = [
1✔
763
                    tool_msg.tool_call_result.error
764
                    for tool_msg in tool_messages
765
                    if tool_msg.tool_call_result is not None
766
                    and tool_msg.tool_call_result.origin.tool_name == msg.tool_call.tool_name
767
                ]
768
                if any(tool_errors):
1✔
769
                    has_errors = True
×
770
                    # No need to check further if we found an error
771
                    break
×
772

773
        # Only return True if at least one exit condition was matched AND none had errors
774
        return bool(matched_exit_conditions) and not has_errors
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc