• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 18530018322

15 Oct 2025 01:10PM UTC coverage: 92.075% (-0.03%) from 92.103%
18530018322

Pull #9880

github

web-flow
Merge 6dad544fe into cfa5d2761
Pull Request #9880: draft: Expand tools param to include list[Toolset]

13279 of 14422 relevant lines covered (92.07%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.89
haystack/core/pipeline/breakpoint.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import json
1✔
6
from copy import deepcopy
1✔
7
from dataclasses import replace
1✔
8
from datetime import datetime
1✔
9
from pathlib import Path
1✔
10
from typing import TYPE_CHECKING, Any, Optional, Union
1✔
11

12
from networkx import MultiDiGraph
1✔
13

14
from haystack import logging
1✔
15
from haystack.core.errors import BreakpointException, PipelineInvalidPipelineSnapshotError
1✔
16
from haystack.dataclasses import ChatMessage
1✔
17
from haystack.dataclasses.breakpoints import (
1✔
18
    AgentBreakpoint,
19
    AgentSnapshot,
20
    Breakpoint,
21
    PipelineSnapshot,
22
    PipelineState,
23
    ToolBreakpoint,
24
)
25
from haystack.utils.base_serialization import _serialize_value_with_schema
1✔
26
from haystack.utils.misc import _get_output_dir
1✔
27

28
if TYPE_CHECKING:
29
    from haystack.components.agents.agent import _ExecutionContext
30
    from haystack.tools.tool import Tool
31
    from haystack.tools.toolset import Toolset
32

33
logger = logging.getLogger(__name__)
1✔
34

35

36
def _validate_break_point_against_pipeline(
    break_point: Union[Breakpoint, AgentBreakpoint], graph: MultiDiGraph
) -> None:
    """
    Validates the breakpoint passed to the pipeline.

    Makes sure the breakpoint refers to a component registered in the pipeline. For an AgentBreakpoint that wraps
    a ToolBreakpoint naming a specific tool, also makes sure that tool is available on the referenced Agent.

    :param break_point: a breakpoint to validate, can be Breakpoint or AgentBreakpoint
    :param graph: the pipeline graph; its nodes are looked up by component name
    :raises ValueError: If the referenced component, Agent, or tool is not registered.
    """

    # all Breakpoints must refer to a valid component in the pipeline
    if isinstance(break_point, Breakpoint) and break_point.component_name not in graph.nodes:
        raise ValueError(f"break_point {break_point} is not a registered component in the pipeline")

    if not isinstance(break_point, AgentBreakpoint):
        return

    breakpoint_agent_component = graph.nodes.get(break_point.agent_name)
    if not breakpoint_agent_component:
        raise ValueError(f"break_point {break_point} is not a registered Agent component in the pipeline")

    tool_breakpoint = break_point.break_point
    if not isinstance(tool_breakpoint, ToolBreakpoint):
        return

    # A ToolBreakpoint without a tool name means "break on any tool call", so there is no name to check.
    if tool_breakpoint.tool_name is None:
        return

    # Imported locally, as done elsewhere in this module.
    from haystack.tools import flatten_tools_or_toolsets

    # Flatten first so that a Toolset or a list of Toolsets is compared tool by tool rather than
    # by reading `.name` on a Toolset object.
    instance = breakpoint_agent_component["instance"]
    available_tool_names = {tool.name for tool in flatten_tools_or_toolsets(instance.tools)}
    if tool_breakpoint.tool_name not in available_tool_names:
        raise ValueError(
            f"break_point {break_point.break_point} is not a registered tool in the Agent component"
        )
65

66

67
def _validate_pipeline_snapshot_against_pipeline(pipeline_snapshot: PipelineSnapshot, graph: MultiDiGraph) -> None:
    """
    Validates that the pipeline_snapshot contains valid configuration for the current pipeline.

    Raises a PipelineInvalidPipelineSnapshotError if any component in pipeline_snapshot is not part of the
    target pipeline.

    :param pipeline_snapshot: The saved state to validate.
    :param graph: The graph of the target pipeline; its node names are the valid component names.
    :raises PipelineInvalidPipelineSnapshotError: If `ordered_component_names`, the original input data, or
        `component_visits` reference a component that is not a node of `graph`.
    """

    pipeline_state = pipeline_snapshot.pipeline_state
    valid_components = set(graph.nodes.keys())

    # Check if the ordered_component_names are valid components in the pipeline
    invalid_ordered_components = set(pipeline_snapshot.ordered_component_names) - valid_components
    if invalid_ordered_components:
        raise PipelineInvalidPipelineSnapshotError(
            f"Invalid pipeline snapshot: components {invalid_ordered_components} in 'ordered_component_names' "
            f"are not part of the current pipeline."
        )

    # Check if the original_input_data refers to valid components in the pipeline.
    # Snapshots built in this module may carry an empty `original_input_data` dict (e.g. when serialization
    # failed), so a missing "serialized_data" key is treated as "no input data" instead of raising a KeyError.
    serialized_input_data = pipeline_snapshot.original_input_data.get("serialized_data", {})
    invalid_input_data = set(serialized_input_data.keys()) - valid_components
    if invalid_input_data:
        raise PipelineInvalidPipelineSnapshotError(
            f"Invalid pipeline snapshot: components {invalid_input_data} in 'input_data' "
            f"are not part of the current pipeline."
        )

    # Validate 'component_visits'
    invalid_component_visits = set(pipeline_state.component_visits.keys()) - valid_components
    if invalid_component_visits:
        raise PipelineInvalidPipelineSnapshotError(
            f"Invalid pipeline snapshot: components {invalid_component_visits} in 'component_visits' "
            f"are not part of the current pipeline."
        )

    if isinstance(pipeline_snapshot.break_point, AgentBreakpoint):
        component_name = pipeline_snapshot.break_point.agent_name
    else:
        component_name = pipeline_snapshot.break_point.component_name

    # The visit count is only used for the log line below; default to 0 (as `_save_pipeline_snapshot` does)
    # rather than failing validation with a KeyError when the component has no recorded visits.
    visit_count = pipeline_state.component_visits.get(component_name, 0)

    logger.info(
        "Resuming pipeline from {component} with visit count {visits}", component=component_name, visits=visit_count
    )
115

116

117
def load_pipeline_snapshot(file_path: Union[str, Path]) -> PipelineSnapshot:
    """
    Load a saved pipeline snapshot.

    :param file_path: Path to the pipeline_snapshot file.
    :returns:
        The PipelineSnapshot built from the JSON content of the file.
    :raises FileNotFoundError: If the file does not exist.
    :raises json.JSONDecodeError: If the file does not contain valid JSON.
    :raises IOError: If the file cannot be read for another reason.
    :raises ValueError: If the JSON content cannot be converted into a PipelineSnapshot.
    """

    file_path = Path(file_path)

    # Each handler re-raises the same exception type with a message that names the file; `from e` keeps the
    # original error attached as the explicit cause.
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            pipeline_snapshot_dict = json.load(f)
    except FileNotFoundError as e:
        raise FileNotFoundError(f"File not found: {file_path}") from e
    except json.JSONDecodeError as e:
        raise json.JSONDecodeError(f"Invalid JSON file {file_path}: {str(e)}", e.doc, e.pos) from e
    except IOError as e:
        raise IOError(f"Error reading {file_path}: {str(e)}") from e

    try:
        pipeline_snapshot = PipelineSnapshot.from_dict(pipeline_snapshot_dict)
    except ValueError as e:
        raise ValueError(f"Invalid pipeline snapshot from {file_path}: {str(e)}") from e

    # Structured logging arguments, consistent with the other log calls in this module.
    logger.info("Successfully loaded the pipeline snapshot from: {file_path}", file_path=file_path)
    return pipeline_snapshot
145

146

147
def _save_pipeline_snapshot(pipeline_snapshot: PipelineSnapshot, raise_on_failure: bool = True) -> None:
    """
    Save the pipeline snapshot dictionary to a JSON file.

    - The filename is generated based on the component name, visit count, and timestamp.
        - The component name is taken from the break point's `component_name`. For an AgentBreakpoint it is the
          inner break point's `component_name`, prefixed with the agent name.
        - The visit count is taken from the pipeline state's `component_visits` for the component name.
        - The timestamp is taken from the pipeline snapshot's `timestamp` or the current time if not available.
    - The file path is taken from the break point's `snapshot_file_path`.
    - If the `snapshot_file_path` is None, the function will return without saving.

    :param pipeline_snapshot: The pipeline snapshot to save.
    :param raise_on_failure: If True, raises an exception if saving fails. If False, logs the error and returns.

    :raises:
        Exception: If saving the JSON snapshot fails.
    """
    break_point = pipeline_snapshot.break_point
    snapshot_file_path = (
        break_point.break_point.snapshot_file_path
        if isinstance(break_point, AgentBreakpoint)
        else break_point.snapshot_file_path
    )

    if snapshot_file_path is None:
        return

    dt = pipeline_snapshot.timestamp or datetime.now()
    snapshot_dir = Path(snapshot_file_path)

    # Generate filename
    # We check if the agent_name is provided to differentiate between agent and non-agent breakpoints
    if isinstance(break_point, AgentBreakpoint):
        agent_name = break_point.agent_name
        component_name = break_point.break_point.component_name
    else:
        component_name = break_point.component_name
        agent_name = None

    # NOTE(review): for an AgentBreakpoint the inner component name is looked up in the pipeline-level visits,
    # so this presumably falls back to 0 in that case - confirm this is intended.
    visit_nr = pipeline_snapshot.pipeline_state.component_visits.get(component_name, 0)
    timestamp = dt.strftime("%Y_%m_%d_%H_%M_%S")
    file_name = f"{agent_name + '_' if agent_name else ''}{component_name}_{visit_nr}_{timestamp}.json"
    full_path = snapshot_dir / file_name

    try:
        snapshot_dir.mkdir(parents=True, exist_ok=True)
        # Write with an explicit encoding so the file matches what `load_pipeline_snapshot` reads (utf-8)
        # regardless of the platform's default encoding.
        with open(full_path, "w", encoding="utf-8") as f_out:
            json.dump(pipeline_snapshot.to_dict(), f_out, indent=2)
        logger.info(
            "Pipeline snapshot saved to '{full_path}'. You can use this file to debug or resume the pipeline.",
            full_path=full_path,
        )
    except Exception as error:
        # Saving is best-effort when raise_on_failure is False: the error is logged either way.
        logger.error("Failed to save pipeline snapshot to '{full_path}'. Error: {e}", full_path=full_path, e=error)
        if raise_on_failure:
            raise
203

204

205
def _create_pipeline_snapshot(
    *,
    inputs: dict[str, Any],
    component_inputs: dict[str, Any],
    break_point: Union[AgentBreakpoint, Breakpoint],
    component_visits: dict[str, int],
    original_input_data: dict[str, Any],
    ordered_component_names: list[str],
    include_outputs_from: set[str],
    pipeline_outputs: dict[str, Any],
) -> PipelineSnapshot:
    """
    Build the PipelineSnapshot describing the pipeline at the moment a breakpoint fired.

    Serialization of the inputs and of the original input data is best-effort: if either fails, a warning is
    logged and an empty dictionary is stored in its place.

    :param inputs: The current pipeline snapshot inputs.
    :param component_inputs: The inputs to the component that triggered the breakpoint.
    :param break_point: The breakpoint that triggered the snapshot, can be AgentBreakpoint or Breakpoint.
    :param component_visits: The visit count of the component that triggered the breakpoint.
    :param original_input_data: The original input data.
    :param ordered_component_names: The ordered component names.
    :param include_outputs_from: Set of component names whose outputs should be included in the pipeline results.
    :param pipeline_outputs: The current outputs of the pipeline.
    :returns:
        A PipelineSnapshot containing the state of the pipeline at the point of the breakpoint.
    """
    # An agent breakpoint is keyed by the agent's name, a plain breakpoint by its component name.
    component_name = (
        break_point.agent_name if isinstance(break_point, AgentBreakpoint) else break_point.component_name
    )

    flattened_original_data = _transform_json_structure(original_input_data)

    # The triggering component's inputs are stored under its own name next to the other inputs.
    merged_inputs = dict(inputs)
    merged_inputs[component_name] = component_inputs
    flattened_inputs = _transform_json_structure(merged_inputs)

    try:
        serialized_inputs = _serialize_value_with_schema(flattened_inputs)
    except Exception as error:
        logger.warning(
            "Failed to serialize the inputs of the current pipeline state. "
            "The inputs in the snapshot will be replaced with an empty dictionary. Error: {e}",
            e=error,
        )
        serialized_inputs = {}

    try:
        serialized_original_input_data = _serialize_value_with_schema(flattened_original_data)
    except Exception as error:
        logger.warning(
            "Failed to serialize original input data for `pipeline.run`. "
            "This likely occurred due to non-serializable object types. "
            "The snapshot will store an empty dictionary instead. Error: {e}",
            e=error,
        )
        serialized_original_input_data = {}

    state = PipelineState(
        inputs=serialized_inputs, component_visits=component_visits, pipeline_outputs=pipeline_outputs
    )
    return PipelineSnapshot(
        pipeline_state=state,
        timestamp=datetime.now(),
        break_point=break_point,
        original_input_data=serialized_original_input_data,
        ordered_component_names=ordered_component_names,
        include_outputs_from=include_outputs_from,
    )
270

271

272
def _transform_json_structure(data: Union[dict[str, Any], list[Any], Any]) -> Any:
1✔
273
    """
274
    Transforms a JSON structure by removing the 'sender' key and moving the 'value' to the top level.
275

276
    For example:
277
    "key": [{"sender": null, "value": "some value"}] -> "key": "some value"
278

279
    :param data: The JSON structure to transform.
280
    :returns: The transformed structure.
281
    """
282
    if isinstance(data, dict):
1✔
283
        # If this dict has both 'sender' and 'value', return just the value
284
        if "value" in data and "sender" in data:
1✔
285
            return data["value"]
1✔
286
        # Otherwise, recursively process each key-value pair
287
        return {k: _transform_json_structure(v) for k, v in data.items()}
1✔
288

289
    if isinstance(data, list):
1✔
290
        # First, transform each item in the list.
291
        transformed = [_transform_json_structure(item) for item in data]
1✔
292
        # If the original list has exactly one element and that element was a dict
293
        # with 'sender' and 'value', then unwrap the list.
294
        if len(data) == 1 and isinstance(data[0], dict) and "value" in data[0] and "sender" in data[0]:
1✔
295
            return transformed[0]
1✔
296
        return transformed
1✔
297

298
    # For other data types, just return the value as is.
299
    return data
1✔
300

301

302
def _trigger_break_point(*, pipeline_snapshot: PipelineSnapshot) -> None:
    """
    Save the given snapshot and stop execution by raising a BreakpointException.

    :param pipeline_snapshot: The current pipeline snapshot containing the state and break point
    :raises BreakpointException: Always raised once the snapshot has been saved.
    """
    _save_pipeline_snapshot(pipeline_snapshot=pipeline_snapshot)

    break_point = pipeline_snapshot.break_point
    # Plain breakpoints are identified by component name; anything else is treated as an agent breakpoint.
    component_name = break_point.component_name if isinstance(break_point, Breakpoint) else break_point.agent_name

    state = pipeline_snapshot.pipeline_state
    visit_count = state.component_visits[component_name]
    raise BreakpointException(
        message=f"Breaking at component {component_name} at visit count {visit_count}",
        component=component_name,
        inputs=state.inputs,
        results=state.pipeline_outputs,
    )
324

325

326
def _create_agent_snapshot(
    *, component_visits: dict[str, int], agent_breakpoint: AgentBreakpoint, component_inputs: dict[str, Any]
) -> AgentSnapshot:
    """
    Build an AgentSnapshot from the agent's current inputs and visit counts.

    The "chat_generator" and "tool_invoker" entries of `component_inputs` are deep-copied and then serialized
    before being stored in the snapshot.

    :param component_visits: The visit counts for the agent's components.
    :param agent_breakpoint: AgentBreakpoint object containing breakpoints
    :param component_inputs: Inputs of the agent's components; must contain the keys "chat_generator" and
        "tool_invoker".
    :return: An AgentSnapshot containing the agent's state and component visits.
    """
    serialized_inputs = {
        name: _serialize_value_with_schema(deepcopy(component_inputs[name]))
        for name in ("chat_generator", "tool_invoker")
    }
    return AgentSnapshot(
        component_inputs=serialized_inputs,
        component_visits=component_visits,
        break_point=agent_breakpoint,
        timestamp=datetime.now(),
    )
345

346

347
def _validate_tool_breakpoint_is_valid(
    agent_breakpoint: AgentBreakpoint, tools: Union[list["Tool"], "Toolset", list["Toolset"]]
) -> None:
    """
    Check that the tool named by the agent's ToolBreakpoint is one of the agent's tools.

    A ToolBreakpoint without a tool name is accepted without further checks.

    :param agent_breakpoint: AgentBreakpoint object containing breakpoints for the agent components.
    :param tools: List of Tool objects, a Toolset, or a list of Toolset instances that the agent can use.
    :raises ValueError: If any tool name in ToolBreakpoints is not available in the agent's tools.
    """
    from haystack.tools import flatten_tools_or_toolsets

    known_tool_names = set()
    for tool in flatten_tools_or_toolsets(tools):
        known_tool_names.add(tool.name)

    tool_breakpoint = agent_breakpoint.break_point
    # Assert added for mypy to pass, but this is already checked before this function is called
    assert isinstance(tool_breakpoint, ToolBreakpoint)

    requested_name = tool_breakpoint.tool_name
    if not requested_name or requested_name in known_tool_names:
        return
    raise ValueError(f"Tool '{tool_breakpoint.tool_name}' is not available in the agent's tools")
368

369

370
def _create_pipeline_snapshot_from_chat_generator(
    *,
    execution_context: "_ExecutionContext",
    agent_name: Optional[str] = None,
    break_point: Optional[AgentBreakpoint] = None,
    parent_snapshot: Optional[PipelineSnapshot] = None,
) -> PipelineSnapshot:
    """
    Build a pipeline snapshot for a chat generator breakpoint or for an exception raised during its execution.

    :param execution_context: The current execution context of the agent.
    :param agent_name: The name of the agent component if present in a pipeline.
    :param break_point: An optional AgentBreakpoint object. If provided, it will be used instead of creating a new one.
        A scenario where a new breakpoint is created is when an exception occurs during chat generation and we want to
        capture the state at that point.
    :param parent_snapshot: An optional parent PipelineSnapshot to build upon.
    :returns:
        A PipelineSnapshot containing the state of the pipeline and agent at the point of the breakpoint or exception.
    """
    agent_breakpoint = break_point
    if agent_breakpoint is None:
        # No breakpoint supplied: synthesize one pointing at the chat generator's current visit.
        agent_breakpoint = AgentBreakpoint(
            agent_name=agent_name or "agent",
            break_point=Breakpoint(
                component_name="chat_generator",
                visit_count=execution_context.component_visits["chat_generator"],
                snapshot_file_path=_get_output_dir("pipeline_snapshot"),
            ),
        )

    chat_generator_inputs = {
        "messages": execution_context.state.data["messages"],
        **execution_context.chat_generator_inputs,
    }
    tool_invoker_inputs = {"messages": [], "state": execution_context.state, **execution_context.tool_invoker_inputs}

    agent_snapshot = _create_agent_snapshot(
        component_visits=execution_context.component_visits,
        agent_breakpoint=agent_breakpoint,
        component_inputs={"chat_generator": chat_generator_inputs, "tool_invoker": tool_invoker_inputs},
    )

    if parent_snapshot is not None:
        # Keep everything from the parent and only attach the agent snapshot.
        return replace(parent_snapshot, agent_snapshot=agent_snapshot)

    # Create an empty pipeline snapshot if no parent snapshot is provided
    return PipelineSnapshot(
        pipeline_state=PipelineState(inputs={}, component_visits={}, pipeline_outputs={}),
        timestamp=agent_snapshot.timestamp,
        break_point=agent_snapshot.break_point,
        agent_snapshot=agent_snapshot,
        original_input_data={},
        ordered_component_names=[],
        include_outputs_from=set(),
    )
427

428

429
def _create_pipeline_snapshot_from_tool_invoker(
    *,
    execution_context: "_ExecutionContext",
    tool_name: Optional[str] = None,
    agent_name: Optional[str] = None,
    break_point: Optional[AgentBreakpoint] = None,
    parent_snapshot: Optional[PipelineSnapshot] = None,
) -> PipelineSnapshot:
    """
    Build a pipeline snapshot for a tool invoker breakpoint or for an exception raised during tool execution.

    :param execution_context: The current execution context of the agent.
    :param tool_name: The name of the tool that triggered the breakpoint, if available.
    :param agent_name: The name of the agent component if present in a pipeline.
    :param break_point: An optional AgentBreakpoint object. If provided, it will be used instead of creating a new one.
        A scenario where a new breakpoint is created is when an exception occurs during tool execution and we want to
        capture the state at that point.
    :param parent_snapshot: An optional parent PipelineSnapshot to build upon.
    :returns:
        A PipelineSnapshot containing the state of the pipeline and agent at the point of the breakpoint or exception.
    """
    agent_breakpoint = break_point
    if agent_breakpoint is None:
        # No breakpoint supplied: synthesize one pointing at the tool invoker's current visit.
        agent_breakpoint = AgentBreakpoint(
            agent_name=agent_name or "agent",
            break_point=ToolBreakpoint(
                component_name="tool_invoker",
                visit_count=execution_context.component_visits["tool_invoker"],
                tool_name=tool_name,
                snapshot_file_path=_get_output_dir("pipeline_snapshot"),
            ),
        )

    messages = execution_context.state.data["messages"]
    chat_generator_inputs = {"messages": messages[:-1], **execution_context.chat_generator_inputs}
    tool_invoker_inputs = {
        "messages": messages[-1:],  # tool invoker consumes last msg from the chat_generator, contains tool call
        "state": execution_context.state,
        **execution_context.tool_invoker_inputs,
    }

    agent_snapshot = _create_agent_snapshot(
        component_visits=execution_context.component_visits,
        agent_breakpoint=agent_breakpoint,
        component_inputs={"chat_generator": chat_generator_inputs, "tool_invoker": tool_invoker_inputs},
    )

    if parent_snapshot is not None:
        # Keep everything from the parent and only attach the agent snapshot.
        return replace(parent_snapshot, agent_snapshot=agent_snapshot)

    # Create an empty pipeline snapshot if no parent snapshot is provided
    return PipelineSnapshot(
        pipeline_state=PipelineState(inputs={}, component_visits={}, pipeline_outputs={}),
        timestamp=agent_snapshot.timestamp,
        break_point=agent_snapshot.break_point,
        agent_snapshot=agent_snapshot,
        original_input_data={},
        ordered_component_names=[],
        include_outputs_from=set(),
    )
491

492

493
def _trigger_chat_generator_breakpoint(*, pipeline_snapshot: PipelineSnapshot) -> None:
    """
    Save the snapshot and raise a BreakpointException for a ChatGenerator breakpoint inside an Agent.

    :param pipeline_snapshot: PipelineSnapshot object containing the state of the pipeline and Agent snapshot.
    :raises ValueError: If the snapshot does not hold an AgentBreakpoint or an AgentSnapshot.
    :raises BreakpointException: Always raised when this function is called, indicating a breakpoint has been triggered.
    """
    if not isinstance(pipeline_snapshot.break_point, AgentBreakpoint):
        raise ValueError("PipelineSnapshot must contain an AgentBreakpoint to trigger a chat generator breakpoint.")

    if not isinstance(pipeline_snapshot.agent_snapshot, AgentSnapshot):
        raise ValueError("PipelineSnapshot must contain an AgentSnapshot to trigger a chat generator breakpoint.")

    inner_break_point = pipeline_snapshot.break_point.break_point
    _save_pipeline_snapshot(pipeline_snapshot=pipeline_snapshot)

    agent_snapshot = pipeline_snapshot.agent_snapshot
    component = inner_break_point.component_name
    visits = agent_snapshot.component_visits[component]
    msg = f"Breaking at {component} visit count {visits}"
    logger.info(msg)

    # The serialized agent state stored with the tool invoker inputs is reported as the results so far.
    raise BreakpointException(
        message=msg,
        component=component,
        inputs=agent_snapshot.component_inputs,
        results=agent_snapshot.component_inputs["tool_invoker"]["serialized_data"]["state"],
    )
519

520

521
def _trigger_tool_invoker_breakpoint(*, llm_messages: list[ChatMessage], pipeline_snapshot: PipelineSnapshot) -> None:
    """
    Raise a BreakpointException before the tool invoker runs if the messages match the configured ToolBreakpoint.

    Returns without doing anything when the agent's break point is not a ToolBreakpoint or when none of the
    messages contains a matching tool call.

    :param llm_messages: List of ChatMessage objects containing potential tool calls.
    :param pipeline_snapshot: PipelineSnapshot object containing the state of the pipeline and Agent snapshot.
    :raises ValueError: If the snapshot does not contain an AgentSnapshot.
    :raises BreakpointException: If the breakpoint is triggered, indicating a breakpoint has been reached for a tool
        call.
    """
    agent_snapshot = pipeline_snapshot.agent_snapshot
    if not agent_snapshot:
        raise ValueError("PipelineSnapshot must contain an AgentSnapshot to trigger a tool call breakpoint.")

    tool_breakpoint = agent_snapshot.break_point.break_point
    if not isinstance(tool_breakpoint, ToolBreakpoint):
        return

    # Check if we should break for this specific tool or all tools
    wanted_tool = tool_breakpoint.tool_name
    if wanted_tool is None:
        # Break for any tool call
        should_break = any(message.tool_call for message in llm_messages)
    else:
        # Break only for the specific tool
        should_break = any(
            call.tool_name == wanted_tool for message in llm_messages for call in message.tool_calls or []
        )

    if not should_break:
        return  # No breakpoint triggered

    _save_pipeline_snapshot(pipeline_snapshot=pipeline_snapshot)

    component = tool_breakpoint.component_name
    visits = agent_snapshot.component_visits[component]
    msg = f"Breaking at {component} visit count {visits}"
    if tool_breakpoint.tool_name:
        msg += f" for tool {tool_breakpoint.tool_name}"
    logger.info(msg)

    # The serialized agent state stored with the tool invoker inputs is reported as the results so far.
    raise BreakpointException(
        message=msg,
        component=component,
        inputs=agent_snapshot.component_inputs,
        results=agent_snapshot.component_inputs["tool_invoker"]["serialized_data"]["state"],
    )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc