• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 19327416769

13 Nov 2025 09:50AM UTC coverage: 91.42% (-0.02%) from 91.44%
19327416769

Pull #10071

github

web-flow
Merge fd87dfd63 into 5aeec2aa4
Pull Request #10071: Fix: Update Agent's docstrings

13713 of 15000 relevant lines covered (91.42%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.48
haystack/components/agents/agent.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import inspect
1✔
6
from dataclasses import dataclass
1✔
7
from typing import Any, Optional, Union, cast
1✔
8

9
from haystack import logging, tracing
1✔
10
from haystack.components.generators.chat.types import ChatGenerator
1✔
11
from haystack.components.tools import ToolInvoker
1✔
12
from haystack.core.component.component import component
1✔
13
from haystack.core.errors import PipelineRuntimeError
1✔
14
from haystack.core.pipeline.async_pipeline import AsyncPipeline
1✔
15
from haystack.core.pipeline.breakpoint import (
1✔
16
    _create_pipeline_snapshot_from_chat_generator,
17
    _create_pipeline_snapshot_from_tool_invoker,
18
    _trigger_chat_generator_breakpoint,
19
    _trigger_tool_invoker_breakpoint,
20
    _validate_tool_breakpoint_is_valid,
21
)
22
from haystack.core.pipeline.pipeline import Pipeline
1✔
23
from haystack.core.pipeline.utils import _deepcopy_with_exceptions
1✔
24
from haystack.core.serialization import component_to_dict, default_from_dict, default_to_dict
1✔
25
from haystack.dataclasses import ChatMessage, ChatRole
1✔
26
from haystack.dataclasses.breakpoints import AgentBreakpoint, AgentSnapshot, PipelineSnapshot, ToolBreakpoint
1✔
27
from haystack.dataclasses.streaming_chunk import StreamingCallbackT, select_streaming_callback
1✔
28
from haystack.tools import (
1✔
29
    Tool,
30
    Toolset,
31
    ToolsType,
32
    deserialize_tools_or_toolset_inplace,
33
    flatten_tools_or_toolsets,
34
    serialize_tools_or_toolset,
35
)
36
from haystack.utils import _deserialize_value_with_schema
1✔
37
from haystack.utils.callable_serialization import deserialize_callable, serialize_callable
1✔
38
from haystack.utils.deserialization import deserialize_chatgenerator_inplace
1✔
39

40
from .state.state import State, _schema_from_dict, _schema_to_dict, _validate_schema
1✔
41
from .state.state_utils import merge_lists
1✔
42

43
logger = logging.getLogger(__name__)
1✔
44

45

46
@dataclass
class _ExecutionContext:
    """
    Per-run bookkeeping that the Agent carries from one step to the next.

    :param state: The current state of the agent, including messages and any additional data.
    :param component_visits: Visit counters keyed by component name ("chat_generator" and "tool_invoker").
    :param chat_generator_inputs: Runtime inputs to be passed to the chat generator.
    :param tool_invoker_inputs: Runtime inputs to be passed to the tool invoker.
    :param counter: A counter to track the number of steps taken in the agent's run.
    :param skip_chat_generator: A flag to indicate whether to skip the chat generator in the next iteration.
        This is useful when resuming from a ToolBreakpoint where the ToolInvoker needs to be called first.
    """

    state: State
    # Parameterized (instead of bare `dict`) so type checkers see the shapes the Agent actually builds.
    component_visits: dict[str, int]
    chat_generator_inputs: dict[str, Any]
    tool_invoker_inputs: dict[str, Any]
    counter: int = 0
    skip_chat_generator: bool = False
1✔
66

67

68
@component
1✔
69
class Agent:
1✔
70
    """
71
    A tool-using Agent powered by a large language model.
72

73
    The Agent processes messages and calls tools until it meets an exit condition.
74
    You can set one or more exit conditions to control when it stops.
75
    For example, it can stop after generating a response or after calling a tool.
76

77
    Without tools, the Agent works like a standard LLM that generates text. It produces one response and then stops.
78

79
    ### Usage examples
80

81
    This is an example agent that:
82
    1. Searches for tipping customs in France.
83
    2. Uses a calculator to compute tips based on its findings.
84
    3. Returns the final answer with its context.
85

86
    ```python
87
    from haystack.components.agents import Agent
88
    from haystack.components.generators.chat import OpenAIChatGenerator
89
    from haystack.dataclasses import ChatMessage
90
    from haystack.tools import Tool
91

92
    # Tool functions - in practice, these would have real implementations
93
    def search(query: str) -> str:
94
        '''Search for information on the web.'''
95
        # Placeholder: would call actual search API
96
        return "In France, a 15% service charge is typically included, but leaving 5-10% extra is appreciated."
97

98
    def calculator(operation: str, a: float, b: float) -> float:
99
        '''Perform mathematical calculations.'''
100
        if operation == "multiply":
101
            return a * b
102
        elif operation == "percentage":
103
            return (a / 100) * b
104
        return 0
105

106
    # Define tools with JSON Schema
107
    tools = [
108
        Tool(
109
            name="search",
110
            description="Searches for information on the web",
111
            parameters={
112
                "type": "object",
113
                "properties": {
114
                    "query": {"type": "string", "description": "The search query"}
115
                },
116
                "required": ["query"]
117
            },
118
            function=search
119
        ),
120
        Tool(
121
            name="calculator",
122
            description="Performs mathematical calculations",
123
            parameters={
124
                "type": "object",
125
                "properties": {
126
                    "operation": {"type": "string", "description": "Operation: multiply, percentage"},
127
                    "a": {"type": "number", "description": "First number"},
128
                    "b": {"type": "number", "description": "Second number"}
129
                },
130
                "required": ["operation", "a", "b"]
131
            },
132
            function=calculator
133
        )
134
    ]
135

136
    # Create and run the agent
137
    agent = Agent(
138
        chat_generator=OpenAIChatGenerator(),
139
        tools=tools
140
    )
141

142
    result = agent.run(
143
        messages=[ChatMessage.from_user("Calculate the appropriate tip for an €85 meal in France")]
144
    )
145

146
    print(result["messages"][-1].text)
147
    ```
148

149
    This is a minimal Agent that has deepwiki's MCP server configured as its tool:
150

151
    ```yaml
152
    components:
      Agent:
        type: haystack.components.agents.agent.Agent
        init_parameters:
          chat_generator:
            init_parameters:
              model: gpt-5
            type: haystack.components.generators.chat.openai.OpenAIChatGenerator
          tools:
          - type: haystack_integrations.tools.mcp.MCPToolset
            data:
              server_info:
                type: haystack_integrations.tools.mcp.mcp_tool.StreamableHttpServerInfo
                url: https://mcp.deepwiki.com/mcp
                timeout: 900
                token:
              tool_names:
              - read_wiki_structure
              - read_wiki_contents
              - ask_question
              eager_connect: false
            _meta:
              name: deepwiki
              description:
              tool_id:
          system_prompt: "You are a deep research assistant.
            You create comprehensive research reports to answer the user's questions.
            You have deepwiki at your disposal.
            Use deepwiki to understand, navigate, and explore software projects. "
          exit_conditions:
          state_schema: {}
          max_agent_steps: 100
          streaming_callback:
          raise_on_tool_invocation_failure: false
          tool_invoker_kwargs:
      DeepsetChatHistoryParser:
        type: deepset_cloud_custom_nodes.parsers.chat_history_parser.DeepsetChatHistoryParser
        init_parameters: {}
      AnswerBuilder:
        type: haystack.components.builders.answer_builder.AnswerBuilder
        init_parameters:
          pattern:
          reference_pattern:
          last_message_only: false

    connections:
    - sender: DeepsetChatHistoryParser.messages
      receiver: Agent.messages
    - sender: Agent.messages
      receiver: AnswerBuilder.replies

    max_runs_per_component: 100

    metadata: {}

    inputs:
      query:
      - DeepsetChatHistoryParser.history_and_query
      - AnswerBuilder.query

    outputs:
      answers: AnswerBuilder.answers
214

215
    ```
216

217
    """
218

219
    def __init__(
        self,
        *,
        chat_generator: ChatGenerator,
        tools: Optional[ToolsType] = None,
        system_prompt: Optional[str] = None,
        exit_conditions: Optional[list[str]] = None,
        state_schema: Optional[dict[str, Any]] = None,
        max_agent_steps: int = 100,
        streaming_callback: Optional[StreamingCallbackT] = None,
        raise_on_tool_invocation_failure: bool = False,
        tool_invoker_kwargs: Optional[dict[str, Any]] = None,
    ) -> None:
        """
        Set up the agent: validate its configuration, declare its component I/O and build the tool invoker.

        :param chat_generator: An instance of the chat generator that your agent should use. It must support tools.
        :param tools: A list of Tool and/or Toolset objects, or a single Toolset that the agent can use.
        :param system_prompt: System prompt for the agent.
        :param exit_conditions: List of conditions that will cause the agent to return.
            Can include "text" if the agent should return when it generates a message without tool calls,
            or tool names that will cause the agent to return once the tool was executed. Defaults to ["text"].
        :param state_schema: The schema for the runtime state used by the tools.
        :param max_agent_steps: Maximum number of steps the agent will run before stopping. Defaults to 100.
            If the agent exceeds this number of steps, it will stop and return the current state.
        :param streaming_callback: A callback that will be invoked when a response is streamed from the LLM.
            The same callback can be configured to emit tool results when a tool is called.
        :param raise_on_tool_invocation_failure: Should the agent raise an exception when a tool invocation fails?
            If set to False, the exception will be turned into a chat message and passed to the LLM.
        :param tool_invoker_kwargs: Additional keyword arguments to pass to the ToolInvoker.
        :raises TypeError: If the chat_generator does not support tools parameter in its run method.
        :raises ValueError: If the exit_conditions are not valid.
        """
        # The generator receives the selected tools as a `tools` input, so its `run` must accept that parameter.
        if "tools" not in inspect.signature(chat_generator.run).parameters:
            raise TypeError(
                f"{type(chat_generator).__name__} does not accept tools parameter in its run method. "
                "The Agent component requires a chat generator that supports tools."
            )

        # An exit condition is either the literal "text" or the name of one of the configured tools.
        valid_exits = ["text", *(tool.name for tool in flatten_tools_or_toolsets(tools))]
        exit_conditions = ["text"] if exit_conditions is None else exit_conditions
        if any(condition not in valid_exits for condition in exit_conditions):
            raise ValueError(
                f"Invalid exit conditions provided: {exit_conditions}. "
                f"Valid exit conditions must be a subset of {valid_exits}. "
                "Ensure that each exit condition corresponds to either 'text' or a valid tool name."
            )

        if state_schema is not None:
            _validate_schema(state_schema)
        # The user's schema is kept untouched; `to_dict` serializes this one rather than the resolved copy below.
        self._state_schema = state_schema or {}

        # The resolved schema always has a "messages" entry, added here when the user did not define one.
        runtime_schema = _deepcopy_with_exceptions(self._state_schema)
        if runtime_schema.get("messages") is None:
            runtime_schema["messages"] = {"type": list[ChatMessage], "handler": merge_lists}
        self.state_schema = runtime_schema

        self.chat_generator = chat_generator
        self.tools = tools or []
        self.system_prompt = system_prompt
        self.exit_conditions = exit_conditions
        self.max_agent_steps = max_agent_steps
        self.raise_on_tool_invocation_failure = raise_on_tool_invocation_failure
        self.streaming_callback = streaming_callback

        # Every schema entry becomes a component output; entries that are not already explicit
        # parameters of the run method are also registered as optional inputs.
        declared_outputs = {"last_message": ChatMessage}
        for entry_name, entry_spec in self.state_schema.items():
            declared_outputs[entry_name] = entry_spec["type"]
            if entry_name not in ("messages", "streaming_callback"):
                component.set_input_type(self, name=entry_name, type=entry_spec["type"], default=None)
        component.set_output_types(self, **declared_outputs)

        self.tool_invoker_kwargs = tool_invoker_kwargs
        self._tool_invoker = None
        if not self.tools:
            logger.warning(
                "No tools provided to the Agent. The Agent will behave like a ChatGenerator and only return text "
                "responses. To enable tool usage, pass tools directly to the Agent, not to the chat_generator."
            )
        else:
            # User-supplied kwargs come last so they can override the two defaults.
            invoker_settings = {
                "tools": self.tools,
                "raise_on_failure": self.raise_on_tool_invocation_failure,
                **(tool_invoker_kwargs or {}),
            }
            self._tool_invoker = ToolInvoker(**invoker_settings)

        self._is_warmed_up = False
1✔
314

315
    def warm_up(self) -> None:
        """
        Warm up the chat generator and the tool invoker, at most once per Agent instance.

        Either collaborator is skipped when it does not expose a `warm_up` method; the tool invoker is
        also skipped when the Agent has none.
        """
        if self._is_warmed_up:
            return

        if hasattr(self.chat_generator, "warm_up"):
            self.chat_generator.warm_up()

        invoker = self._tool_invoker
        if invoker is not None and hasattr(invoker, "warm_up"):
            invoker.warm_up()

        self._is_warmed_up = True
1✔
325

326
    def to_dict(self) -> dict[str, Any]:
        """
        Turn the agent's configuration into a serializable dictionary.

        :return: Dictionary with serialized data
        """
        callback = self.streaming_callback
        init_parameters = {
            "chat_generator": component_to_dict(obj=self.chat_generator, name="chat_generator"),
            "tools": serialize_tools_or_toolset(self.tools),
            "system_prompt": self.system_prompt,
            "exit_conditions": self.exit_conditions,
            # The schema as the user passed it is serialized, not the resolved one built in __init__.
            "state_schema": _schema_to_dict(self._state_schema),
            "max_agent_steps": self.max_agent_steps,
            "streaming_callback": serialize_callable(callback) if callback else None,
            "raise_on_tool_invocation_failure": self.raise_on_tool_invocation_failure,
            "tool_invoker_kwargs": self.tool_invoker_kwargs,
        }
        return default_to_dict(self, **init_parameters)
345

346
    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Agent":
        """
        Rebuild an agent from its dictionary representation.

        The chat generator, state schema, streaming callback and tools stored under "init_parameters"
        are deserialized in place before the agent is constructed.

        :param data: Dictionary to deserialize from
        :return: Deserialized agent
        """
        params = data.get("init_parameters", {})

        deserialize_chatgenerator_inplace(params, key="chat_generator")

        serialized_schema = params.get("state_schema")
        if serialized_schema is not None:
            params["state_schema"] = _schema_from_dict(serialized_schema)

        serialized_callback = params.get("streaming_callback")
        if serialized_callback is not None:
            params["streaming_callback"] = deserialize_callable(serialized_callback)

        deserialize_tools_or_toolset_inplace(params, key="tools")

        return default_from_dict(cls, data)
1✔
367

368
    def _create_agent_span(self) -> Any:
        """Open the "haystack.agent.run" tracing span, tagged with the agent's configuration."""
        span_tags = {
            "haystack.agent.max_steps": self.max_agent_steps,
            "haystack.agent.tools": self.tools,
            "haystack.agent.exit_conditions": self.exit_conditions,
            "haystack.agent.state_schema": _schema_to_dict(self.state_schema),
        }
        return tracing.tracer.trace("haystack.agent.run", tags=span_tags)
379

380
    def _initialize_fresh_execution(
        self,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT],
        requires_async: bool,
        *,
        system_prompt: Optional[str] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        tools: Optional[Union[ToolsType, list[str]]] = None,
        **kwargs,
    ) -> _ExecutionContext:
        """
        Build the execution context for a run that does not resume from a snapshot.

        :param messages: List of ChatMessage objects to start the agent with.
        :param streaming_callback: Optional callback for streaming responses.
        :param requires_async: Whether the agent run requires asynchronous execution.
        :param system_prompt: System prompt for the agent. If provided, it overrides the default system prompt.
        :param generation_kwargs: Additional keyword arguments for chat generator. These parameters will
            override the parameters passed during component initialization.
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
            When passing tool names, tools are selected from the Agent's originally configured tools.
        :param kwargs: Additional data to pass to the State used by the Agent.
        :returns: An execution context with zeroed visit counters for both components.
        """
        # A run-time prompt wins over the one configured at init; either way it goes first in the conversation.
        effective_prompt = system_prompt or self.system_prompt
        if effective_prompt is not None:
            messages = [ChatMessage.from_system(effective_prompt)] + messages

        has_non_system_message = any(not message.is_from(ChatRole.SYSTEM) for message in messages)
        if not has_non_system_message:
            logger.warning("All messages provided to the Agent component are system messages. This is not recommended.")

        run_state = State(schema=self.state_schema, data=kwargs)
        run_state.set("messages", messages)

        resolved_callback = select_streaming_callback(  # type: ignore[call-overload]
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=requires_async
        )

        # Both components get the same tools and callback; only the generator also takes generation kwargs.
        run_tools = self._select_tools(tools)
        tool_invoker_inputs: dict[str, Any] = {"tools": run_tools}
        if resolved_callback is not None:
            tool_invoker_inputs["streaming_callback"] = resolved_callback
        generator_inputs: dict[str, Any] = dict(tool_invoker_inputs)
        if generation_kwargs is not None:
            generator_inputs["generation_kwargs"] = generation_kwargs

        return _ExecutionContext(
            state=run_state,
            component_visits={"chat_generator": 0, "tool_invoker": 0},
            chat_generator_inputs=generator_inputs,
            tool_invoker_inputs=tool_invoker_inputs,
        )
433

434
    def _select_tools(self, tools: Optional[Union[ToolsType, list[str]]] = None) -> ToolsType:
        """
        Select tools for the current run based on the provided tools parameter.

        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
            When passing tool names, tools are selected from the Agent's originally configured tools.
            When omitted, the Agent's configured tools are used unchanged.
        :returns: Selected tools for the current run.
        :raises ValueError: If tool names are provided but no tools were configured at initialization,
            or if any provided tool name is not valid.
        :raises TypeError: If tools is not a list of Tool and/or Toolset objects, a Toolset, or a list of tool
            names (strings). Lists mixing names with tool objects, or holding any other element type, are rejected.
        """
        if tools is None:
            return self.tools

        # Note: an empty list satisfies `all(...)` vacuously, so it is treated as a (empty) list of names.
        if isinstance(tools, list) and all(isinstance(t, str) for t in tools):
            if not self.tools:
                raise ValueError("No tools were configured for the Agent at initialization.")
            available_tools = flatten_tools_or_toolsets(self.tools)
            selected_tool_names = cast(list[str], tools)  # mypy thinks this could still be list[Tool] or Toolset
            valid_tool_names = {tool.name for tool in available_tools}
            invalid_tool_names = {name for name in selected_tool_names if name not in valid_tool_names}
            if invalid_tool_names:
                raise ValueError(
                    f"The following tool names are not valid: {invalid_tool_names}. "
                    f"Valid tool names are: {valid_tool_names}."
                )
            # Set lookup keeps the filter linear in the number of available tools.
            requested_names = set(selected_tool_names)
            return [tool for tool in available_tools if tool.name in requested_names]

        if isinstance(tools, Toolset):
            return tools

        # Only accept lists whose elements are all Tool/Toolset objects. Previously any list reached this
        # point unchecked, so e.g. a mixed names/objects list was passed on instead of raising TypeError.
        if isinstance(tools, list) and all(isinstance(t, (Tool, Toolset)) for t in tools):
            return cast(list[Union[Tool, Toolset]], tools)  # mypy can't narrow the Union type from isinstance check

        raise TypeError(
            "tools must be a list of Tool and/or Toolset objects, a Toolset, or a list of tool names (strings)."
        )
471

472
    def _initialize_from_snapshot(
        self,
        snapshot: AgentSnapshot,
        streaming_callback: Optional[StreamingCallbackT],
        requires_async: bool,
        *,
        generation_kwargs: Optional[dict[str, Any]] = None,
        tools: Optional[Union[ToolsType, list[str]]] = None,
    ) -> _ExecutionContext:
        """
        Initialize execution context from an AgentSnapshot.

        :param snapshot: An AgentSnapshot containing the state of a previously saved agent execution.
        :param streaming_callback: Optional callback for streaming responses.
        :param requires_async: Whether the agent run requires asynchronous execution.
        :param generation_kwargs: Additional keyword arguments for chat generator. These parameters will
            override the parameters passed during component initialization.
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
            When passing tool names, tools are selected from the Agent's originally configured tools.
        :returns: An execution context whose state, visit counters and step counter come from the snapshot.
        """
        # Work on a copy of the visit counters: the execution context is mutable per-run bookkeeping, and
        # sharing the snapshot's own dict would let updates made during this run leak back into the
        # caller's snapshot (which may be reused to resume again).
        component_visits = dict(snapshot.component_visits)
        current_inputs = {
            "chat_generator": _deserialize_value_with_schema(snapshot.component_inputs["chat_generator"]),
            "tool_invoker": _deserialize_value_with_schema(snapshot.component_inputs["tool_invoker"]),
        }

        # The saved tool-invoker inputs carry the State object; its data is re-wrapped with the current schema.
        state_data = current_inputs["tool_invoker"]["state"].data
        state = State(schema=self.state_schema, data=state_data)

        # A snapshot taken at a ToolBreakpoint resumes at the tool invoker, so the chat generator is skipped once.
        skip_chat_generator = isinstance(snapshot.break_point.break_point, ToolBreakpoint)
        # A callback stored with the snapshot's chat generator inputs takes precedence over the run-time one.
        streaming_callback = current_inputs["chat_generator"].get("streaming_callback", streaming_callback)
        streaming_callback = select_streaming_callback(  # type: ignore[call-overload]
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=requires_async
        )

        selected_tools = self._select_tools(tools)
        tool_invoker_inputs: dict[str, Any] = {"tools": selected_tools}
        generator_inputs: dict[str, Any] = {"tools": selected_tools}
        if streaming_callback is not None:
            tool_invoker_inputs["streaming_callback"] = streaming_callback
            generator_inputs["streaming_callback"] = streaming_callback
        if generation_kwargs is not None:
            generator_inputs["generation_kwargs"] = generation_kwargs

        return _ExecutionContext(
            state=state,
            component_visits=component_visits,
            chat_generator_inputs=generator_inputs,
            tool_invoker_inputs=tool_invoker_inputs,
            counter=snapshot.break_point.break_point.visit_count,
            skip_chat_generator=skip_chat_generator,
        )
524

525
    def _runtime_checks(self, break_point: Optional[AgentBreakpoint]) -> None:
        """
        Make sure the agent is ready to run: warm it up if needed and validate a tool breakpoint.

        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :raises ValueError: If the break_point is invalid.
        """
        if not self._is_warmed_up:
            self.warm_up()

        if not break_point:
            return

        # Only tool breakpoints are validated, and that is done against the Agent's configured tools.
        if isinstance(break_point.break_point, ToolBreakpoint):
            _validate_tool_breakpoint_is_valid(agent_breakpoint=break_point, tools=self.tools)
538

539
    @staticmethod
    def _check_chat_generator_breakpoint(
        execution_context: _ExecutionContext,
        break_point: Optional[AgentBreakpoint],
        parent_snapshot: Optional[PipelineSnapshot],
    ) -> None:
        """
        Trigger the chat generator breakpoint when its visit count has been reached.

        Does nothing unless a breakpoint targeting "chat_generator" is set and the chat generator's current
        visit count equals the breakpoint's `visit_count`. Otherwise a pipeline snapshot is created from the
        execution context and the breakpoint is triggered with it.

        :param execution_context: The current execution context of the agent.
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :param parent_snapshot: An optional parent snapshot for the agent execution.
        """
        if not break_point:
            return

        target = break_point.break_point
        is_due = (
            target.component_name == "chat_generator"
            and execution_context.component_visits["chat_generator"] == target.visit_count
        )
        if not is_due:
            return

        snapshot_for_break = _create_pipeline_snapshot_from_chat_generator(
            execution_context=execution_context, break_point=break_point, parent_snapshot=parent_snapshot
        )
        _trigger_chat_generator_breakpoint(pipeline_snapshot=snapshot_for_break)
1✔
564

565
    @staticmethod
    def _check_tool_invoker_breakpoint(
        execution_context: _ExecutionContext,
        break_point: Optional[AgentBreakpoint],
        parent_snapshot: Optional[PipelineSnapshot],
    ) -> None:
        """
        Trigger the tool invoker breakpoint when its visit count has been reached.

        Does nothing unless a breakpoint targeting "tool_invoker" is set and its `visit_count` equals the
        tool invoker's current visit count. Otherwise a pipeline snapshot is created from the execution
        context and the breakpoint is triggered with it, together with the last message held in the state.

        :param execution_context: The current execution context of the agent.
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :param parent_snapshot: An optional parent snapshot for the agent execution.
        """
        if not break_point:
            return

        target = break_point.break_point
        is_due = (
            target.component_name == "tool_invoker"
            and target.visit_count == execution_context.component_visits["tool_invoker"]
        )
        if not is_due:
            return

        snapshot_for_break = _create_pipeline_snapshot_from_tool_invoker(
            execution_context=execution_context, break_point=break_point, parent_snapshot=parent_snapshot
        )
        # Only the most recent message is forwarded (a one-element slice, or empty if there are no messages).
        latest_messages = execution_context.state.data["messages"][-1:]
        _trigger_tool_invoker_breakpoint(llm_messages=latest_messages, pipeline_snapshot=snapshot_for_break)
592

593
    def run(  # noqa: PLR0915
1✔
594
        self,
595
        messages: list[ChatMessage],
596
        streaming_callback: Optional[StreamingCallbackT] = None,
597
        *,
598
        generation_kwargs: Optional[dict[str, Any]] = None,
599
        break_point: Optional[AgentBreakpoint] = None,
600
        snapshot: Optional[AgentSnapshot] = None,
601
        system_prompt: Optional[str] = None,
602
        tools: Optional[Union[ToolsType, list[str]]] = None,
603
        **kwargs: Any,
604
    ) -> dict[str, Any]:
605
        """
606
        Process messages and execute tools until an exit condition is met.
607

608
        :param messages: List of Haystack ChatMessage objects to process.
609
        :param streaming_callback: A callback that will be invoked when a response is streamed from the LLM.
610
            The same callback can be configured to emit tool results when a tool is called.
611
        :param generation_kwargs: Additional keyword arguments for LLM. These parameters will
612
            override the parameters passed during component initialization.
613
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
614
            for "tool_invoker".
615
        :param snapshot: A dictionary containing a snapshot of a previously saved agent execution. The snapshot contains
616
            the relevant information to restart the Agent execution from where it left off.
617
        :param system_prompt: System prompt for the agent. If provided, it overrides the default system prompt.
618
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
619
            When passing tool names, tools are selected from the Agent's originally configured tools.
620
        :param kwargs: Additional data to pass to the State schema used by the Agent.
621
            The keys must match the schema defined in the Agent's `state_schema`.
622
        :returns:
623
            A dictionary with the following keys:
624
            - "messages": List of all messages exchanged during the agent's run.
625
            - "last_message": The last message exchanged during the agent's run.
626
            - Any additional keys defined in the `state_schema`.
627
        :raises RuntimeError: If the Agent component wasn't warmed up before calling `run()`.
628
        :raises BreakpointException: If an agent breakpoint is triggered.
629
        """
630
        # We pop parent_snapshot from kwargs to avoid passing it into State.
631
        parent_snapshot = kwargs.pop("parent_snapshot", None)
1✔
632
        agent_inputs = {
1✔
633
            "messages": messages,
634
            "streaming_callback": streaming_callback,
635
            "break_point": break_point,
636
            "snapshot": snapshot,
637
            **kwargs,
638
        }
639
        self._runtime_checks(break_point=break_point)
1✔
640

641
        if snapshot:
1✔
642
            exe_context = self._initialize_from_snapshot(
1✔
643
                snapshot=snapshot,
644
                streaming_callback=streaming_callback,
645
                requires_async=False,
646
                tools=tools,
647
                generation_kwargs=generation_kwargs,
648
            )
649
        else:
650
            exe_context = self._initialize_fresh_execution(
1✔
651
                messages=messages,
652
                streaming_callback=streaming_callback,
653
                requires_async=False,
654
                system_prompt=system_prompt,
655
                tools=tools,
656
                generation_kwargs=generation_kwargs,
657
                **kwargs,
658
            )
659

660
        with self._create_agent_span() as span:
1✔
661
            span.set_content_tag("haystack.agent.input", _deepcopy_with_exceptions(agent_inputs))
1✔
662

663
            while exe_context.counter < self.max_agent_steps:
1✔
664
                # Handle breakpoint and ChatGenerator call
665
                Agent._check_chat_generator_breakpoint(
1✔
666
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
667
                )
668
                # We skip the chat generator when restarting from a snapshot from a ToolBreakpoint
669
                if exe_context.skip_chat_generator:
1✔
670
                    llm_messages = exe_context.state.get("messages", [])[-1:]
1✔
671
                    # Set to False so the next iteration will call the chat generator
672
                    exe_context.skip_chat_generator = False
1✔
673
                else:
674
                    try:
1✔
675
                        result = Pipeline._run_component(
1✔
676
                            component_name="chat_generator",
677
                            component={"instance": self.chat_generator},
678
                            inputs={
679
                                "messages": exe_context.state.data["messages"],
680
                                **exe_context.chat_generator_inputs,
681
                            },
682
                            component_visits=exe_context.component_visits,
683
                            parent_span=span,
684
                        )
685
                    except PipelineRuntimeError as e:
1✔
686
                        pipeline_snapshot = _create_pipeline_snapshot_from_chat_generator(
1✔
687
                            agent_name=getattr(self, "__component_name__", None),
688
                            execution_context=exe_context,
689
                            parent_snapshot=parent_snapshot,
690
                        )
691
                        e.pipeline_snapshot = pipeline_snapshot
1✔
692
                        raise e
1✔
693

694
                    llm_messages = result["replies"]
1✔
695
                    exe_context.state.set("messages", llm_messages)
1✔
696

697
                # Check if any of the LLM responses contain a tool call or if the LLM is not using tools
698
                if not any(msg.tool_call for msg in llm_messages) or self._tool_invoker is None:
1✔
699
                    exe_context.counter += 1
1✔
700
                    break
1✔
701

702
                # Handle breakpoint and ToolInvoker call
703
                Agent._check_tool_invoker_breakpoint(
1✔
704
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
705
                )
706
                try:
1✔
707
                    # We only send the messages from the LLM to the tool invoker
708
                    tool_invoker_result = Pipeline._run_component(
1✔
709
                        component_name="tool_invoker",
710
                        component={"instance": self._tool_invoker},
711
                        inputs={
712
                            "messages": llm_messages,
713
                            "state": exe_context.state,
714
                            **exe_context.tool_invoker_inputs,
715
                        },
716
                        component_visits=exe_context.component_visits,
717
                        parent_span=span,
718
                    )
719
                except PipelineRuntimeError as e:
1✔
720
                    # Access the original Tool Invoker exception
721
                    original_error = e.__cause__
1✔
722
                    tool_name = getattr(original_error, "tool_name", None)
1✔
723

724
                    pipeline_snapshot = _create_pipeline_snapshot_from_tool_invoker(
1✔
725
                        tool_name=tool_name,
726
                        agent_name=getattr(self, "__component_name__", None),
727
                        execution_context=exe_context,
728
                        parent_snapshot=parent_snapshot,
729
                    )
730
                    e.pipeline_snapshot = pipeline_snapshot
1✔
731
                    raise e
1✔
732

733
                tool_messages = tool_invoker_result["tool_messages"]
1✔
734
                exe_context.state = tool_invoker_result["state"]
1✔
735
                exe_context.state.set("messages", tool_messages)
1✔
736

737
                # Check if any LLM message's tool call name matches an exit condition
738
                if self.exit_conditions != ["text"] and self._check_exit_conditions(llm_messages, tool_messages):
1✔
739
                    exe_context.counter += 1
1✔
740
                    break
1✔
741

742
                # Increment the step counter
743
                exe_context.counter += 1
1✔
744

745
            if exe_context.counter >= self.max_agent_steps:
1✔
746
                logger.warning(
1✔
747
                    "Agent reached maximum agent steps of {max_agent_steps}, stopping.",
748
                    max_agent_steps=self.max_agent_steps,
749
                )
750
            span.set_content_tag("haystack.agent.output", exe_context.state.data)
1✔
751
            span.set_tag("haystack.agent.steps_taken", exe_context.counter)
1✔
752

753
        result = {**exe_context.state.data}
1✔
754
        if msgs := result.get("messages"):
1✔
755
            result["last_message"] = msgs[-1]
1✔
756
        return result
1✔
757

758
    async def run_async(
        self,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        *,
        generation_kwargs: Optional[dict[str, Any]] = None,
        break_point: Optional[AgentBreakpoint] = None,
        snapshot: Optional[AgentSnapshot] = None,
        system_prompt: Optional[str] = None,
        tools: Optional[Union[ToolsType, list[str]]] = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        Asynchronously process messages and execute tools until the exit condition is met.

        This is the asynchronous version of the `run` method. It follows the same logic but uses
        asynchronous operations where possible, such as calling the `run_async` method of the ChatGenerator
        if available.

        :param messages: List of Haystack ChatMessage objects to process.
        :param streaming_callback: An asynchronous callback that will be invoked when a response is streamed from the
            LLM. The same callback can be configured to emit tool results when a tool is called.
        :param generation_kwargs: Additional keyword arguments for LLM. These parameters will
            override the parameters passed during component initialization.
        :param break_point: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint
            for "tool_invoker".
        :param snapshot: An AgentSnapshot of a previously saved agent execution. The snapshot contains
            the relevant information to restart the Agent execution from where it left off.
        :param system_prompt: System prompt for the agent. If provided, it overrides the default system prompt.
        :param tools: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
            When passing tool names, tools are selected from the Agent's originally configured tools.
        :param kwargs: Additional data to pass to the State schema used by the Agent.
            The keys must match the schema defined in the Agent's `state_schema`.
        :returns:
            A dictionary with the following keys:
            - "messages": List of all messages exchanged during the agent's run.
            - "last_message": The last message exchanged during the agent's run.
            - Any additional keys defined in the `state_schema`.
        :raises RuntimeError: If the Agent component wasn't warmed up before calling `run_async()`.
        :raises BreakpointException: If an agent breakpoint is triggered.
        :raises PipelineRuntimeError: Re-raised with a `pipeline_snapshot` attached when running the chat generator
            or the tool invoker raises it.
        """
        # We pop parent_snapshot from kwargs to avoid passing it into State.
        parent_snapshot = kwargs.pop("parent_snapshot", None)
        # Captured up front so the tracing span records the inputs exactly as the caller passed them.
        agent_inputs = {
            "messages": messages,
            "streaming_callback": streaming_callback,
            "break_point": break_point,
            "snapshot": snapshot,
            **kwargs,
        }
        self._runtime_checks(break_point=break_point)

        if snapshot:
            exe_context = self._initialize_from_snapshot(
                snapshot=snapshot,
                streaming_callback=streaming_callback,
                requires_async=True,
                tools=tools,
                generation_kwargs=generation_kwargs,
            )
        else:
            exe_context = self._initialize_fresh_execution(
                messages=messages,
                streaming_callback=streaming_callback,
                requires_async=True,
                system_prompt=system_prompt,
                tools=tools,
                generation_kwargs=generation_kwargs,
                **kwargs,
            )

        with self._create_agent_span() as span:
            span.set_content_tag("haystack.agent.input", _deepcopy_with_exceptions(agent_inputs))

            while exe_context.counter < self.max_agent_steps:
                # Handle breakpoint and ChatGenerator call
                self._check_chat_generator_breakpoint(
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
                )
                # We skip the chat generator when restarting from a snapshot from a ToolBreakpoint
                if exe_context.skip_chat_generator:
                    llm_messages = exe_context.state.get("messages", [])[-1:]
                    # Set to False so the next iteration will call the chat generator
                    exe_context.skip_chat_generator = False
                else:
                    try:
                        result = await AsyncPipeline._run_component_async(
                            component_name="chat_generator",
                            component={"instance": self.chat_generator},
                            component_inputs={
                                "messages": exe_context.state.data["messages"],
                                **exe_context.chat_generator_inputs,
                            },
                            component_visits=exe_context.component_visits,
                            parent_span=span,
                        )
                    except PipelineRuntimeError as e:
                        # Mirror the synchronous `run`: attach a snapshot of the agent state at the point
                        # of failure so the caller can inspect it or resume from it.
                        e.pipeline_snapshot = _create_pipeline_snapshot_from_chat_generator(
                            agent_name=getattr(self, "__component_name__", None),
                            execution_context=exe_context,
                            parent_snapshot=parent_snapshot,
                        )
                        raise

                    llm_messages = result["replies"]
                    exe_context.state.set("messages", llm_messages)

                # Check if any of the LLM responses contain a tool call or if the LLM is not using tools
                if not any(msg.tool_call for msg in llm_messages) or self._tool_invoker is None:
                    exe_context.counter += 1
                    break

                # Handle breakpoint and ToolInvoker call
                self._check_tool_invoker_breakpoint(
                    execution_context=exe_context, break_point=break_point, parent_snapshot=parent_snapshot
                )
                try:
                    # We only send the messages from the LLM to the tool invoker
                    tool_invoker_result = await AsyncPipeline._run_component_async(
                        component_name="tool_invoker",
                        component={"instance": self._tool_invoker},
                        component_inputs={
                            "messages": llm_messages,
                            "state": exe_context.state,
                            **exe_context.tool_invoker_inputs,
                        },
                        component_visits=exe_context.component_visits,
                        parent_span=span,
                    )
                except PipelineRuntimeError as e:
                    # Access the original Tool Invoker exception to learn which tool failed (if it says so)
                    original_error = e.__cause__
                    tool_name = getattr(original_error, "tool_name", None)

                    e.pipeline_snapshot = _create_pipeline_snapshot_from_tool_invoker(
                        tool_name=tool_name,
                        agent_name=getattr(self, "__component_name__", None),
                        execution_context=exe_context,
                        parent_snapshot=parent_snapshot,
                    )
                    raise

                tool_messages = tool_invoker_result["tool_messages"]
                exe_context.state = tool_invoker_result["state"]
                exe_context.state.set("messages", tool_messages)

                # Check if any LLM message's tool call name matches an exit condition
                if self.exit_conditions != ["text"] and self._check_exit_conditions(llm_messages, tool_messages):
                    exe_context.counter += 1
                    break

                # Increment the step counter
                exe_context.counter += 1

            if exe_context.counter >= self.max_agent_steps:
                logger.warning(
                    "Agent reached maximum agent steps of {max_agent_steps}, stopping.",
                    max_agent_steps=self.max_agent_steps,
                )
            span.set_content_tag("haystack.agent.output", exe_context.state.data)
            span.set_tag("haystack.agent.steps_taken", exe_context.counter)

        # Return a shallow copy of the state data, plus a convenience pointer to the final message.
        result = {**exe_context.state.data}
        if msgs := result.get("messages"):
            result["last_message"] = msgs[-1]
        return result
1✔
900

901
    def _check_exit_conditions(self, llm_messages: list[ChatMessage], tool_messages: list[ChatMessage]) -> bool:
        """
        Check if any tool call in the LLM messages matches an exit condition and ran without errors.

        All tool calls of every LLM message are inspected, not only the first one, so an exit-condition tool
        is detected even when the LLM requests it together with other tools in the same message.

        :param llm_messages: List of messages from the LLM
        :param tool_messages: List of messages from the tool invoker
        :return: True if at least one exit condition is matched and none of the matching tools reported an error,
            False otherwise
        """
        # Names of the exit-condition tools the LLM asked for, across all tool calls of all messages.
        matched_exit_conditions = {
            tool_call.tool_name
            for msg in llm_messages
            for tool_call in msg.tool_calls
            if tool_call.tool_name in self.exit_conditions
        }
        if not matched_exit_conditions:
            return False

        # Only errors coming from a tool that matches an exit condition block the exit;
        # failures of unrelated tools are ignored here.
        return not any(
            tool_msg.tool_call_result.error
            for tool_msg in tool_messages
            if tool_msg.tool_call_result is not None
            and tool_msg.tool_call_result.origin.tool_name in matched_exit_conditions
        )
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc