• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 18969835544

31 Oct 2025 10:29AM UTC coverage: 92.244% (+0.009%) from 92.235%
18969835544

Pull #9967

github

web-flow
Merge db5019a28 into 244dd824c
Pull Request #9967: feat: Enable first snippet test runs in new documentation

13498 of 14633 relevant lines covered (92.24%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.48
haystack/components/agents/agent.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import inspect
1✔
6
from dataclasses import dataclass
1✔
7
from typing import Any, Optional, Union, cast
1✔
8

9
from haystack import logging, tracing
1✔
10
from haystack.components.generators.chat.types import ChatGenerator
1✔
11
from haystack.components.tools import ToolInvoker
1✔
12
from haystack.core.component.component import component
1✔
13
from haystack.core.errors import PipelineRuntimeError
1✔
14
from haystack.core.pipeline.async_pipeline import AsyncPipeline
1✔
15
from haystack.core.pipeline.breakpoint import (
1✔
16
    _create_pipeline_snapshot_from_chat_generator,
17
    _create_pipeline_snapshot_from_tool_invoker,
18
    _trigger_chat_generator_breakpoint,
19
    _trigger_tool_invoker_breakpoint,
20
    _validate_tool_breakpoint_is_valid,
21
)
22
from haystack.core.pipeline.pipeline import Pipeline
1✔
23
from haystack.core.pipeline.utils import _deepcopy_with_exceptions
1✔
24
from haystack.core.serialization import component_to_dict, default_from_dict, default_to_dict
1✔
25
from haystack.dataclasses import ChatMessage, ChatRole
1✔
26
from haystack.dataclasses.breakpoints import AgentBreakpoint, AgentSnapshot, PipelineSnapshot, ToolBreakpoint
1✔
27
from haystack.dataclasses.streaming_chunk import StreamingCallbackT, select_streaming_callback
1✔
28
from haystack.tools import (
1✔
29
    Tool,
30
    Toolset,
31
    ToolsType,
32
    deserialize_tools_or_toolset_inplace,
33
    flatten_tools_or_toolsets,
34
    serialize_tools_or_toolset,
35
)
36
from haystack.utils import _deserialize_value_with_schema
1✔
37
from haystack.utils.callable_serialization import deserialize_callable, serialize_callable
1✔
38
from haystack.utils.deserialization import deserialize_chatgenerator_inplace
1✔
39

40
from .state.state import State, _schema_from_dict, _schema_to_dict, _validate_schema
1✔
41
from .state.state_utils import merge_lists
1✔
42

43
logger = logging.getLogger(__name__)
1✔
44

45

46
@dataclass
class _ExecutionContext:
    """
    Context for executing the agent.

    :param state: The current state of the agent, including messages and any additional data.
    :param component_visits: A dictionary tracking how many times each component has been visited.
    :param chat_generator_inputs: Runtime inputs to be passed to the chat generator.
    :param tool_invoker_inputs: Runtime inputs to be passed to the tool invoker.
    :param counter: A counter to track the number of steps taken in the agent's run.
    :param skip_chat_generator: A flag to indicate whether to skip the chat generator in the next iteration.
        This is useful when resuming from a ToolBreakpoint where the ToolInvoker needs to be called first.
    """

    # Holds "messages" plus any user-defined state-schema entries.
    state: State
    # Keys are the component names "chat_generator" and "tool_invoker".
    component_visits: dict[str, int]
    # Extra kwargs forwarded on every chat-generator invocation (tools, streaming_callback, ...).
    chat_generator_inputs: dict[str, Any]
    # Extra kwargs forwarded on every tool-invoker invocation (tools, streaming_callback, ...).
    tool_invoker_inputs: dict[str, Any]
    counter: int = 0
    skip_chat_generator: bool = False
66

67

68
@component
1✔
69
class Agent:
1✔
70
    """
71
    A Haystack component that implements a tool-using agent with provider-agnostic chat model support.
72

73
    The component processes messages and executes tools until an exit condition is met.
74
    The exit condition can be triggered either by a direct text response or by invoking a specific designated tool.
75
    Multiple exit conditions can be specified.
76

77
    When you call an Agent without tools, it acts as a ChatGenerator, produces one response, then exits.
78

79
    ### Usage example
80
    ```python
81
    from haystack.components.agents import Agent
82
    from haystack.components.generators.chat import OpenAIChatGenerator
83
    from haystack.dataclasses import ChatMessage
84
    from haystack.tools import Tool
85
    from haystack.components.tools import ToolInvoker
86

87
    # These would need to be your proper functions, here we use simple examples
88
    def calculator(a: int, b: int) -> int:
89
        '''Add two numbers.'''
90
        return a + b
91

92
    def search(query: str) -> str:
93
        '''Search for information.'''
94
        return f"Results for: {query}"
95

96
    # Create tools with proper JSON Schema
97
    tools = [
98
        Tool(
99
            name="calculator",
100
            description="Adds two numbers",
101
            parameters={
102
                "type": "object",
103
                "properties": {
104
                    "a": {"type": "integer", "description": "First number"},
105
                    "b": {"type": "integer", "description": "Second number"}
106
                },
107
                "required": ["a", "b"]
108
            },
109
            function=calculator
110
        ),
111
        Tool(
112
            name="search",
113
            description="Searches for information",
114
            parameters={
115
                "type": "object",
116
                "properties": {
117
                    "query": {"type": "string", "description": "Search query"}
118
                },
119
                "required": ["query"]
120
            },
121
            function=search
122
        )
123
    ]
124

125
    agent = Agent(
126
        chat_generator=OpenAIChatGenerator(),
127
        tools=tools,
128
        exit_conditions=["search"],
129
    )
130

131
    # Run the agent
132
    agent.warm_up()
133
    result = agent.run(
134
        messages=[ChatMessage.from_user("Find information about Haystack")]
135
    )
136

137
    assert "messages" in result  # Contains conversation history
138
    ```
139
    """
140

141
    def __init__(
        self,
        *,
        chat_generator: ChatGenerator,
        tools: Optional[ToolsType] = None,
        system_prompt: Optional[str] = None,
        exit_conditions: Optional[list[str]] = None,
        state_schema: Optional[dict[str, Any]] = None,
        max_agent_steps: int = 100,
        streaming_callback: Optional[StreamingCallbackT] = None,
        raise_on_tool_invocation_failure: bool = False,
        tool_invoker_kwargs: Optional[dict[str, Any]] = None,
    ) -> None:
        """
        Initialize the agent component.

        :param chat_generator: An instance of the chat generator that your agent should use. It must support tools.
        :param tools: A list of Tool and/or Toolset objects, or a single Toolset that the agent can use.
        :param system_prompt: System prompt for the agent.
        :param exit_conditions: List of conditions that will cause the agent to return.
            Can include "text" if the agent should return when it generates a message without tool calls,
            or tool names that will cause the agent to return once the tool was executed. Defaults to ["text"].
        :param state_schema: The schema for the runtime state used by the tools.
        :param max_agent_steps: Maximum number of steps the agent will run before stopping. Defaults to 100.
            If the agent exceeds this number of steps, it will stop and return the current state.
        :param streaming_callback: A callback that will be invoked when a response is streamed from the LLM.
            The same callback can be configured to emit tool results when a tool is called.
        :param raise_on_tool_invocation_failure: Should the agent raise an exception when a tool invocation fails?
            If set to False, the exception will be turned into a chat message and passed to the LLM.
        :param tool_invoker_kwargs: Additional keyword arguments to pass to the ToolInvoker.
        :raises TypeError: If the chat_generator does not support tools parameter in its run method.
        :raises ValueError: If the exit_conditions are not valid.
        """
        # Check if chat_generator supports tools parameter
        chat_generator_run_method = inspect.signature(chat_generator.run)
        if "tools" not in chat_generator_run_method.parameters:
            raise TypeError(
                f"{type(chat_generator).__name__} does not accept tools parameter in its run method. "
                "The Agent component requires a chat generator that supports tools."
            )

        # "text" is always a legal exit condition; the rest are the names of the
        # (flattened) tools/toolsets passed in.
        valid_exits = ["text"] + [tool.name for tool in flatten_tools_or_toolsets(tools)]
        if exit_conditions is None:
            exit_conditions = ["text"]
        if not all(condition in valid_exits for condition in exit_conditions):
            raise ValueError(
                f"Invalid exit conditions provided: {exit_conditions}. "
                f"Valid exit conditions must be a subset of {valid_exits}. "
                "Ensure that each exit condition corresponds to either 'text' or a valid tool name."
            )

        # Validate state schema if provided
        if state_schema is not None:
            _validate_schema(state_schema)
        # Keep the raw user-provided schema separately: to_dict() serializes this one,
        # not the resolved copy below.
        self._state_schema = state_schema or {}

        # Initialize state schema: a resolved copy always contains a "messages" entry
        # so the conversation history is tracked even without a user-defined schema.
        resolved_state_schema = _deepcopy_with_exceptions(self._state_schema)
        if resolved_state_schema.get("messages") is None:
            resolved_state_schema["messages"] = {"type": list[ChatMessage], "handler": merge_lists}
        self.state_schema = resolved_state_schema

        self.chat_generator = chat_generator
        self.tools = tools or []
        self.system_prompt = system_prompt
        self.exit_conditions = exit_conditions
        self.max_agent_steps = max_agent_steps
        self.raise_on_tool_invocation_failure = raise_on_tool_invocation_failure
        self.streaming_callback = streaming_callback

        # Register every state-schema entry as both a component output and (except for
        # run()-native parameters) an optional component input.
        output_types = {"last_message": ChatMessage}
        for param, config in self.state_schema.items():
            output_types[param] = config["type"]
            # Skip setting input types for parameters that are already in the run method
            if param in ["messages", "streaming_callback"]:
                continue
            component.set_input_type(self, name=param, type=config["type"], default=None)
        component.set_output_types(self, **output_types)

        self.tool_invoker_kwargs = tool_invoker_kwargs
        self._tool_invoker = None
        if self.tools:
            # User-supplied tool_invoker_kwargs win over the defaults derived from
            # the Agent's own configuration.
            resolved_tool_invoker_kwargs = {
                "tools": self.tools,
                "raise_on_failure": self.raise_on_tool_invocation_failure,
                **(tool_invoker_kwargs or {}),
            }
            self._tool_invoker = ToolInvoker(**resolved_tool_invoker_kwargs)
        else:
            logger.warning(
                "No tools provided to the Agent. The Agent will behave like a ChatGenerator and only return text "
                "responses. To enable tool usage, pass tools directly to the Agent, not to the chat_generator."
            )

        self._is_warmed_up = False
236

237
    def warm_up(self) -> None:
        """
        Warm up the Agent.

        Delegates to the ``warm_up`` methods of the chat generator and the tool invoker
        when they exist. Runs at most once; subsequent calls are no-ops.
        """
        if not self._is_warmed_up:
            if hasattr(self.chat_generator, "warm_up"):
                self.chat_generator.warm_up()
            # Guard against None *before* probing the attribute. The original order
            # (hasattr first) only worked because hasattr(None, "warm_up") is False;
            # checking `is not None` first makes the intent explicit.
            if self._tool_invoker is not None and hasattr(self._tool_invoker, "warm_up"):
                self._tool_invoker.warm_up()
            self._is_warmed_up = True
247

248
    def to_dict(self) -> dict[str, Any]:
        """
        Serialize the component to a dictionary.

        :return: Dictionary with serialized data
        """
        serialized_streaming_callback = None
        if self.streaming_callback:
            serialized_streaming_callback = serialize_callable(self.streaming_callback)

        # The original (unresolved) state schema is serialized, not the resolved one,
        # so the output reflects exactly what the user passed in.
        return default_to_dict(
            self,
            chat_generator=component_to_dict(obj=self.chat_generator, name="chat_generator"),
            tools=serialize_tools_or_toolset(self.tools),
            system_prompt=self.system_prompt,
            exit_conditions=self.exit_conditions,
            state_schema=_schema_to_dict(self._state_schema),
            max_agent_steps=self.max_agent_steps,
            streaming_callback=serialized_streaming_callback,
            raise_on_tool_invocation_failure=self.raise_on_tool_invocation_failure,
            tool_invoker_kwargs=self.tool_invoker_kwargs,
        )
267

268
    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Agent":
        """
        Deserialize the agent from a dictionary.

        :param data: Dictionary to deserialize from
        :return: Deserialized agent
        """
        init_params = data.get("init_parameters", {})

        # The chat generator is rebuilt in place inside init_params.
        deserialize_chatgenerator_inplace(init_params, key="chat_generator")

        serialized_schema = init_params.get("state_schema")
        if serialized_schema is not None:
            init_params["state_schema"] = _schema_from_dict(serialized_schema)

        serialized_callback = init_params.get("streaming_callback")
        if serialized_callback is not None:
            init_params["streaming_callback"] = deserialize_callable(serialized_callback)

        # Tools/toolsets are likewise rebuilt in place.
        deserialize_tools_or_toolset_inplace(init_params, key="tools")

        return default_from_dict(cls, data)
289

290
    def _create_agent_span(self) -> Any:
        """Create a tracing span covering one agent run."""
        span_tags = {
            "haystack.agent.max_steps": self.max_agent_steps,
            "haystack.agent.tools": self.tools,
            "haystack.agent.exit_conditions": self.exit_conditions,
            "haystack.agent.state_schema": _schema_to_dict(self.state_schema),
        }
        return tracing.tracer.trace("haystack.agent.run", tags=span_tags)
301

302
    def _initialize_fresh_execution(
        self,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT],
        requires_async: bool,
        *,
        system_prompt: Optional[str] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        tools: Optional[Union[ToolsType, list[str]]] = None,
        **kwargs,
    ) -> _ExecutionContext:
        """
        Build the execution context for an agent run that starts from scratch.

        :param messages: List of ChatMessage objects to start the agent with.
        :param streaming_callback: Optional callback for streaming responses.
        :param requires_async: Whether the agent run requires asynchronous execution.
        :param system_prompt: System prompt for the agent. If provided, it overrides the default system prompt.
        :param generation_kwargs: Additional keyword arguments for chat generator. These parameters will
            override the parameters passed during component initialization.
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
            When passing tool names, tools are selected from the Agent's originally configured tools.
        :param kwargs: Additional data to pass to the State used by the Agent.
        """
        # A truthy run-time system prompt overrides the configured one.
        effective_system_prompt = system_prompt or self.system_prompt
        if effective_system_prompt is not None:
            messages = [ChatMessage.from_system(effective_system_prompt), *messages]

        if all(m.is_from(ChatRole.SYSTEM) for m in messages):
            logger.warning("All messages provided to the Agent component are system messages. This is not recommended.")

        state = State(schema=self.state_schema, data=kwargs)
        state.set("messages", messages)

        resolved_callback = select_streaming_callback(  # type: ignore[call-overload]
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=requires_async
        )

        run_tools = self._select_tools(tools)
        invoker_inputs: dict[str, Any] = {"tools": run_tools}
        generator_inputs: dict[str, Any] = {"tools": run_tools}
        if resolved_callback is not None:
            invoker_inputs["streaming_callback"] = resolved_callback
            generator_inputs["streaming_callback"] = resolved_callback
        if generation_kwargs is not None:
            generator_inputs["generation_kwargs"] = generation_kwargs

        return _ExecutionContext(
            state=state,
            component_visits={"chat_generator": 0, "tool_invoker": 0},
            chat_generator_inputs=generator_inputs,
            tool_invoker_inputs=invoker_inputs,
        )
355

356
    def _select_tools(self, tools: Optional[Union[ToolsType, list[str]]] = None) -> ToolsType:
        """
        Resolve which tools the current run should use.

        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
            When passing tool names, tools are selected from the Agent's originally configured tools.
        :returns: Selected tools for the current run.
        :raises ValueError: If tool names are provided but no tools were configured at initialization,
            or if any provided tool name is not valid.
        :raises TypeError: If tools is not a list of Tool objects, a Toolset, or a list of tool names (strings).
        """
        if tools is None:
            # No per-run override: use the tools configured at init time.
            return self.tools

        is_name_list = isinstance(tools, list) and all(isinstance(t, str) for t in tools)
        if is_name_list:
            if not self.tools:
                raise ValueError("No tools were configured for the Agent at initialization.")
            available_tools = flatten_tools_or_toolsets(self.tools)
            requested_names = cast(list[str], tools)  # mypy thinks this could still be list[Tool] or Toolset
            known_names = {tool.name for tool in available_tools}
            unknown_names = {name for name in requested_names if name not in known_names}
            if unknown_names:
                raise ValueError(
                    f"The following tool names are not valid: {unknown_names}. "
                    f"Valid tool names are: {known_names}."
                )
            # Filter the configured tools down to the requested subset.
            return [tool for tool in available_tools if tool.name in requested_names]

        if isinstance(tools, Toolset):
            return tools

        if isinstance(tools, list):
            return cast(list[Union[Tool, Toolset]], tools)  # mypy can't narrow the Union type from isinstance check

        raise TypeError(
            "tools must be a list of Tool and/or Toolset objects, a Toolset, or a list of tool names (strings)."
        )
393

394
    def _initialize_from_snapshot(
        self,
        snapshot: AgentSnapshot,
        streaming_callback: Optional[StreamingCallbackT],
        requires_async: bool,
        *,
        generation_kwargs: Optional[dict[str, Any]] = None,
        tools: Optional[Union[ToolsType, list[str]]] = None,
    ) -> _ExecutionContext:
        """
        Initialize execution context from an AgentSnapshot.

        :param snapshot: An AgentSnapshot containing the state of a previously saved agent execution.
        :param streaming_callback: Optional callback for streaming responses.
        :param requires_async: Whether the agent run requires asynchronous execution.
        :param generation_kwargs: Additional keyword arguments for chat generator. These parameters will
            override the parameters passed during component initialization.
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
            When passing tool names, tools are selected from the Agent's originally configured tools.
        """
        component_visits = snapshot.component_visits
        # Rehydrate the serialized inputs of both sub-components from the snapshot.
        current_inputs = {
            "chat_generator": _deserialize_value_with_schema(snapshot.component_inputs["chat_generator"]),
            "tool_invoker": _deserialize_value_with_schema(snapshot.component_inputs["tool_invoker"]),
        }

        # Rebuild the State against the *current* schema, carrying over the saved data.
        state_data = current_inputs["tool_invoker"]["state"].data
        state = State(schema=self.state_schema, data=state_data)

        # When resuming from a ToolBreakpoint, the tool invoker runs first and the
        # chat generator is skipped for one iteration.
        skip_chat_generator = isinstance(snapshot.break_point.break_point, ToolBreakpoint)
        # The callback stored in the snapshot takes precedence over the runtime argument.
        streaming_callback = current_inputs["chat_generator"].get("streaming_callback", streaming_callback)
        streaming_callback = select_streaming_callback(  # type: ignore[call-overload]
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=requires_async
        )

        selected_tools = self._select_tools(tools)
        tool_invoker_inputs: dict[str, Any] = {"tools": selected_tools}
        generator_inputs: dict[str, Any] = {"tools": selected_tools}
        if streaming_callback is not None:
            tool_invoker_inputs["streaming_callback"] = streaming_callback
            generator_inputs["streaming_callback"] = streaming_callback
        if generation_kwargs is not None:
            generator_inputs["generation_kwargs"] = generation_kwargs

        return _ExecutionContext(
            state=state,
            component_visits=component_visits,
            chat_generator_inputs=generator_inputs,
            tool_invoker_inputs=tool_invoker_inputs,
            # Resume step counting from the breakpoint's recorded visit count.
            counter=snapshot.break_point.break_point.visit_count,
            skip_chat_generator=skip_chat_generator,
        )
446

447
    def _runtime_checks(self, break_point: Optional[AgentBreakpoint]) -> None:
        """
        Perform runtime checks before running the agent.

        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :raises ValueError: If the break_point is invalid.
        """
        # Lazily warm up so callers that forgot warm_up() still work.
        if not self._is_warmed_up:
            self.warm_up()

        if not break_point:
            return
        # Tool breakpoints must reference one of the configured tools.
        if isinstance(break_point.break_point, ToolBreakpoint):
            _validate_tool_breakpoint_is_valid(agent_breakpoint=break_point, tools=self.tools)

461
    @staticmethod
    def _check_chat_generator_breakpoint(
        execution_context: _ExecutionContext,
        break_point: Optional[AgentBreakpoint],
        parent_snapshot: Optional[PipelineSnapshot],
    ) -> None:
        """
        Check if the chat generator breakpoint should be triggered.

        If the breakpoint should be triggered, create an agent snapshot and trigger the chat generator breakpoint.

        :param execution_context: The current execution context of the agent.
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :param parent_snapshot: An optional parent snapshot for the agent execution.
        """
        if not break_point:
            return
        inner_break_point = break_point.break_point
        if inner_break_point.component_name != "chat_generator":
            return
        # Only fire when the chat generator has been visited exactly as often as requested.
        if execution_context.component_visits["chat_generator"] != inner_break_point.visit_count:
            return

        pipeline_snapshot = _create_pipeline_snapshot_from_chat_generator(
            execution_context=execution_context, break_point=break_point, parent_snapshot=parent_snapshot
        )
        _trigger_chat_generator_breakpoint(pipeline_snapshot=pipeline_snapshot)

487
    @staticmethod
    def _check_tool_invoker_breakpoint(
        execution_context: _ExecutionContext,
        break_point: Optional[AgentBreakpoint],
        parent_snapshot: Optional[PipelineSnapshot],
    ) -> None:
        """
        Check if the tool invoker breakpoint should be triggered.

        If the breakpoint should be triggered, create an agent snapshot and trigger the tool invoker breakpoint.

        :param execution_context: The current execution context of the agent.
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :param parent_snapshot: An optional parent snapshot for the agent execution.
        """
        if not break_point:
            return
        inner_break_point = break_point.break_point
        if inner_break_point.component_name != "tool_invoker":
            return
        # Only fire when the tool invoker has been visited exactly as often as requested.
        if inner_break_point.visit_count != execution_context.component_visits["tool_invoker"]:
            return

        pipeline_snapshot = _create_pipeline_snapshot_from_tool_invoker(
            execution_context=execution_context, break_point=break_point, parent_snapshot=parent_snapshot
        )
        # Only the most recent LLM message is relevant to the tool invoker breakpoint.
        _trigger_tool_invoker_breakpoint(
            llm_messages=execution_context.state.data["messages"][-1:], pipeline_snapshot=pipeline_snapshot
        )

515
    def run(  # noqa: PLR0915
1✔
516
        self,
517
        messages: list[ChatMessage],
518
        streaming_callback: Optional[StreamingCallbackT] = None,
519
        *,
520
        generation_kwargs: Optional[dict[str, Any]] = None,
521
        break_point: Optional[AgentBreakpoint] = None,
522
        snapshot: Optional[AgentSnapshot] = None,
523
        system_prompt: Optional[str] = None,
524
        tools: Optional[Union[ToolsType, list[str]]] = None,
525
        **kwargs: Any,
526
    ) -> dict[str, Any]:
527
        """
528
        Process messages and execute tools until an exit condition is met.
529

530
        :param messages: List of Haystack ChatMessage objects to process.
531
        :param streaming_callback: A callback that will be invoked when a response is streamed from the LLM.
532
            The same callback can be configured to emit tool results when a tool is called.
533
        :param generation_kwargs: Additional keyword arguments for LLM. These parameters will
534
            override the parameters passed during component initialization.
535
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
536
            for "tool_invoker".
537
        :param snapshot: A dictionary containing a snapshot of a previously saved agent execution. The snapshot contains
538
            the relevant information to restart the Agent execution from where it left off.
539
        :param system_prompt: System prompt for the agent. If provided, it overrides the default system prompt.
540
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
541
            When passing tool names, tools are selected from the Agent's originally configured tools.
542
        :param kwargs: Additional data to pass to the State schema used by the Agent.
543
            The keys must match the schema defined in the Agent's `state_schema`.
544
        :returns:
545
            A dictionary with the following keys:
546
            - "messages": List of all messages exchanged during the agent's run.
547
            - "last_message": The last message exchanged during the agent's run.
548
            - Any additional keys defined in the `state_schema`.
549
        :raises RuntimeError: If the Agent component wasn't warmed up before calling `run()`.
550
        :raises BreakpointException: If an agent breakpoint is triggered.
551
        """
552
        # We pop parent_snapshot from kwargs to avoid passing it into State.
553
        parent_snapshot = kwargs.pop("parent_snapshot", None)
1✔
554
        agent_inputs = {
1✔
555
            "messages": messages,
556
            "streaming_callback": streaming_callback,
557
            "break_point": break_point,
558
            "snapshot": snapshot,
559
            **kwargs,
560
        }
561
        self._runtime_checks(break_point=break_point)
1✔
562

563
        if snapshot:
1✔
564
            exe_context = self._initialize_from_snapshot(
1✔
565
                snapshot=snapshot,
566
                streaming_callback=streaming_callback,
567
                requires_async=False,
568
                tools=tools,
569
                generation_kwargs=generation_kwargs,
570
            )
571
        else:
572
            exe_context = self._initialize_fresh_execution(
1✔
573
                messages=messages,
574
                streaming_callback=streaming_callback,
575
                requires_async=False,
576
                system_prompt=system_prompt,
577
                tools=tools,
578
                generation_kwargs=generation_kwargs,
579
                **kwargs,
580
            )
581

582
        with self._create_agent_span() as span:
1✔
583
            span.set_content_tag("haystack.agent.input", _deepcopy_with_exceptions(agent_inputs))
1✔
584

585
            while exe_context.counter < self.max_agent_steps:
1✔
586
                # Handle breakpoint and ChatGenerator call
587
                Agent._check_chat_generator_breakpoint(
1✔
588
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
589
                )
590
                # We skip the chat generator when restarting from a snapshot from a ToolBreakpoint
591
                if exe_context.skip_chat_generator:
1✔
592
                    llm_messages = exe_context.state.get("messages", [])[-1:]
1✔
593
                    # Set to False so the next iteration will call the chat generator
594
                    exe_context.skip_chat_generator = False
1✔
595
                else:
596
                    try:
1✔
597
                        result = Pipeline._run_component(
1✔
598
                            component_name="chat_generator",
599
                            component={"instance": self.chat_generator},
600
                            inputs={
601
                                "messages": exe_context.state.data["messages"],
602
                                **exe_context.chat_generator_inputs,
603
                            },
604
                            component_visits=exe_context.component_visits,
605
                            parent_span=span,
606
                        )
607
                    except PipelineRuntimeError as e:
1✔
608
                        pipeline_snapshot = _create_pipeline_snapshot_from_chat_generator(
1✔
609
                            agent_name=getattr(self, "__component_name__", None),
610
                            execution_context=exe_context,
611
                            parent_snapshot=parent_snapshot,
612
                        )
613
                        e.pipeline_snapshot = pipeline_snapshot
1✔
614
                        raise e
1✔
615

616
                    llm_messages = result["replies"]
1✔
617
                    exe_context.state.set("messages", llm_messages)
1✔
618

619
                # Check if any of the LLM responses contain a tool call or if the LLM is not using tools
620
                if not any(msg.tool_call for msg in llm_messages) or self._tool_invoker is None:
1✔
621
                    exe_context.counter += 1
1✔
622
                    break
1✔
623

624
                # Handle breakpoint and ToolInvoker call
625
                Agent._check_tool_invoker_breakpoint(
1✔
626
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
627
                )
628
                try:
1✔
629
                    # We only send the messages from the LLM to the tool invoker
630
                    tool_invoker_result = Pipeline._run_component(
1✔
631
                        component_name="tool_invoker",
632
                        component={"instance": self._tool_invoker},
633
                        inputs={
634
                            "messages": llm_messages,
635
                            "state": exe_context.state,
636
                            **exe_context.tool_invoker_inputs,
637
                        },
638
                        component_visits=exe_context.component_visits,
639
                        parent_span=span,
640
                    )
641
                except PipelineRuntimeError as e:
1✔
642
                    # Access the original Tool Invoker exception
643
                    original_error = e.__cause__
1✔
644
                    tool_name = getattr(original_error, "tool_name", None)
1✔
645

646
                    pipeline_snapshot = _create_pipeline_snapshot_from_tool_invoker(
1✔
647
                        tool_name=tool_name,
648
                        agent_name=getattr(self, "__component_name__", None),
649
                        execution_context=exe_context,
650
                        parent_snapshot=parent_snapshot,
651
                    )
652
                    e.pipeline_snapshot = pipeline_snapshot
1✔
653
                    raise e
1✔
654

655
                tool_messages = tool_invoker_result["tool_messages"]
1✔
656
                exe_context.state = tool_invoker_result["state"]
1✔
657
                exe_context.state.set("messages", tool_messages)
1✔
658

659
                # Check if any LLM message's tool call name matches an exit condition
660
                if self.exit_conditions != ["text"] and self._check_exit_conditions(llm_messages, tool_messages):
1✔
661
                    exe_context.counter += 1
1✔
662
                    break
1✔
663

664
                # Increment the step counter
665
                exe_context.counter += 1
1✔
666

667
            if exe_context.counter >= self.max_agent_steps:
1✔
668
                logger.warning(
1✔
669
                    "Agent reached maximum agent steps of {max_agent_steps}, stopping.",
670
                    max_agent_steps=self.max_agent_steps,
671
                )
672
            span.set_content_tag("haystack.agent.output", exe_context.state.data)
1✔
673
            span.set_tag("haystack.agent.steps_taken", exe_context.counter)
1✔
674

675
        result = {**exe_context.state.data}
1✔
676
        if msgs := result.get("messages"):
1✔
677
            result["last_message"] = msgs[-1]
1✔
678
        return result
1✔
679

680
    async def run_async(
        self,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        *,
        generation_kwargs: Optional[dict[str, Any]] = None,
        break_point: Optional[AgentBreakpoint] = None,
        snapshot: Optional[AgentSnapshot] = None,
        system_prompt: Optional[str] = None,
        tools: Optional[Union[ToolsType, list[str]]] = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        Asynchronously process messages and execute tools until the exit condition is met.

        This is the asynchronous version of the `run` method. It follows the same logic but uses
        asynchronous operations where possible, such as calling the `run_async` method of the ChatGenerator
        if available.

        :param messages: List of Haystack ChatMessage objects to process.
        :param streaming_callback: An asynchronous callback that will be invoked when a response is streamed from the
            LLM. The same callback can be configured to emit tool results when a tool is called.
        :param generation_kwargs: Additional keyword arguments for LLM. These parameters will
            override the parameters passed during component initialization.
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :param snapshot: A dictionary containing a snapshot of a previously saved agent execution. The snapshot contains
            the relevant information to restart the Agent execution from where it left off.
        :param system_prompt: System prompt for the agent. If provided, it overrides the default system prompt.
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
        :param kwargs: Additional data to pass to the State schema used by the Agent.
            The keys must match the schema defined in the Agent's `state_schema`.
        :returns:
            A dictionary with the following keys:
            - "messages": List of all messages exchanged during the agent's run.
            - "last_message": The last message exchanged during the agent's run.
            - Any additional keys defined in the `state_schema`.
        :raises RuntimeError: If the Agent component wasn't warmed up before calling `run_async()`.
        :raises BreakpointException: If an agent breakpoint is triggered.
        """
        # We pop parent_snapshot from kwargs to avoid passing it into State.
        parent_snapshot = kwargs.pop("parent_snapshot", None)
        agent_inputs = {
            "messages": messages,
            "streaming_callback": streaming_callback,
            "break_point": break_point,
            "snapshot": snapshot,
            **kwargs,
        }
        self._runtime_checks(break_point=break_point)

        if snapshot:
            exe_context = self._initialize_from_snapshot(
                snapshot=snapshot,
                streaming_callback=streaming_callback,
                requires_async=True,
                tools=tools,
                generation_kwargs=generation_kwargs,
            )
        else:
            exe_context = self._initialize_fresh_execution(
                messages=messages,
                streaming_callback=streaming_callback,
                requires_async=True,
                system_prompt=system_prompt,
                tools=tools,
                generation_kwargs=generation_kwargs,
                **kwargs,
            )

        with self._create_agent_span() as span:
            span.set_content_tag("haystack.agent.input", _deepcopy_with_exceptions(agent_inputs))

            while exe_context.counter < self.max_agent_steps:
                # Handle breakpoint and ChatGenerator call
                self._check_chat_generator_breakpoint(
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
                )
                # We skip the chat generator when restarting from a snapshot from a ToolBreakpoint
                if exe_context.skip_chat_generator:
                    llm_messages = exe_context.state.get("messages", [])[-1:]
                    # Set to False so the next iteration will call the chat generator
                    exe_context.skip_chat_generator = False
                else:
                    try:
                        result = await AsyncPipeline._run_component_async(
                            component_name="chat_generator",
                            component={"instance": self.chat_generator},
                            component_inputs={
                                "messages": exe_context.state.data["messages"],
                                **exe_context.chat_generator_inputs,
                            },
                            component_visits=exe_context.component_visits,
                            parent_span=span,
                        )
                    except PipelineRuntimeError as e:
                        # Mirror the sync `run` path: attach a pipeline snapshot so execution
                        # can be resumed from the point of failure.
                        pipeline_snapshot = _create_pipeline_snapshot_from_chat_generator(
                            agent_name=getattr(self, "__component_name__", None),
                            execution_context=exe_context,
                            parent_snapshot=parent_snapshot,
                        )
                        e.pipeline_snapshot = pipeline_snapshot
                        raise e

                    llm_messages = result["replies"]
                    exe_context.state.set("messages", llm_messages)

                # Check if any of the LLM responses contain a tool call or if the LLM is not using tools
                if not any(msg.tool_call for msg in llm_messages) or self._tool_invoker is None:
                    exe_context.counter += 1
                    break

                # Handle breakpoint and ToolInvoker call
                self._check_tool_invoker_breakpoint(
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
                )
                try:
                    # We only send the messages from the LLM to the tool invoker
                    tool_invoker_result = await AsyncPipeline._run_component_async(
                        component_name="tool_invoker",
                        component={"instance": self._tool_invoker},
                        component_inputs={
                            "messages": llm_messages,
                            "state": exe_context.state,
                            **exe_context.tool_invoker_inputs,
                        },
                        component_visits=exe_context.component_visits,
                        parent_span=span,
                    )
                except PipelineRuntimeError as e:
                    # Access the original Tool Invoker exception
                    original_error = e.__cause__
                    tool_name = getattr(original_error, "tool_name", None)

                    pipeline_snapshot = _create_pipeline_snapshot_from_tool_invoker(
                        tool_name=tool_name,
                        agent_name=getattr(self, "__component_name__", None),
                        execution_context=exe_context,
                        parent_snapshot=parent_snapshot,
                    )
                    e.pipeline_snapshot = pipeline_snapshot
                    raise e

                tool_messages = tool_invoker_result["tool_messages"]
                exe_context.state = tool_invoker_result["state"]
                exe_context.state.set("messages", tool_messages)

                # Check if any LLM message's tool call name matches an exit condition
                if self.exit_conditions != ["text"] and self._check_exit_conditions(llm_messages, tool_messages):
                    exe_context.counter += 1
                    break

                # Increment the step counter
                exe_context.counter += 1

            if exe_context.counter >= self.max_agent_steps:
                logger.warning(
                    "Agent reached maximum agent steps of {max_agent_steps}, stopping.",
                    max_agent_steps=self.max_agent_steps,
                )
            span.set_content_tag("haystack.agent.output", exe_context.state.data)
            span.set_tag("haystack.agent.steps_taken", exe_context.counter)

        result = {**exe_context.state.data}
        if msgs := result.get("messages"):
            result["last_message"] = msgs[-1]
        return result
822

823
    def _check_exit_conditions(self, llm_messages: list[ChatMessage], tool_messages: list[ChatMessage]) -> bool:
        """
        Check if any of the LLM messages' tool calls match an exit condition and if there are no errors.

        :param llm_messages: List of messages from the LLM
        :param tool_messages: List of messages from the tool invoker
        :return: True if an exit condition is met and there are no errors, False otherwise
        """
        found_exit_condition = False

        for message in llm_messages:
            call = message.tool_call
            # Skip messages without a tool call, or whose tool is not an exit condition
            if not call or call.tool_name not in self.exit_conditions:
                continue

            found_exit_condition = True

            # An error from the exit-condition tool itself vetoes the exit immediately
            for tool_msg in tool_messages:
                tool_result = tool_msg.tool_call_result
                if (
                    tool_result is not None
                    and tool_result.origin.tool_name == call.tool_name
                    and tool_result.error
                ):
                    return False

        # Exit only when some exit condition matched and none of them errored
        return found_exit_condition
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc