• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 15825051859

23 Jun 2025 01:05PM UTC coverage: 90.176% (-0.005%) from 90.181%
15825051859

Pull #9536

github

web-flow
Merge 0408d779d into 556dcc9e4
Pull Request #9536: feat: Add `finish_reason` field to `StreamingChunk`

11575 of 12836 relevant lines covered (90.18%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.94
haystack/components/tools/tool_invoker.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import asyncio
1✔
6
import inspect
1✔
7
import json
1✔
8
from concurrent.futures import ThreadPoolExecutor
1✔
9
from functools import partial
1✔
10
from typing import Any, Dict, List, Optional, Set, Union
1✔
11

12
from haystack import component, default_from_dict, default_to_dict, logging
1✔
13
from haystack.components.agents import State
1✔
14
from haystack.core.component.sockets import Sockets
1✔
15
from haystack.dataclasses import ChatMessage, ToolCall
1✔
16
from haystack.dataclasses.streaming_chunk import StreamingCallbackT, StreamingChunk, select_streaming_callback
1✔
17
from haystack.tools import (
1✔
18
    ComponentTool,
19
    Tool,
20
    Toolset,
21
    _check_duplicate_tool_names,
22
    deserialize_tools_or_toolset_inplace,
23
    serialize_tools_or_toolset,
24
)
25
from haystack.tools.errors import ToolInvocationError
1✔
26
from haystack.tracing.utils import _serializable_value
1✔
27
from haystack.utils.callable_serialization import deserialize_callable, serialize_callable
1✔
28

29
# Module-level logger shared by all classes and helpers in this file.
logger = logging.getLogger(__name__)
30

31

32
class ToolInvokerError(Exception):
    """Common base class for all errors raised by the ToolInvoker component."""

    def __init__(self, message: str):
        # Delegate to Exception so str(error) yields the message.
        super().__init__(message)
37

38

39
class ToolNotFoundException(ToolInvokerError):
    """Raised when a requested tool is missing from the list of available tools."""

    def __init__(self, tool_name: str, available_tools: List[str]):
        # Include the known tool names in the message to help debugging.
        available = ", ".join(available_tools)
        message = f"Tool '{tool_name}' not found. Available tools: {available}"
        super().__init__(message)
45

46

47
class StringConversionError(ToolInvokerError):
    """Raised when converting a tool's result to a string fails."""

    def __init__(self, tool_name: str, conversion_function: str, error: Exception):
        # The originating exception is embedded in the message for context.
        super().__init__(
            f"Failed to convert tool result from tool {tool_name} using '{conversion_function}'. Error: {error}"
        )
53

54

55
class ToolOutputMergeError(ToolInvokerError):
    """Raised when merging tool outputs into the shared State fails."""
59

60

61
@component
1✔
62
class ToolInvoker:
1✔
63
    """
64
    Invokes tools based on prepared tool calls and returns the results as a list of ChatMessage objects.
65

66
    Also handles reading/writing from a shared `State`.
67
    At initialization, the ToolInvoker component is provided with a list of available tools.
68
    At runtime, the component processes a list of ChatMessage object containing tool calls
69
    and invokes the corresponding tools.
70
    The results of the tool invocations are returned as a list of ChatMessage objects with tool role.
71

72
    Usage example:
73
    ```python
74
    from haystack.dataclasses import ChatMessage, ToolCall
75
    from haystack.tools import Tool
76
    from haystack.components.tools import ToolInvoker
77

78
    # Tool definition
79
    def dummy_weather_function(city: str):
80
        return f"The weather in {city} is 20 degrees."
81

82
    parameters = {"type": "object",
83
                "properties": {"city": {"type": "string"}},
84
                "required": ["city"]}
85

86
    tool = Tool(name="weather_tool",
87
                description="A tool to get the weather",
88
                function=dummy_weather_function,
89
                parameters=parameters)
90

91
    # Usually, the ChatMessage with tool_calls is generated by a Language Model
92
    # Here, we create it manually for demonstration purposes
93
    tool_call = ToolCall(
94
        tool_name="weather_tool",
95
        arguments={"city": "Berlin"}
96
    )
97
    message = ChatMessage.from_assistant(tool_calls=[tool_call])
98

99
    # ToolInvoker initialization and run
100
    invoker = ToolInvoker(tools=[tool])
101
    result = invoker.run(messages=[message])
102

103
    print(result)
104
    ```
105

106
    ```
107
    >>  {
108
    >>      'tool_messages': [
109
    >>          ChatMessage(
110
    >>              _role=<ChatRole.TOOL: 'tool'>,
111
    >>              _content=[
112
    >>                  ToolCallResult(
113
    >>                      result='"The weather in Berlin is 20 degrees."',
114
    >>                      origin=ToolCall(
115
    >>                          tool_name='weather_tool',
116
    >>                          arguments={'city': 'Berlin'},
117
    >>                          id=None
118
    >>                      )
119
    >>                  )
120
    >>              ],
121
    >>              _meta={}
122
    >>          )
123
    >>      ]
124
    >>  }
125
    ```
126

127
    Usage example with a Toolset:
128
    ```python
129
    from haystack.dataclasses import ChatMessage, ToolCall
130
    from haystack.tools import Tool, Toolset
131
    from haystack.components.tools import ToolInvoker
132

133
    # Tool definition
134
    def dummy_weather_function(city: str):
135
        return f"The weather in {city} is 20 degrees."
136

137
    parameters = {"type": "object",
138
                "properties": {"city": {"type": "string"}},
139
                "required": ["city"]}
140

141
    tool = Tool(name="weather_tool",
142
                description="A tool to get the weather",
143
                function=dummy_weather_function,
144
                parameters=parameters)
145

146
    # Create a Toolset
147
    toolset = Toolset([tool])
148

149
    # Usually, the ChatMessage with tool_calls is generated by a Language Model
150
    # Here, we create it manually for demonstration purposes
151
    tool_call = ToolCall(
152
        tool_name="weather_tool",
153
        arguments={"city": "Berlin"}
154
    )
155
    message = ChatMessage.from_assistant(tool_calls=[tool_call])
156

157
    # ToolInvoker initialization and run with Toolset
158
    invoker = ToolInvoker(tools=toolset)
159
    result = invoker.run(messages=[message])
160

161
    print(result)
    ```
    """
163

164
    def __init__(
        self,
        tools: Union[List[Tool], Toolset],
        raise_on_failure: bool = True,
        convert_result_to_json_string: bool = False,
        streaming_callback: Optional[StreamingCallbackT] = None,
        *,
        enable_streaming_callback_passthrough: bool = False,
        async_executor: Optional[ThreadPoolExecutor] = None,
    ):
        """
        Initialize the ToolInvoker component.

        :param tools:
            A list of tools that can be invoked, or a Toolset instance that can resolve tools.
        :param raise_on_failure:
            If True, raise an exception on errors (tool not found, tool invocation errors,
            tool result conversion errors). If False, return a ChatMessage object with
            `error=True` and a description of the error in `result`.
        :param convert_result_to_json_string:
            If True, convert tool invocation results to a string with `json.dumps`;
            if False, convert them with `str`.
        :param streaming_callback:
            A callback function called to emit tool results. The result is only emitted
            once it becomes available — it is not streamed incrementally in real time.
        :param enable_streaming_callback_passthrough:
            If True, pass `streaming_callback` to a tool invocation when the tool's
            `invoke` signature accepts a `streaming_callback` parameter, so the tool can
            stream its own results. If False, never pass it through.
        :param async_executor:
            Optional ThreadPoolExecutor used for async calls. If not provided, a
            single-threaded executor is created and owned by this instance.
        :raises ValueError:
            If no tools are provided or if duplicate tool names are found.
        """
        if not tools:
            raise ValueError("ToolInvoker requires at least one tool.")

        # Keep the original container (Toolset or plain list) so serialization round-trips.
        self.tools = tools
        self.streaming_callback = streaming_callback
        self.enable_streaming_callback_passthrough = enable_streaming_callback_passthrough

        # Work on a plain list internally; a Toolset is expanded into its tools.
        tool_list = list(tools) if isinstance(tools, Toolset) else tools

        _check_duplicate_tool_names(tool_list)
        names = [tool.name for tool in tool_list]
        repeated = {name for name in names if names.count(name) > 1}
        if repeated:
            raise ValueError(f"Duplicate tool names found: {repeated}")

        # Name -> Tool lookup used at run time to resolve tool calls.
        self._tools_with_names = dict(zip(names, tool_list))
        self.raise_on_failure = raise_on_failure
        self.convert_result_to_json_string = convert_result_to_json_string

        # Track ownership so we only shut down executors we created ourselves.
        self._owns_executor = async_executor is None
        if async_executor is not None:
            self.executor = async_executor
        else:
            self.executor = ThreadPoolExecutor(
                thread_name_prefix=f"async-ToolInvoker-executor-{id(self)}", max_workers=1
            )
231

232
    def __del__(self):
        """
        Shut down the internally created executor when the instance is destroyed.
        """
        # Use getattr/hasattr guards: __del__ can run on a partially initialized
        # instance (e.g. if __init__ raised before these attributes were set).
        owns_executor = getattr(self, "_owns_executor", False)
        if owns_executor and hasattr(self, "executor"):
            self.executor.shutdown(wait=True)
238

239
    def shutdown(self):
        """
        Explicitly shut down the executor, but only if this instance created it.
        """
        # Externally supplied executors are the caller's responsibility.
        if not self._owns_executor:
            return
        self.executor.shutdown(wait=True)
245

246
    def _handle_error(self, error: Exception) -> str:
        """
        Log an error, then either re-raise it or return its message as a fallback.

        :param error: The exception instance.
        :returns: The fallback error message when `raise_on_failure` is False.
        :raises: The provided error if `raise_on_failure` is True.
        """
        logger.error("{error_exception}", error_exception=error)
        if not self.raise_on_failure:
            return str(error)
        # Re-raise the original error so the exception chain stays intact.
        raise error
259

260
    def _default_output_to_string_handler(self, result: Any) -> str:
        """
        Default handler for converting a tool result to a string.

        :param result: The tool result to convert to a string.
        :returns: The converted tool result as a string.
        """
        # We iterate through all items in result and call to_dict() if present
        # Relevant for a few reasons:
        # - If using convert_result_to_json_string we'd rather convert Haystack objects to JSON serializable dicts
        # - If using default str() we prefer converting Haystack objects to dicts rather than relying on the
        #   __repr__ method
        serializable = _serializable_value(result)

        if self.convert_result_to_json_string:
            try:
                # We disable ensure_ascii so special chars like emojis are not converted
                str_result = json.dumps(serializable, ensure_ascii=False)
            except Exception as error:
                # If the result is not JSON serializable, we fall back to str.
                # Bug fix: the message placeholder is {error}, so the keyword argument
                # must be named `error` (it was `err`, leaving the placeholder unfilled).
                logger.warning(
                    "Tool result is not JSON serializable. Falling back to str conversion. "
                    "Result: {result}\n"
                    "Error: {error}",
                    result=result,
                    error=error,
                )
                str_result = str(result)
            return str_result

        return str(serializable)
291

292
    def _prepare_tool_result_message(self, result: Any, tool_call: ToolCall, tool_to_invoke: Tool) -> ChatMessage:
        """
        Build a tool-role ChatMessage wrapping the stringified result of a tool invocation.

        :param result:
            The raw tool result.
        :param tool_call:
            The ToolCall object containing the tool name and arguments.
        :param tool_to_invoke:
            The Tool object that was invoked.
        :returns:
            A ChatMessage object containing the tool result as a string.
        :raises StringConversionError:
            If the conversion of the tool result to a string fails and
            `raise_on_failure` is True.
        """
        # Read the tool's optional string-conversion configuration, if declared.
        source_key = None
        string_handler = None
        outputs_to_string = tool_to_invoke.outputs_to_string
        if outputs_to_string is not None:
            if outputs_to_string.get("source"):
                source_key = outputs_to_string["source"]
            if outputs_to_string.get("handler"):
                string_handler = outputs_to_string["handler"]

        # Pick the value to convert: a configured sub-field of the result, or the whole result.
        result_to_convert = result if source_key is None else result.get(source_key)

        # Fall back to the built-in conversion when the tool declares no handler.
        if string_handler is None:
            string_handler = self._default_output_to_string_handler

        error = False
        try:
            tool_result_str = string_handler(result_to_convert)
        except Exception as e:
            try:
                # _handle_error either raises (raise_on_failure=True) or returns
                # the fallback message, which is then flagged as an error result.
                tool_result_str = self._handle_error(
                    StringConversionError(tool_call.tool_name, string_handler.__name__, e)
                )
                error = True
            except StringConversionError as conversion_error:
                # _handle_error re-raised: keep the full exception chain.
                raise conversion_error from e
        return ChatMessage.from_tool(tool_result=tool_result_str, error=error, origin=tool_call)
339

340
    def _get_func_params(self, tool: Tool) -> Set:
        """
        Return the set of parameter names accepted by the tool's invoke method.

        Inspects the tool's function signature to determine which parameters it accepts.
        """
        # A plain Tool exposes its real function, so the signature can be inspected directly.
        if not isinstance(tool, ComponentTool):
            return set(inspect.signature(tool.function).parameters.keys())

        # ComponentTool wraps the function with a kwargs-accepting wrapper, so the real
        # parameter names must be read from the wrapped component's input sockets.
        # mypy doesn't know that ComponentMeta always adds __haystack_input__ to Component
        assert hasattr(tool._component, "__haystack_input__") and isinstance(
            tool._component.__haystack_input__, Sockets
        )
        return set(tool._component.__haystack_input__._sockets_dict.keys())
358

359
    def _inject_state_args(self, tool: Tool, llm_args: Dict[str, Any], state: State) -> Dict[str, Any]:
        """
        Combine LLM-provided arguments (llm_args) with state-based arguments.

        Precedence when the same parameter appears in multiple places:
          - LLM-provided arguments always win over state values
          - explicit `tool.inputs_from_state` mappings (if defined) control which
            state keys feed which parameters
          - otherwise state keys are matched to function parameter names directly
        """
        final_args = dict(llm_args)  # LLM-provided arguments form the baseline

        # Tools may declare .inputs_from_state = {"state_key": "tool_param_name"};
        # without it, state keys map one-to-one onto the function's parameter names.
        if hasattr(tool, "inputs_from_state") and isinstance(tool.inputs_from_state, dict):
            param_mappings = tool.inputs_from_state
        else:
            param_mappings = {name: name for name in self._get_func_params(tool)}

        # Fill in anything the LLM did not provide from the shared state.
        for state_key, param_name in param_mappings.items():
            if param_name not in final_args and state.has(state_key):
                final_args[param_name] = state.get(state_key)

        return final_args
384

385
    @staticmethod
    def _merge_tool_outputs(tool: Tool, result: Any, state: State) -> None:
        """
        Merge a tool execution result into the shared State.

        Nothing is written to state when:
        1. `result` is not a dictionary, or
        2. the tool does not define a dict-valued `outputs_to_state` mapping.

        Otherwise, for every `state_key: config` entry in `tool.outputs_to_state`,
        the value is taken from `result[config["source"]]` (or the whole `result`
        dictionary when no source is configured) and stored in state under
        `state_key`, optionally transformed by the `config["handler"]` callable.

        :param tool: Tool instance whose optional `outputs_to_state` mapping guides merging.
        :param result: The output from tool execution. Can be a dictionary or any other type.
        :param state: The global State object to which results should be merged.
        """
        # Only dictionary results carry named outputs that can be merged.
        if not isinstance(result, dict):
            return

        # Without an explicit `outputs_to_state` mapping there is nothing to store.
        outputs_to_state = getattr(tool, "outputs_to_state", None)
        if not isinstance(outputs_to_state, dict):
            return

        for state_key, config in outputs_to_state.items():
            # Use the configured sub-field of the result, or the entire result.
            source_key = config.get("source", None)
            output_value = result if source_key is None else result.get(source_key)

            # An optional handler can transform/merge the value into state.
            handler = config.get("handler", None)
            state.set(state_key, output_value, handler_override=handler)
429

430
    @component.output_types(tool_messages=List[ChatMessage], state=State)
    def run(
        self,
        messages: List[ChatMessage],
        state: Optional[State] = None,
        streaming_callback: Optional[StreamingCallbackT] = None,
        *,
        enable_streaming_callback_passthrough: Optional[bool] = None,
    ) -> Dict[str, Any]:
        """
        Processes ChatMessage objects containing tool calls and invokes the corresponding tools, if available.

        :param messages:
            A list of ChatMessage objects.
        :param state: The runtime state that should be used by the tools.
        :param streaming_callback: A callback function that will be called to emit tool results.
            Note that the result is only emitted once it becomes available — it is not
            streamed incrementally in real time.
        :param enable_streaming_callback_passthrough:
            If True, the `streaming_callback` will be passed to the tool invocation if the tool supports it.
            This allows tools to stream their results back to the client.
            Note that this requires the tool to have a `streaming_callback` parameter in its `invoke` method signature.
            If False, the `streaming_callback` will not be passed to the tool invocation.
            If None, the value from the constructor will be used.
        :returns:
            A dictionary with the key `tool_messages` containing a list of ChatMessage objects with tool role,
            and the key `state` containing the (possibly updated) State object.
            Each ChatMessage objects wraps the result of a tool invocation.

        :raises ToolNotFoundException:
            If the tool is not found in the list of available tools and `raise_on_failure` is True.
        :raises ToolInvocationError:
            If the tool invocation fails and `raise_on_failure` is True.
        :raises StringConversionError:
            If the conversion of the tool result to a string fails and `raise_on_failure` is True.
        :raises ToolOutputMergeError:
            If merging tool outputs into state fails and `raise_on_failure` is True.
        """
        if state is None:
            state = State(schema={})

        # Runtime flag overrides the constructor's setting when explicitly provided.
        resolved_enable_streaming_passthrough = (
            enable_streaming_callback_passthrough
            if enable_streaming_callback_passthrough is not None
            else self.enable_streaming_callback_passthrough
        )

        # Only keep messages with tool calls
        messages_with_tool_calls = [message for message in messages if message.tool_calls]
        # Resolve init-time vs runtime callback; requires_async=False for this sync path.
        streaming_callback = select_streaming_callback(
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=False
        )

        tool_messages = []
        for message in messages_with_tool_calls:
            for tool_call in message.tool_calls:
                tool_name = tool_call.tool_name

                # Check if the tool is available, otherwise return an error message
                # (_handle_error raises instead when raise_on_failure is True).
                if tool_name not in self._tools_with_names:
                    error_message = self._handle_error(
                        ToolNotFoundException(tool_name, list(self._tools_with_names.keys()))
                    )
                    tool_messages.append(ChatMessage.from_tool(tool_result=error_message, origin=tool_call, error=True))
                    continue

                tool_to_invoke = self._tools_with_names[tool_name]

                # 1) Combine user + state inputs
                llm_args = tool_call.arguments.copy()
                final_args = self._inject_state_args(tool_to_invoke, llm_args, state)

                # Check whether to inject streaming_callback — only when passthrough is
                # enabled, a callback exists, and the LLM did not already supply one.
                if (
                    resolved_enable_streaming_passthrough
                    and streaming_callback is not None
                    and "streaming_callback" not in final_args
                ):
                    invoke_params = self._get_func_params(tool_to_invoke)
                    if "streaming_callback" in invoke_params:
                        final_args["streaming_callback"] = streaming_callback

                # 2) Invoke the tool
                try:
                    tool_result = tool_to_invoke.invoke(**final_args)

                except ToolInvocationError as e:
                    error_message = self._handle_error(e)
                    tool_messages.append(ChatMessage.from_tool(tool_result=error_message, origin=tool_call, error=True))
                    continue

                # 3) Merge outputs into state
                try:
                    self._merge_tool_outputs(tool_to_invoke, tool_result, state)
                except Exception as e:
                    try:
                        error_message = self._handle_error(
                            ToolOutputMergeError(f"Failed to merge tool outputs from tool {tool_name} into State: {e}")
                        )
                        tool_messages.append(
                            ChatMessage.from_tool(tool_result=error_message, origin=tool_call, error=True)
                        )
                        continue
                    except ToolOutputMergeError as propagated_e:
                        # Re-raise with proper error chain
                        raise propagated_e from e

                # 4) Prepare the tool result ChatMessage message
                tool_messages.append(
                    self._prepare_tool_result_message(
                        result=tool_result, tool_call=tool_call, tool_to_invoke=tool_to_invoke
                    )
                )

                # Emit the just-appended tool result through the streaming callback;
                # index points at its position in tool_messages.
                if streaming_callback is not None:
                    streaming_callback(
                        StreamingChunk(
                            content="",
                            index=len(tool_messages) - 1,
                            tool_call_result=tool_messages[-1].tool_call_results[0],
                            start=True,
                            meta={"tool_result": tool_messages[-1].tool_call_results[0].result, "tool_call": tool_call},
                        )
                    )

        # We stream one more chunk that contains a finish_reason if tool_messages were generated
        if len(tool_messages) > 0 and streaming_callback is not None:
            streaming_callback(
                StreamingChunk(
                    content="", finish_reason="tool_call_results", meta={"finish_reason": "tool_call_results"}
                )
            )

        return {"tool_messages": tool_messages, "state": state}
563

564
    @component.output_types(tool_messages=List[ChatMessage], state=State)
1✔
565
    async def run_async(
1✔
566
        self,
567
        messages: List[ChatMessage],
568
        state: Optional[State] = None,
569
        streaming_callback: Optional[StreamingCallbackT] = None,
570
        *,
571
        enable_streaming_callback_passthrough: Optional[bool] = None,
572
    ) -> Dict[str, Any]:
573
        """
574
        Asynchronously processes ChatMessage objects containing tool calls and invokes the corresponding tools.
575

576
        :param messages:
577
            A list of ChatMessage objects.
578
        :param state: The runtime state that should be used by the tools.
579
        :param streaming_callback: An asynchronous callback function that will be called to emit tool results.
580
            Note that the result is only emitted once it becomes available — it is not
581
            streamed incrementally in real time.
582
        :param enable_streaming_callback_passthrough:
583
            If True, the `streaming_callback` will be passed to the tool invocation if the tool supports it.
584
            This allows tools to stream their results back to the client.
585
            Note that this requires the tool to have a `streaming_callback` parameter in its `invoke` method signature.
586
            If False, the `streaming_callback` will not be passed to the tool invocation.
587
            If None, the value from the constructor will be used.
588
        :returns:
589
            A dictionary with the key `tool_messages` containing a list of ChatMessage objects with tool role.
590
            Each ChatMessage objects wraps the result of a tool invocation.
591

592
        :raises ToolNotFoundException:
593
            If the tool is not found in the list of available tools and `raise_on_failure` is True.
594
        :raises ToolInvocationError:
595
            If the tool invocation fails and `raise_on_failure` is True.
596
        :raises StringConversionError:
597
            If the conversion of the tool result to a string fails and `raise_on_failure` is True.
598
        :raises ToolOutputMergeError:
599
            If merging tool outputs into state fails and `raise_on_failure` is True.
600
        """
601
        if state is None:
1✔
602
            state = State(schema={})
1✔
603

604
        resolved_enable_streaming_passthrough = (
1✔
605
            enable_streaming_callback_passthrough
606
            if enable_streaming_callback_passthrough is not None
607
            else self.enable_streaming_callback_passthrough
608
        )
609

610
        # Only keep messages with tool calls
611
        messages_with_tool_calls = [message for message in messages if message.tool_calls]
1✔
612
        streaming_callback = select_streaming_callback(
1✔
613
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=True
614
        )
615

616
        tool_messages = []
1✔
617
        for message in messages_with_tool_calls:
1✔
618
            for tool_call in message.tool_calls:
1✔
619
                tool_name = tool_call.tool_name
1✔
620

621
                # Check if the tool is available, otherwise return an error message
622
                if tool_name not in self._tools_with_names:
1✔
623
                    error_message = self._handle_error(
×
624
                        ToolNotFoundException(tool_name, list(self._tools_with_names.keys()))
625
                    )
626
                    tool_messages.append(ChatMessage.from_tool(tool_result=error_message, origin=tool_call, error=True))
×
627
                    continue
×
628

629
                tool_to_invoke = self._tools_with_names[tool_name]
1✔
630

631
                # 1) Combine user + state inputs
632
                llm_args = tool_call.arguments.copy()
1✔
633
                final_args = self._inject_state_args(tool_to_invoke, llm_args, state)
1✔
634

635
                # Check whether to inject streaming_callback
636
                if (
1✔
637
                    resolved_enable_streaming_passthrough
638
                    and streaming_callback is not None
639
                    and "streaming_callback" not in final_args
640
                ):
641
                    invoke_params = self._get_func_params(tool_to_invoke)
×
642
                    if "streaming_callback" in invoke_params:
×
643
                        final_args["streaming_callback"] = streaming_callback
×
644

645
                # 2) Invoke the tool asynchronously
646
                try:
1✔
647
                    tool_result = await asyncio.get_running_loop().run_in_executor(
1✔
648
                        self.executor, partial(tool_to_invoke.invoke, **final_args)
649
                    )
650

651
                except ToolInvocationError as e:
×
652
                    error_message = self._handle_error(e)
×
653
                    tool_messages.append(ChatMessage.from_tool(tool_result=error_message, origin=tool_call, error=True))
×
654
                    continue
×
655

656
                # 3) Merge outputs into state
657
                try:
1✔
658
                    self._merge_tool_outputs(tool_to_invoke, tool_result, state)
1✔
659
                except Exception as e:
×
660
                    try:
×
661
                        error_message = self._handle_error(
×
662
                            ToolOutputMergeError(f"Failed to merge tool outputs from tool {tool_name} into State: {e}")
663
                        )
664
                        tool_messages.append(
×
665
                            ChatMessage.from_tool(tool_result=error_message, origin=tool_call, error=True)
666
                        )
667
                        continue
×
668
                    except ToolOutputMergeError as propagated_e:
×
669
                        # Re-raise with proper error chain
670
                        raise propagated_e from e
×
671

672
                # 4) Prepare the tool result ChatMessage message
673
                tool_messages.append(
1✔
674
                    self._prepare_tool_result_message(
675
                        result=tool_result, tool_call=tool_call, tool_to_invoke=tool_to_invoke
676
                    )
677
                )
678

679
                if streaming_callback is not None:
1✔
680
                    await streaming_callback(
1✔
681
                        StreamingChunk(
682
                            content="",
683
                            index=len(tool_messages) - 1,
684
                            tool_call_result=tool_messages[-1].tool_call_results[0],
685
                            start=True,
686
                            meta={"tool_result": tool_messages[-1].tool_call_results[0].result, "tool_call": tool_call},
687
                        )
688
                    )
689

690
        # We stream one more chunk that contains a finish_reason if tool_messages were generated
691
        if len(tool_messages) > 0 and streaming_callback is not None:
1✔
692
            await streaming_callback(
1✔
693
                StreamingChunk(
694
                    content="", finish_reason="tool_call_results", meta={"finish_reason": "tool_call_results"}
695
                )
696
            )
697

698
        return {"tool_messages": tool_messages, "state": state}
1✔
699

700
    def to_dict(self) -> Dict[str, Any]:
        """
        Serializes the component to a dictionary.

        :returns:
            Dictionary with serialized data.
        """
        # The callback is a callable and must be converted to its import path (or None).
        serialized_callback = (
            serialize_callable(self.streaming_callback) if self.streaming_callback is not None else None
        )
        return default_to_dict(
            self,
            tools=serialize_tools_or_toolset(self.tools),
            raise_on_failure=self.raise_on_failure,
            convert_result_to_json_string=self.convert_result_to_json_string,
            streaming_callback=serialized_callback,
            enable_streaming_callback_passthrough=self.enable_streaming_callback_passthrough,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ToolInvoker":
        """
        Deserializes the component from a dictionary.

        :param data:
            The dictionary to deserialize from.
        :returns:
            The deserialized component.
        """
        # Mutate the init parameters in place: tools first, then the optional callback.
        init_params = data["init_parameters"]
        deserialize_tools_or_toolset_inplace(init_params, key="tools")
        callback_path = init_params.get("streaming_callback")
        if callback_path is not None:
            # Resolve the serialized import path back into a callable.
            init_params["streaming_callback"] = deserialize_callable(callback_path)
        return default_from_dict(cls, data)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc