• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 14189712627

01 Apr 2025 07:25AM UTC coverage: 90.231% (-0.01%) from 90.241%
14189712627

Pull #9150

github

web-flow
Merge 183cdbf8c into b12af1e6a
Pull Request #9150: feat: Move storing of messages into State in Agent

10252 of 11362 relevant lines covered (90.23%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.06
haystack/components/agents/agent.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import inspect
1✔
6
from typing import Any, Dict, List, Optional
1✔
7

8
from haystack import component, default_from_dict, default_to_dict, logging
1✔
9
from haystack.components.generators.chat.types import ChatGenerator
1✔
10
from haystack.components.tools import ToolInvoker
1✔
11
from haystack.dataclasses import ChatMessage
1✔
12
from haystack.dataclasses.state import State, _schema_from_dict, _schema_to_dict, _validate_schema
1✔
13
from haystack.dataclasses.streaming_chunk import SyncStreamingCallbackT
1✔
14
from haystack.tools import Tool, deserialize_tools_inplace
1✔
15
from haystack.utils.callable_serialization import deserialize_callable, serialize_callable
1✔
16
from haystack.utils.deserialization import deserialize_chatgenerator_inplace
1✔
17

18
logger = logging.getLogger(__name__)
1✔
19

20

21
@component
class Agent:
    """
    A Haystack component that implements a tool-using agent with provider-agnostic chat model support.

    The component processes messages and executes tools until an exit condition is met.
    The exit condition can be triggered either by a direct text response or by invoking a specific
    designated tool.

    ### Usage example
    ```python
    from haystack.components.agents import Agent
    from haystack.components.generators.chat import OpenAIChatGenerator
    from haystack.dataclasses import ChatMessage
    from haystack.tools.tool import Tool

    tools = [Tool(name="calculator", description="..."), Tool(name="search", description="...")]

    agent = Agent(
        chat_generator=OpenAIChatGenerator(),
        tools=tools,
        exit_conditions=["search"],
    )

    # Run the agent
    result = agent.run(
        messages=[ChatMessage.from_user("Find information about Haystack")]
    )

    assert "messages" in result  # Contains conversation history
    ```
    """

    def __init__(
        self,
        *,
        chat_generator: ChatGenerator,
        tools: Optional[List[Tool]] = None,
        system_prompt: Optional[str] = None,
        exit_conditions: Optional[List[str]] = None,
        state_schema: Optional[Dict[str, Any]] = None,
        max_agent_steps: int = 100,
        raise_on_tool_invocation_failure: bool = False,
        streaming_callback: Optional[SyncStreamingCallbackT] = None,
    ):
        """
        Initialize the agent component.

        :param chat_generator: An instance of the chat generator that your agent should use. It must support tools.
        :param tools: List of Tool objects available to the agent.
        :param system_prompt: System prompt for the agent.
        :param exit_conditions: List of conditions that will cause the agent to return.
            Can include "text" if the agent should return when it generates a message without tool calls,
            or tool names that will cause the agent to return once the tool was executed. Defaults to ["text"].
        :param state_schema: The schema for the runtime state used by the tools.
        :param max_agent_steps: Maximum number of steps the agent will run before stopping. Defaults to 100.
            If the agent exceeds this number of steps, it will stop and return the current state.
        :param raise_on_tool_invocation_failure: Should the agent raise an exception when a tool invocation fails?
            If set to False, the exception will be turned into a chat message and passed to the LLM.
        :param streaming_callback: A callback that will be invoked when a response is streamed from the LLM.
        :raises TypeError: If the chat_generator does not support tools parameter in its run method.
        :raises ValueError: If any exit condition is neither "text" nor the name of a configured tool.
        """
        # The generator must accept a `tools` kwarg in run(); without it the agent cannot do tool calling.
        chat_generator_run_method = inspect.signature(chat_generator.run)
        if "tools" not in chat_generator_run_method.parameters:
            raise TypeError(
                f"{type(chat_generator).__name__} does not accept tools parameter in its run method. "
                "The Agent component requires a chat generator that supports tools."
            )

        # Exit conditions may only reference "text" or the names of the configured tools.
        valid_exits = ["text"] + [tool.name for tool in tools or []]
        if exit_conditions is None:
            exit_conditions = ["text"]
        if not all(condition in valid_exits for condition in exit_conditions):
            raise ValueError(
                f"Invalid exit conditions provided: {exit_conditions}. "
                f"Valid exit conditions must be a subset of {valid_exits}. "
                "Ensure that each exit condition corresponds to either 'text' or a valid tool name."
            )

        if state_schema is not None:
            _validate_schema(state_schema)
        self.state_schema = state_schema or {}

        self.chat_generator = chat_generator
        self.tools = tools or []
        self.system_prompt = system_prompt
        self.exit_conditions = exit_conditions
        self.max_agent_steps = max_agent_steps
        self.raise_on_tool_invocation_failure = raise_on_tool_invocation_failure
        self.streaming_callback = streaming_callback

        # Every entry of the state schema becomes both an optional input and an output of the component.
        output_types = {}
        for param, config in self.state_schema.items():
            component.set_input_type(self, name=param, type=config["type"], default=None)
            output_types[param] = config["type"]
        component.set_output_types(self, **output_types)

        self._tool_invoker = ToolInvoker(tools=self.tools, raise_on_failure=self.raise_on_tool_invocation_failure)
        self._is_warmed_up = False

    def warm_up(self) -> None:
        """
        Warm up the Agent.

        Delegates to the chat generator's own ``warm_up`` if it has one; subsequent calls are no-ops.
        """
        if not self._is_warmed_up:
            if hasattr(self.chat_generator, "warm_up"):
                self.chat_generator.warm_up()
            self._is_warmed_up = True

    def to_dict(self) -> Dict[str, Any]:
        """
        Serialize the component to a dictionary.

        :return: Dictionary with serialized data
        """
        # Callbacks are plain callables and need explicit serialization; None stays None.
        if self.streaming_callback is not None:
            streaming_callback = serialize_callable(self.streaming_callback)
        else:
            streaming_callback = None

        return default_to_dict(
            self,
            chat_generator=self.chat_generator.to_dict(),
            tools=[t.to_dict() for t in self.tools],
            system_prompt=self.system_prompt,
            exit_conditions=self.exit_conditions,
            state_schema=_schema_to_dict(self.state_schema),
            max_agent_steps=self.max_agent_steps,
            raise_on_tool_invocation_failure=self.raise_on_tool_invocation_failure,
            streaming_callback=streaming_callback,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Agent":
        """
        Deserialize the agent from a dictionary.

        :param data: Dictionary to deserialize from
        :return: Deserialized agent
        """
        init_params = data.get("init_parameters", {})

        # Rebuild nested objects in place before handing the dict to default_from_dict.
        deserialize_chatgenerator_inplace(init_params, key="chat_generator")

        if "state_schema" in init_params:
            init_params["state_schema"] = _schema_from_dict(init_params["state_schema"])

        if init_params.get("streaming_callback") is not None:
            init_params["streaming_callback"] = deserialize_callable(init_params["streaming_callback"])

        deserialize_tools_inplace(init_params, key="tools")

        return default_from_dict(cls, data)

    def run(
        self,
        messages: List[ChatMessage],
        streaming_callback: Optional[SyncStreamingCallbackT] = None,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """
        Process messages and execute tools until the exit condition is met.

        :param messages: List of chat messages to process
        :param streaming_callback: A callback that will be invoked when a response is streamed from the LLM.
            Takes precedence over the callback configured at init time for this run only.
        :param kwargs: Additional data to pass to the State schema used by the Agent.
            The keys must match the schema defined in the Agent's `state_schema`.
        :return: Dictionary containing messages and outputs matching the defined output types
        :raises RuntimeError: If the chat generator requires warm-up and `warm_up()` was not called.
        """
        if not self._is_warmed_up and hasattr(self.chat_generator, "warm_up"):
            raise RuntimeError("The component Agent wasn't warmed up. Run 'warm_up()' before calling 'run()'.")

        state = State(schema=self.state_schema, data=kwargs)

        if self.system_prompt is not None:
            messages = [ChatMessage.from_system(self.system_prompt)] + messages
        state.set("messages", messages)

        generator_inputs: Dict[str, Any] = {"tools": self.tools}

        # A per-run callback overrides the one configured at init time.
        selected_callback = streaming_callback or self.streaming_callback
        if selected_callback is not None:
            generator_inputs["streaming_callback"] = selected_callback

        # Repeat until the exit condition is met
        counter = 0
        while counter < self.max_agent_steps:
            # 1. Call the ChatGenerator
            llm_messages = self.chat_generator.run(messages=messages, **generator_inputs)["replies"]
            state.set("messages", llm_messages)

            # TODO Possible for LLM to return multiple messages (e.g. multiple tool calls)
            #      Would a better check be to see if any of the messages contain a tool call?
            # 2. Check if the LLM response contains a tool call
            if llm_messages[0].tool_call is None:
                return {**state.data}

            # 3. Call the ToolInvoker
            # We only send the messages from the LLM to the tool invoker
            tool_invoker_result = self._tool_invoker.run(messages=llm_messages, state=state)
            tool_messages = tool_invoker_result["tool_messages"]
            state = tool_invoker_result["state"]
            state.set("messages", tool_messages)

            # 4. Check the LLM and Tool response for exit conditions, if exit_conditions contains a tool name
            # TODO Possible for LLM to return multiple messages (e.g. multiple tool calls)
            #      So exit conditions could be missed if it's not the first message
            if self.exit_conditions != ["text"] and (
                llm_messages[0].tool_call.tool_name in self.exit_conditions
                and not tool_messages[0].tool_call_result.error
            ):
                return {**state.data}

            # 5. Combine messages, llm_messages and tool_messages and send to the ChatGenerator
            messages = state.get("messages")
            counter += 1

        logger.warning(
            "Agent exceeded maximum runs per component ({max_agent_steps}), stopping.",
            max_agent_steps=self.max_agent_steps,
        )
        return {**state.data}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc