• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 18595249452

17 Oct 2025 02:08PM UTC coverage: 92.22% (+0.02%) from 92.2%
18595249452

Pull #9886

github

web-flow
Merge ad30d1879 into cc4f024af
Pull Request #9886: feat: Update tools param to Optional[Union[list[Union[Tool, Toolset]], Toolset]]

13382 of 14511 relevant lines covered (92.22%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.94
haystack/core/pipeline/breakpoint.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import json
1✔
6
from copy import deepcopy
1✔
7
from dataclasses import replace
1✔
8
from datetime import datetime
1✔
9
from pathlib import Path
1✔
10
from typing import TYPE_CHECKING, Any, Optional, Union
1✔
11

12
from networkx import MultiDiGraph
1✔
13

14
from haystack import logging
1✔
15
from haystack.core.errors import BreakpointException, PipelineInvalidPipelineSnapshotError
1✔
16
from haystack.dataclasses import ChatMessage
1✔
17
from haystack.dataclasses.breakpoints import (
1✔
18
    AgentBreakpoint,
19
    AgentSnapshot,
20
    Breakpoint,
21
    PipelineSnapshot,
22
    PipelineState,
23
    ToolBreakpoint,
24
)
25
from haystack.utils.base_serialization import _serialize_value_with_schema
1✔
26
from haystack.utils.misc import _get_output_dir
1✔
27

28
if TYPE_CHECKING:
29
    from haystack.components.agents.agent import _ExecutionContext
30
    from haystack.tools import Tool, Toolset, ToolsType
31

32
logger = logging.getLogger(__name__)
1✔
33

34

35
def _validate_break_point_against_pipeline(
1✔
36
    break_point: Union[Breakpoint, AgentBreakpoint], graph: MultiDiGraph
37
) -> None:
38
    """
39
    Validates the breakpoints passed to the pipeline.
40

41
    Makes sure the breakpoint contains a valid components registered in the pipeline.
42

43
    :param break_point: a breakpoint to validate, can be Breakpoint or AgentBreakpoint
44
    """
45

46
    # all Breakpoints must refer to a valid component in the pipeline
47
    if isinstance(break_point, Breakpoint) and break_point.component_name not in graph.nodes:
1✔
48
        raise ValueError(f"break_point {break_point} is not a registered component in the pipeline")
×
49

50
    if isinstance(break_point, AgentBreakpoint):
1✔
51
        breakpoint_agent_component = graph.nodes.get(break_point.agent_name)
1✔
52
        if not breakpoint_agent_component:
1✔
53
            raise ValueError(f"break_point {break_point} is not a registered Agent component in the pipeline")
×
54

55
        if isinstance(break_point.break_point, ToolBreakpoint):
1✔
56
            instance = breakpoint_agent_component["instance"]
1✔
57
            for tool in instance.tools:
1✔
58
                if break_point.break_point.tool_name == tool.name:
1✔
59
                    break
1✔
60
            else:
61
                raise ValueError(
×
62
                    f"break_point {break_point.break_point} is not a registered tool in the Agent component"
63
                )
64

65

66
def _validate_pipeline_snapshot_against_pipeline(pipeline_snapshot: PipelineSnapshot, graph: MultiDiGraph) -> None:
1✔
67
    """
68
    Validates that the pipeline_snapshot contains valid configuration for the current pipeline.
69

70
    Raises a PipelineInvalidPipelineSnapshotError if any component in pipeline_snapshot is not part of the
71
    target pipeline.
72

73
    :param pipeline_snapshot: The saved state to validate.
74
    """
75

76
    pipeline_state = pipeline_snapshot.pipeline_state
1✔
77
    valid_components = set(graph.nodes.keys())
1✔
78

79
    # Check if the ordered_component_names are valid components in the pipeline
80
    invalid_ordered_components = set(pipeline_snapshot.ordered_component_names) - valid_components
1✔
81
    if invalid_ordered_components:
1✔
82
        raise PipelineInvalidPipelineSnapshotError(
×
83
            f"Invalid pipeline snapshot: components {invalid_ordered_components} in 'ordered_component_names' "
84
            f"are not part of the current pipeline."
85
        )
86

87
    # Check if the original_input_data is valid components in the pipeline
88
    serialized_input_data = pipeline_snapshot.original_input_data["serialized_data"]
1✔
89
    invalid_input_data = set(serialized_input_data.keys()) - valid_components
1✔
90
    if invalid_input_data:
1✔
91
        raise PipelineInvalidPipelineSnapshotError(
×
92
            f"Invalid pipeline snapshot: components {invalid_input_data} in 'input_data' "
93
            f"are not part of the current pipeline."
94
        )
95

96
    # Validate 'component_visits'
97
    invalid_component_visits = set(pipeline_state.component_visits.keys()) - valid_components
1✔
98
    if invalid_component_visits:
1✔
99
        raise PipelineInvalidPipelineSnapshotError(
×
100
            f"Invalid pipeline snapshot: components {invalid_component_visits} in 'component_visits' "
101
            f"are not part of the current pipeline."
102
        )
103

104
    if isinstance(pipeline_snapshot.break_point, AgentBreakpoint):
1✔
105
        component_name = pipeline_snapshot.break_point.agent_name
1✔
106
    else:
107
        component_name = pipeline_snapshot.break_point.component_name
×
108

109
    visit_count = pipeline_snapshot.pipeline_state.component_visits[component_name]
1✔
110

111
    logger.info(
1✔
112
        "Resuming pipeline from {component} with visit count {visits}", component=component_name, visits=visit_count
113
    )
114

115

116
def load_pipeline_snapshot(file_path: Union[str, Path]) -> PipelineSnapshot:
1✔
117
    """
118
    Load a saved pipeline snapshot.
119

120
    :param file_path: Path to the pipeline_snapshot file.
121
    :returns:
122
        Dict containing the loaded pipeline_snapshot.
123
    """
124

125
    file_path = Path(file_path)
1✔
126

127
    try:
1✔
128
        with open(file_path, "r", encoding="utf-8") as f:
1✔
129
            pipeline_snapshot_dict = json.load(f)
1✔
130
    except FileNotFoundError:
×
131
        raise FileNotFoundError(f"File not found: {file_path}")
×
132
    except json.JSONDecodeError as e:
×
133
        raise json.JSONDecodeError(f"Invalid JSON file {file_path}: {str(e)}", e.doc, e.pos)
×
134
    except IOError as e:
×
135
        raise IOError(f"Error reading {file_path}: {str(e)}")
×
136

137
    try:
1✔
138
        pipeline_snapshot = PipelineSnapshot.from_dict(pipeline_snapshot_dict)
1✔
139
    except ValueError as e:
1✔
140
        raise ValueError(f"Invalid pipeline snapshot from {file_path}: {str(e)}")
1✔
141

142
    logger.info(f"Successfully loaded the pipeline snapshot from: {file_path}")
1✔
143
    return pipeline_snapshot
1✔
144

145

146
def _save_pipeline_snapshot(pipeline_snapshot: PipelineSnapshot, raise_on_failure: bool = True) -> Optional[str]:
1✔
147
    """
148
    Save the pipeline snapshot dictionary to a JSON file.
149

150
    - The filename is generated based on the component name, visit count, and timestamp.
151
        - The component name is taken from the break point's `component_name`.
152
        - The visit count is taken from the pipeline state's `component_visits` for the component name.
153
        - The timestamp is taken from the pipeline snapshot's `timestamp` or the current time if not available.
154
    - The file path is taken from the break point's `snapshot_file_path`.
155
    - If the `snapshot_file_path` is None, the function will return without saving.
156

157
    :param pipeline_snapshot: The pipeline snapshot to save.
158
    :param raise_on_failure: If True, raises an exception if saving fails. If False, logs the error and returns.
159

160
    :returns:
161
        The full path to the saved JSON file, or None if `snapshot_file_path` is None.
162
    :raises:
163
        Exception: If saving the JSON snapshot fails.
164
    """
165
    break_point = pipeline_snapshot.break_point
1✔
166
    snapshot_file_path = (
1✔
167
        break_point.break_point.snapshot_file_path
168
        if isinstance(break_point, AgentBreakpoint)
169
        else break_point.snapshot_file_path
170
    )
171

172
    if snapshot_file_path is None:
1✔
173
        return None
1✔
174

175
    dt = pipeline_snapshot.timestamp or datetime.now()
1✔
176
    snapshot_dir = Path(snapshot_file_path)
1✔
177

178
    # Generate filename
179
    # We check if the agent_name is provided to differentiate between agent and non-agent breakpoints
180
    if isinstance(break_point, AgentBreakpoint):
1✔
181
        agent_name = break_point.agent_name
1✔
182
        component_name = break_point.break_point.component_name
1✔
183
    else:
184
        component_name = break_point.component_name
1✔
185
        agent_name = None
1✔
186

187
    visit_nr = pipeline_snapshot.pipeline_state.component_visits.get(component_name, 0)
1✔
188
    timestamp = dt.strftime("%Y_%m_%d_%H_%M_%S")
1✔
189
    file_name = f"{agent_name + '_' if agent_name else ''}{component_name}_{visit_nr}_{timestamp}.json"
1✔
190
    full_path = snapshot_dir / file_name
1✔
191

192
    try:
1✔
193
        snapshot_dir.mkdir(parents=True, exist_ok=True)
1✔
194
        with open(full_path, "w") as f_out:
1✔
195
            json.dump(pipeline_snapshot.to_dict(), f_out, indent=2)
1✔
196
        logger.info(
1✔
197
            "Pipeline snapshot saved to '{full_path}'. You can use this file to debug or resume the pipeline.",
198
            full_path=full_path,
199
        )
200
    except Exception as error:
1✔
201
        logger.error("Failed to save pipeline snapshot to '{full_path}'. Error: {e}", full_path=full_path, e=error)
1✔
202
        if raise_on_failure:
1✔
203
            raise
1✔
204

205
    return str(full_path)
1✔
206

207

208
def _create_pipeline_snapshot(
1✔
209
    *,
210
    inputs: dict[str, Any],
211
    component_inputs: dict[str, Any],
212
    break_point: Union[AgentBreakpoint, Breakpoint],
213
    component_visits: dict[str, int],
214
    original_input_data: dict[str, Any],
215
    ordered_component_names: list[str],
216
    include_outputs_from: set[str],
217
    pipeline_outputs: dict[str, Any],
218
) -> PipelineSnapshot:
219
    """
220
    Create a snapshot of the pipeline at the point where the breakpoint was triggered.
221

222
    :param inputs: The current pipeline snapshot inputs.
223
    :param component_inputs: The inputs to the component that triggered the breakpoint.
224
    :param break_point: The breakpoint that triggered the snapshot, can be AgentBreakpoint or Breakpoint.
225
    :param component_visits: The visit count of the component that triggered the breakpoint.
226
    :param original_input_data: The original input data.
227
    :param ordered_component_names: The ordered component names.
228
    :param include_outputs_from: Set of component names whose outputs should be included in the pipeline results.
229
    :param pipeline_outputs: The current outputs of the pipeline.
230
    :returns:
231
        A PipelineSnapshot containing the state of the pipeline at the point of the breakpoint.
232
    """
233
    if isinstance(break_point, AgentBreakpoint):
1✔
234
        component_name = break_point.agent_name
1✔
235
    else:
236
        component_name = break_point.component_name
1✔
237

238
    transformed_original_input_data = _transform_json_structure(original_input_data)
1✔
239
    transformed_inputs = _transform_json_structure({**inputs, component_name: component_inputs})
1✔
240

241
    try:
1✔
242
        serialized_inputs = _serialize_value_with_schema(transformed_inputs)
1✔
243
    except Exception as error:
1✔
244
        logger.warning(
1✔
245
            "Failed to serialize the inputs of the current pipeline state. "
246
            "The inputs in the snapshot will be replaced with an empty dictionary. Error: {e}",
247
            e=error,
248
        )
249
        serialized_inputs = {}
1✔
250

251
    try:
1✔
252
        serialized_original_input_data = _serialize_value_with_schema(transformed_original_input_data)
1✔
253
    except Exception as error:
1✔
254
        logger.warning(
1✔
255
            "Failed to serialize original input data for `pipeline.run`. "
256
            "This likely occurred due to non-serializable object types. "
257
            "The snapshot will store an empty dictionary instead. Error: {e}",
258
            e=error,
259
        )
260
        serialized_original_input_data = {}
1✔
261

262
    pipeline_snapshot = PipelineSnapshot(
1✔
263
        pipeline_state=PipelineState(
264
            inputs=serialized_inputs, component_visits=component_visits, pipeline_outputs=pipeline_outputs
265
        ),
266
        timestamp=datetime.now(),
267
        break_point=break_point,
268
        original_input_data=serialized_original_input_data,
269
        ordered_component_names=ordered_component_names,
270
        include_outputs_from=include_outputs_from,
271
    )
272
    return pipeline_snapshot
1✔
273

274

275
def _transform_json_structure(data: Union[dict[str, Any], list[Any], Any]) -> Any:
1✔
276
    """
277
    Transforms a JSON structure by removing the 'sender' key and moving the 'value' to the top level.
278

279
    For example:
280
    "key": [{"sender": null, "value": "some value"}] -> "key": "some value"
281

282
    :param data: The JSON structure to transform.
283
    :returns: The transformed structure.
284
    """
285
    if isinstance(data, dict):
1✔
286
        # If this dict has both 'sender' and 'value', return just the value
287
        if "value" in data and "sender" in data:
1✔
288
            return data["value"]
1✔
289
        # Otherwise, recursively process each key-value pair
290
        return {k: _transform_json_structure(v) for k, v in data.items()}
1✔
291

292
    if isinstance(data, list):
1✔
293
        # First, transform each item in the list.
294
        transformed = [_transform_json_structure(item) for item in data]
1✔
295
        # If the original list has exactly one element and that element was a dict
296
        # with 'sender' and 'value', then unwrap the list.
297
        if len(data) == 1 and isinstance(data[0], dict) and "value" in data[0] and "sender" in data[0]:
1✔
298
            return transformed[0]
1✔
299
        return transformed
1✔
300

301
    # For other data types, just return the value as is.
302
    return data
1✔
303

304

305
def _trigger_break_point(*, pipeline_snapshot: PipelineSnapshot) -> None:
1✔
306
    """
307
    Trigger a breakpoint by saving a snapshot and raising exception.
308

309
    :param pipeline_snapshot: The current pipeline snapshot containing the state and break point
310
    :raises BreakpointException: When breakpoint is triggered
311
    """
312
    full_file_path = _save_pipeline_snapshot(pipeline_snapshot=pipeline_snapshot)
1✔
313

314
    if isinstance(pipeline_snapshot.break_point, Breakpoint):
1✔
315
        component_name = pipeline_snapshot.break_point.component_name
1✔
316
    else:
317
        component_name = pipeline_snapshot.break_point.agent_name
×
318

319
    component_visits = pipeline_snapshot.pipeline_state.component_visits
1✔
320
    msg = f"Breaking at component {component_name} at visit count {component_visits[component_name]}"
1✔
321
    raise BreakpointException(
1✔
322
        message=msg,
323
        component=component_name,
324
        pipeline_snapshot=pipeline_snapshot,
325
        pipeline_snapshot_file_path=full_file_path,
326
    )
327

328

329
def _create_agent_snapshot(
1✔
330
    *, component_visits: dict[str, int], agent_breakpoint: AgentBreakpoint, component_inputs: dict[str, Any]
331
) -> AgentSnapshot:
332
    """
333
    Create a snapshot of the agent's state.
334

335
    :param component_visits: The visit counts for the agent's components.
336
    :param agent_breakpoint: AgentBreakpoint object containing breakpoints
337
    :return: An AgentSnapshot containing the agent's state and component visits.
338
    """
339
    return AgentSnapshot(
1✔
340
        component_inputs={
341
            "chat_generator": _serialize_value_with_schema(deepcopy(component_inputs["chat_generator"])),
342
            "tool_invoker": _serialize_value_with_schema(deepcopy(component_inputs["tool_invoker"])),
343
        },
344
        component_visits=component_visits,
345
        break_point=agent_breakpoint,
346
        timestamp=datetime.now(),
347
    )
348

349

350
def _validate_tool_breakpoint_is_valid(agent_breakpoint: AgentBreakpoint, tools: "ToolsType") -> None:
1✔
351
    """
352
    Validates the AgentBreakpoint passed to the agent.
353

354
    Validates that the tool name in ToolBreakpoints correspond to a tool available in the agent.
355

356
    :param agent_breakpoint: AgentBreakpoint object containing breakpoints for the agent components.
357
    :param tools: A list of Tool and/or Toolset objects, or a Toolset that the agent can use.
358
    :raises ValueError: If any tool name in ToolBreakpoints is not available in the agent's tools.
359
    """
360
    from haystack.tools.utils import flatten_tools_or_toolsets  # avoid circular import
1✔
361

362
    available_tool_names = {tool.name for tool in flatten_tools_or_toolsets(tools)}
1✔
363
    tool_breakpoint = agent_breakpoint.break_point
1✔
364
    # Assert added for mypy to pass, but this is already checked before this function is called
365
    assert isinstance(tool_breakpoint, ToolBreakpoint)
1✔
366
    if tool_breakpoint.tool_name and tool_breakpoint.tool_name not in available_tool_names:
1✔
367
        raise ValueError(f"Tool '{tool_breakpoint.tool_name}' is not available in the agent's tools")
1✔
368

369

370
def _create_pipeline_snapshot_from_chat_generator(
1✔
371
    *,
372
    execution_context: "_ExecutionContext",
373
    agent_name: Optional[str] = None,
374
    break_point: Optional[AgentBreakpoint] = None,
375
    parent_snapshot: Optional[PipelineSnapshot] = None,
376
) -> PipelineSnapshot:
377
    """
378
    Create a pipeline snapshot when a chat generator breakpoint is raised or an exception during execution occurs.
379

380
    :param execution_context: The current execution context of the agent.
381
    :param agent_name: The name of the agent component if present in a pipeline.
382
    :param break_point: An optional AgentBreakpoint object. If provided, it will be used instead of creating a new one.
383
        A scenario where a new breakpoint is created is when an exception occurs during chat generation and we want to
384
        capture the state at that point.
385
    :param parent_snapshot: An optional parent PipelineSnapshot to build upon.
386
    :returns:
387
        A PipelineSnapshot containing the state of the pipeline and agent at the point of the breakpoint or exception.
388
    """
389
    if break_point is None:
1✔
390
        agent_breakpoint = AgentBreakpoint(
1✔
391
            agent_name=agent_name or "agent",
392
            break_point=Breakpoint(
393
                component_name="chat_generator",
394
                visit_count=execution_context.component_visits["chat_generator"],
395
                snapshot_file_path=_get_output_dir("pipeline_snapshot"),
396
            ),
397
        )
398
    else:
399
        agent_breakpoint = break_point
1✔
400

401
    agent_snapshot = _create_agent_snapshot(
1✔
402
        component_visits=execution_context.component_visits,
403
        agent_breakpoint=agent_breakpoint,
404
        component_inputs={
405
            "chat_generator": {
406
                "messages": execution_context.state.data["messages"],
407
                **execution_context.chat_generator_inputs,
408
            },
409
            "tool_invoker": {"messages": [], "state": execution_context.state, **execution_context.tool_invoker_inputs},
410
        },
411
    )
412
    if parent_snapshot is None:
1✔
413
        # Create an empty pipeline snapshot if no parent snapshot is provided
414
        final_snapshot = PipelineSnapshot(
1✔
415
            pipeline_state=PipelineState(inputs={}, component_visits={}, pipeline_outputs={}),
416
            timestamp=agent_snapshot.timestamp,
417
            break_point=agent_snapshot.break_point,
418
            agent_snapshot=agent_snapshot,
419
            original_input_data={},
420
            ordered_component_names=[],
421
            include_outputs_from=set(),
422
        )
423
    else:
424
        final_snapshot = replace(parent_snapshot, agent_snapshot=agent_snapshot)
1✔
425

426
    return final_snapshot
1✔
427

428

429
def _create_pipeline_snapshot_from_tool_invoker(
1✔
430
    *,
431
    execution_context: "_ExecutionContext",
432
    tool_name: Optional[str] = None,
433
    agent_name: Optional[str] = None,
434
    break_point: Optional[AgentBreakpoint] = None,
435
    parent_snapshot: Optional[PipelineSnapshot] = None,
436
) -> PipelineSnapshot:
437
    """
438
    Create a pipeline snapshot when a tool invoker breakpoint is raised or an exception during execution occurs.
439

440
    :param execution_context: The current execution context of the agent.
441
    :param tool_name: The name of the tool that triggered the breakpoint, if available.
442
    :param agent_name: The name of the agent component if present in a pipeline.
443
    :param break_point: An optional AgentBreakpoint object. If provided, it will be used instead of creating a new one.
444
        A scenario where a new breakpoint is created is when an exception occurs during tool execution and we want to
445
        capture the state at that point.
446
    :param parent_snapshot: An optional parent PipelineSnapshot to build upon.
447
    :returns:
448
        A PipelineSnapshot containing the state of the pipeline and agent at the point of the breakpoint or exception.
449
    """
450
    if break_point is None:
1✔
451
        agent_breakpoint = AgentBreakpoint(
1✔
452
            agent_name=agent_name or "agent",
453
            break_point=ToolBreakpoint(
454
                component_name="tool_invoker",
455
                visit_count=execution_context.component_visits["tool_invoker"],
456
                tool_name=tool_name,
457
                snapshot_file_path=_get_output_dir("pipeline_snapshot"),
458
            ),
459
        )
460
    else:
461
        agent_breakpoint = break_point
1✔
462

463
    messages = execution_context.state.data["messages"]
1✔
464
    agent_snapshot = _create_agent_snapshot(
1✔
465
        component_visits=execution_context.component_visits,
466
        agent_breakpoint=agent_breakpoint,
467
        component_inputs={
468
            "chat_generator": {"messages": messages[:-1], **execution_context.chat_generator_inputs},
469
            "tool_invoker": {
470
                "messages": messages[-1:],  # tool invoker consumes last msg from the chat_generator, contains tool call
471
                "state": execution_context.state,
472
                **execution_context.tool_invoker_inputs,
473
            },
474
        },
475
    )
476
    if parent_snapshot is None:
1✔
477
        # Create an empty pipeline snapshot if no parent snapshot is provided
478
        final_snapshot = PipelineSnapshot(
1✔
479
            pipeline_state=PipelineState(inputs={}, component_visits={}, pipeline_outputs={}),
480
            timestamp=agent_snapshot.timestamp,
481
            break_point=agent_snapshot.break_point,
482
            agent_snapshot=agent_snapshot,
483
            original_input_data={},
484
            ordered_component_names=[],
485
            include_outputs_from=set(),
486
        )
487
    else:
488
        final_snapshot = replace(parent_snapshot, agent_snapshot=agent_snapshot)
1✔
489

490
    return final_snapshot
1✔
491

492

493
def _trigger_chat_generator_breakpoint(*, pipeline_snapshot: PipelineSnapshot) -> None:
1✔
494
    """
495
    Trigger a breakpoint before ChatGenerator execution in Agent.
496

497
    :param pipeline_snapshot: PipelineSnapshot object containing the state of the pipeline and Agent snapshot.
498
    :raises BreakpointException: Always raised when this function is called, indicating a breakpoint has been triggered.
499
    """
500
    if not isinstance(pipeline_snapshot.break_point, AgentBreakpoint):
1✔
501
        raise ValueError("PipelineSnapshot must contain an AgentBreakpoint to trigger a chat generator breakpoint.")
×
502

503
    if not isinstance(pipeline_snapshot.agent_snapshot, AgentSnapshot):
1✔
504
        raise ValueError("PipelineSnapshot must contain an AgentSnapshot to trigger a chat generator breakpoint.")
×
505

506
    break_point = pipeline_snapshot.break_point.break_point
1✔
507
    full_file_path = _save_pipeline_snapshot(pipeline_snapshot=pipeline_snapshot)
1✔
508
    msg = (
1✔
509
        f"Breaking at {break_point.component_name} visit count "
510
        f"{pipeline_snapshot.agent_snapshot.component_visits[break_point.component_name]}"
511
    )
512
    raise BreakpointException(
1✔
513
        message=msg,
514
        component=break_point.component_name,
515
        pipeline_snapshot=pipeline_snapshot,
516
        pipeline_snapshot_file_path=full_file_path,
517
    )
518

519

520
def _trigger_tool_invoker_breakpoint(*, llm_messages: list[ChatMessage], pipeline_snapshot: PipelineSnapshot) -> None:
1✔
521
    """
522
    Check if a tool call breakpoint should be triggered before executing the tool invoker.
523

524
    :param llm_messages: List of ChatMessage objects containing potential tool calls.
525
    :param pipeline_snapshot: PipelineSnapshot object containing the state of the pipeline and Agent snapshot.
526
    :raises BreakpointException: If the breakpoint is triggered, indicating a breakpoint has been reached for a tool
527
        call.
528
    """
529
    if not pipeline_snapshot.agent_snapshot:
1✔
530
        raise ValueError("PipelineSnapshot must contain an AgentSnapshot to trigger a tool call breakpoint.")
×
531

532
    if not isinstance(pipeline_snapshot.agent_snapshot.break_point.break_point, ToolBreakpoint):
1✔
533
        return
×
534

535
    tool_breakpoint = pipeline_snapshot.agent_snapshot.break_point.break_point
1✔
536

537
    # Check if we should break for this specific tool or all tools
538
    if tool_breakpoint.tool_name is None:
1✔
539
        # Break for any tool call
540
        should_break = any(msg.tool_call for msg in llm_messages)
1✔
541
    else:
542
        # Break only for the specific tool
543
        should_break = any(
1✔
544
            tc.tool_name == tool_breakpoint.tool_name for msg in llm_messages for tc in msg.tool_calls or []
545
        )
546

547
    if not should_break:
1✔
548
        return  # No breakpoint triggered
1✔
549

550
    full_file_path = _save_pipeline_snapshot(pipeline_snapshot=pipeline_snapshot)
1✔
551

552
    msg = (
1✔
553
        f"Breaking at {tool_breakpoint.component_name} visit count "
554
        f"{pipeline_snapshot.agent_snapshot.component_visits[tool_breakpoint.component_name]}"
555
    )
556
    if tool_breakpoint.tool_name:
1✔
557
        msg += f" for tool {tool_breakpoint.tool_name}"
1✔
558
    logger.info(msg)
1✔
559

560
    raise BreakpointException(
1✔
561
        message=msg,
562
        component=tool_breakpoint.component_name,
563
        pipeline_snapshot=pipeline_snapshot,
564
        pipeline_snapshot_file_path=full_file_path,
565
    )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc