• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 18592817487

17 Oct 2025 12:33PM UTC coverage: 92.2% (+0.1%) from 92.062%
18592817487

Pull #9859

github

web-flow
Merge f20ff2b98 into a43c47b63
Pull Request #9859: feat: Add FallbackChatGenerator

13346 of 14475 relevant lines covered (92.2%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.89
haystack/core/pipeline/breakpoint.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import json
1✔
6
from copy import deepcopy
1✔
7
from dataclasses import replace
1✔
8
from datetime import datetime
1✔
9
from pathlib import Path
1✔
10
from typing import TYPE_CHECKING, Any, Optional, Union
1✔
11

12
from networkx import MultiDiGraph
1✔
13

14
from haystack import logging
1✔
15
from haystack.core.errors import BreakpointException, PipelineInvalidPipelineSnapshotError
1✔
16
from haystack.dataclasses import ChatMessage
1✔
17
from haystack.dataclasses.breakpoints import (
1✔
18
    AgentBreakpoint,
19
    AgentSnapshot,
20
    Breakpoint,
21
    PipelineSnapshot,
22
    PipelineState,
23
    ToolBreakpoint,
24
)
25
from haystack.utils.base_serialization import _serialize_value_with_schema
1✔
26
from haystack.utils.misc import _get_output_dir
1✔
27

28
if TYPE_CHECKING:
29
    from haystack.components.agents.agent import _ExecutionContext
30
    from haystack.tools.tool import Tool
31
    from haystack.tools.toolset import Toolset
32

33
logger = logging.getLogger(__name__)
1✔
34

35

36
def _validate_break_point_against_pipeline(
    break_point: Union[Breakpoint, AgentBreakpoint], graph: MultiDiGraph
) -> None:
    """
    Validates the breakpoint passed to the pipeline.

    Makes sure the breakpoint refers to a component registered in the pipeline. For an AgentBreakpoint that wraps a
    ToolBreakpoint with a specific tool name, it also checks that the Agent component exposes a tool with that name.
    A ToolBreakpoint without a tool name is accepted without any tool lookup.

    :param break_point: a breakpoint to validate, can be Breakpoint or AgentBreakpoint
    :param graph: the pipeline graph whose nodes are the registered components
    :raises ValueError: If the component, the Agent component, or the named tool cannot be found.
    """

    # all Breakpoints must refer to a valid component in the pipeline
    if isinstance(break_point, Breakpoint) and break_point.component_name not in graph.nodes:
        raise ValueError(f"break_point {break_point} is not a registered component in the pipeline")

    if not isinstance(break_point, AgentBreakpoint):
        return

    breakpoint_agent_component = graph.nodes.get(break_point.agent_name)
    if not breakpoint_agent_component:
        raise ValueError(f"break_point {break_point} is not a registered Agent component in the pipeline")

    tool_breakpoint = break_point.break_point
    if not isinstance(tool_breakpoint, ToolBreakpoint):
        return

    # A ToolBreakpoint with no tool name is handled by _trigger_tool_invoker_breakpoint as "break for any tool
    # call", so there is no specific tool to look up and the breakpoint must not be rejected here.
    if tool_breakpoint.tool_name is None:
        return

    instance = breakpoint_agent_component["instance"]
    if not any(tool_breakpoint.tool_name == tool.name for tool in instance.tools):
        raise ValueError(f"break_point {tool_breakpoint} is not a registered tool in the Agent component")
65

66

67
def _validate_pipeline_snapshot_against_pipeline(pipeline_snapshot: PipelineSnapshot, graph: MultiDiGraph) -> None:
    """
    Validates that the pipeline_snapshot contains valid configuration for the current pipeline.

    Raises a PipelineInvalidPipelineSnapshotError if any component in pipeline_snapshot is not part of the
    target pipeline.

    :param pipeline_snapshot: The saved state to validate.
    :param graph: The graph of the current pipeline; its node names are the valid component names.
    :raises PipelineInvalidPipelineSnapshotError: If 'ordered_component_names', the serialized original input data
        or 'component_visits' reference a component that is not a node of `graph`.
    """

    pipeline_state = pipeline_snapshot.pipeline_state
    valid_components = set(graph.nodes.keys())

    # Check if the ordered_component_names are valid components in the pipeline
    invalid_ordered_components = set(pipeline_snapshot.ordered_component_names) - valid_components
    if invalid_ordered_components:
        raise PipelineInvalidPipelineSnapshotError(
            f"Invalid pipeline snapshot: components {invalid_ordered_components} in 'ordered_component_names' "
            f"are not part of the current pipeline."
        )

    # Check if the original_input_data refers to valid components in the pipeline.
    # _create_pipeline_snapshot stores an empty dict when the original input data could not be serialized, so the
    # 'serialized_data' key may be absent; in that case there is nothing to check.
    serialized_input_data = pipeline_snapshot.original_input_data.get("serialized_data", {})
    invalid_input_data = set(serialized_input_data.keys()) - valid_components
    if invalid_input_data:
        raise PipelineInvalidPipelineSnapshotError(
            f"Invalid pipeline snapshot: components {invalid_input_data} in 'input_data' "
            f"are not part of the current pipeline."
        )

    # Validate 'component_visits'
    invalid_component_visits = set(pipeline_state.component_visits.keys()) - valid_components
    if invalid_component_visits:
        raise PipelineInvalidPipelineSnapshotError(
            f"Invalid pipeline snapshot: components {invalid_component_visits} in 'component_visits' "
            f"are not part of the current pipeline."
        )

    if isinstance(pipeline_snapshot.break_point, AgentBreakpoint):
        component_name = pipeline_snapshot.break_point.agent_name
    else:
        component_name = pipeline_snapshot.break_point.component_name

    # The visit count is only used for the log line below, so a missing entry falls back to 0 (the same default
    # _save_pipeline_snapshot uses) instead of failing the validation with a KeyError.
    visit_count = pipeline_state.component_visits.get(component_name, 0)

    logger.info(
        "Resuming pipeline from {component} with visit count {visits}", component=component_name, visits=visit_count
    )
115

116

117
def load_pipeline_snapshot(file_path: Union[str, Path]) -> PipelineSnapshot:
    """
    Load a saved pipeline snapshot.

    :param file_path: Path to the pipeline_snapshot file.
    :returns:
        The PipelineSnapshot built from the JSON content of the file.
    :raises FileNotFoundError: If the file does not exist.
    :raises json.JSONDecodeError: If the file does not contain valid JSON.
    :raises IOError: If the file cannot be read.
    :raises ValueError: If the JSON content cannot be converted into a PipelineSnapshot.
    """

    file_path = Path(file_path)

    # Each error is re-raised with the offending path in the message; `from e` keeps the original exception
    # attached as the explicit cause.
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            pipeline_snapshot_dict = json.load(f)
    except FileNotFoundError as e:
        raise FileNotFoundError(f"File not found: {file_path}") from e
    except json.JSONDecodeError as e:
        raise json.JSONDecodeError(f"Invalid JSON file {file_path}: {str(e)}", e.doc, e.pos) from e
    except IOError as e:
        raise IOError(f"Error reading {file_path}: {str(e)}") from e

    try:
        pipeline_snapshot = PipelineSnapshot.from_dict(pipeline_snapshot_dict)
    except ValueError as e:
        raise ValueError(f"Invalid pipeline snapshot from {file_path}: {str(e)}") from e

    # Keyword-style logging, matching the other log calls in this module
    logger.info("Successfully loaded the pipeline snapshot from: {file_path}", file_path=file_path)
    return pipeline_snapshot
1✔
145

146

147
def _save_pipeline_snapshot(pipeline_snapshot: PipelineSnapshot, raise_on_failure: bool = True) -> Optional[str]:
    """
    Save the pipeline snapshot dictionary to a JSON file.

    - The filename is generated based on the component name, visit count, and timestamp.
        - The component name is taken from the break point's `component_name`. For an AgentBreakpoint it is the
          `component_name` of the wrapped break point and the filename is prefixed with the agent name.
        - The visit count is taken from the pipeline state's `component_visits` for the component name
          (0 if there is no entry).
        - The timestamp is taken from the pipeline snapshot's `timestamp` or the current time if not available.
    - The target directory is taken from the break point's `snapshot_file_path` and is created if it is missing.
    - If the `snapshot_file_path` is None, the function will return without saving.

    :param pipeline_snapshot: The pipeline snapshot to save.
    :param raise_on_failure: If True, raises an exception if saving fails. If False, logs the error and returns.

    :returns:
        The full path to the saved JSON file, or None if `snapshot_file_path` is None or if saving failed and
        `raise_on_failure` is False.
    :raises:
        Exception: If saving the JSON snapshot fails and `raise_on_failure` is True.
    """
    break_point = pipeline_snapshot.break_point
    snapshot_file_path = (
        break_point.break_point.snapshot_file_path
        if isinstance(break_point, AgentBreakpoint)
        else break_point.snapshot_file_path
    )

    if snapshot_file_path is None:
        return None

    dt = pipeline_snapshot.timestamp or datetime.now()
    snapshot_dir = Path(snapshot_file_path)

    # Generate filename
    # We check if the agent_name is provided to differentiate between agent and non-agent breakpoints
    if isinstance(break_point, AgentBreakpoint):
        agent_name = break_point.agent_name
        component_name = break_point.break_point.component_name
    else:
        component_name = break_point.component_name
        agent_name = None

    visit_nr = pipeline_snapshot.pipeline_state.component_visits.get(component_name, 0)
    timestamp = dt.strftime("%Y_%m_%d_%H_%M_%S")
    file_name = f"{agent_name + '_' if agent_name else ''}{component_name}_{visit_nr}_{timestamp}.json"
    full_path = snapshot_dir / file_name

    try:
        snapshot_dir.mkdir(parents=True, exist_ok=True)
        # Write with an explicit encoding so the file matches what load_pipeline_snapshot reads (utf-8)
        # regardless of the platform's default encoding.
        with open(full_path, "w", encoding="utf-8") as f_out:
            json.dump(pipeline_snapshot.to_dict(), f_out, indent=2)
        logger.info(
            "Pipeline snapshot saved to '{full_path}'. You can use this file to debug or resume the pipeline.",
            full_path=full_path,
        )
    except Exception as error:
        logger.error("Failed to save pipeline snapshot to '{full_path}'. Error: {e}", full_path=full_path, e=error)
        if raise_on_failure:
            raise
        # Nothing was saved, so do not hand back a path that callers would treat as an existing snapshot file.
        return None

    return str(full_path)
1✔
207

208

209
def _create_pipeline_snapshot(
    *,
    inputs: dict[str, Any],
    component_inputs: dict[str, Any],
    break_point: Union[AgentBreakpoint, Breakpoint],
    component_visits: dict[str, int],
    original_input_data: dict[str, Any],
    ordered_component_names: list[str],
    include_outputs_from: set[str],
    pipeline_outputs: dict[str, Any],
) -> PipelineSnapshot:
    """
    Build the PipelineSnapshot describing the pipeline at the moment a breakpoint fires.

    Both the current inputs and the original input data are first flattened with `_transform_json_structure` and
    then serialized with `_serialize_value_with_schema`. If either serialization raises, a warning is logged and an
    empty dictionary is stored in its place.

    :param inputs: The current pipeline snapshot inputs.
    :param component_inputs: The inputs to the component that triggered the breakpoint. They are stored in the
        inputs under the component name (the agent name for an AgentBreakpoint).
    :param break_point: The breakpoint that triggered the snapshot, can be AgentBreakpoint or Breakpoint.
    :param component_visits: The visit counts stored in the pipeline state of the snapshot.
    :param original_input_data: The original input data.
    :param ordered_component_names: The ordered component names.
    :param include_outputs_from: Set of component names whose outputs should be included in the pipeline results.
    :param pipeline_outputs: The current outputs of the pipeline.
    :returns:
        A PipelineSnapshot containing the state of the pipeline at the point of the breakpoint.
    """
    target_name = break_point.agent_name if isinstance(break_point, AgentBreakpoint) else break_point.component_name

    flattened_original = _transform_json_structure(original_input_data)

    # Same mapping as the inputs, with the triggering component's inputs set under its name
    merged_inputs = dict(inputs)
    merged_inputs[target_name] = component_inputs
    flattened_inputs = _transform_json_structure(merged_inputs)

    # Default to an empty dict; it is only overwritten when serialization succeeds
    serialized_inputs: Any = {}
    try:
        serialized_inputs = _serialize_value_with_schema(flattened_inputs)
    except Exception as error:
        logger.warning(
            "Failed to serialize the inputs of the current pipeline state. "
            "The inputs in the snapshot will be replaced with an empty dictionary. Error: {e}",
            e=error,
        )

    serialized_original: Any = {}
    try:
        serialized_original = _serialize_value_with_schema(flattened_original)
    except Exception as error:
        logger.warning(
            "Failed to serialize original input data for `pipeline.run`. "
            "This likely occurred due to non-serializable object types. "
            "The snapshot will store an empty dictionary instead. Error: {e}",
            e=error,
        )

    state = PipelineState(
        inputs=serialized_inputs, component_visits=component_visits, pipeline_outputs=pipeline_outputs
    )
    return PipelineSnapshot(
        pipeline_state=state,
        timestamp=datetime.now(),
        break_point=break_point,
        original_input_data=serialized_original,
        ordered_component_names=ordered_component_names,
        include_outputs_from=include_outputs_from,
    )
1✔
274

275

276
def _transform_json_structure(data: Union[dict[str, Any], list[Any], Any]) -> Any:
1✔
277
    """
278
    Transforms a JSON structure by removing the 'sender' key and moving the 'value' to the top level.
279

280
    For example:
281
    "key": [{"sender": null, "value": "some value"}] -> "key": "some value"
282

283
    :param data: The JSON structure to transform.
284
    :returns: The transformed structure.
285
    """
286
    if isinstance(data, dict):
1✔
287
        # If this dict has both 'sender' and 'value', return just the value
288
        if "value" in data and "sender" in data:
1✔
289
            return data["value"]
1✔
290
        # Otherwise, recursively process each key-value pair
291
        return {k: _transform_json_structure(v) for k, v in data.items()}
1✔
292

293
    if isinstance(data, list):
1✔
294
        # First, transform each item in the list.
295
        transformed = [_transform_json_structure(item) for item in data]
1✔
296
        # If the original list has exactly one element and that element was a dict
297
        # with 'sender' and 'value', then unwrap the list.
298
        if len(data) == 1 and isinstance(data[0], dict) and "value" in data[0] and "sender" in data[0]:
1✔
299
            return transformed[0]
1✔
300
        return transformed
1✔
301

302
    # For other data types, just return the value as is.
303
    return data
1✔
304

305

306
def _trigger_break_point(*, pipeline_snapshot: PipelineSnapshot) -> None:
    """
    Save the snapshot and stop execution by raising a BreakpointException.

    The component reported in the exception is the break point's `component_name` for a Breakpoint and its
    `agent_name` otherwise.

    :param pipeline_snapshot: The current pipeline snapshot containing the state and break point
    :raises BreakpointException: When breakpoint is triggered
    """
    # The snapshot is written first so the exception can carry the resulting file path
    saved_to = _save_pipeline_snapshot(pipeline_snapshot=pipeline_snapshot)

    active_break_point = pipeline_snapshot.break_point
    component_name = (
        active_break_point.component_name
        if isinstance(active_break_point, Breakpoint)
        else active_break_point.agent_name
    )

    component_visits = pipeline_snapshot.pipeline_state.component_visits
    msg = f"Breaking at component {component_name} at visit count {component_visits[component_name]}"
    raise BreakpointException(
        message=msg, component=component_name, pipeline_snapshot=pipeline_snapshot, pipeline_snapshot_file_path=saved_to
    )
328

329

330
def _create_agent_snapshot(
    *, component_visits: dict[str, int], agent_breakpoint: AgentBreakpoint, component_inputs: dict[str, Any]
) -> AgentSnapshot:
    """
    Build an AgentSnapshot from the agent's current inputs and visit counts.

    :param component_visits: The visit counts for the agent's components.
    :param agent_breakpoint: AgentBreakpoint object containing breakpoints
    :param component_inputs: Mapping with a "chat_generator" and a "tool_invoker" entry. Each entry is deep-copied
        and serialized with `_serialize_value_with_schema` before being stored.
    :return: An AgentSnapshot containing the agent's state and component visits.
    """
    # Serialize deep copies so the snapshot does not share objects with the live inputs
    serialized_inputs = {
        name: _serialize_value_with_schema(deepcopy(component_inputs[name]))
        for name in ("chat_generator", "tool_invoker")
    }
    return AgentSnapshot(
        component_inputs=serialized_inputs,
        component_visits=component_visits,
        break_point=agent_breakpoint,
        timestamp=datetime.now(),
    )
349

350

351
def _validate_tool_breakpoint_is_valid(
    agent_breakpoint: AgentBreakpoint, tools: Union[list["Tool"], "Toolset"]
) -> None:
    """
    Checks that the ToolBreakpoint inside an AgentBreakpoint targets a tool the agent actually has.

    A ToolBreakpoint without a tool name passes without any lookup.

    :param agent_breakpoint: AgentBreakpoint object containing breakpoints for the agent components.
    :param tools: List of Tool objects or a Toolset that the agent can use.
    :raises ValueError: If any tool name in ToolBreakpoints is not available in the agent's tools.
    """

    known_tool_names = set()
    for tool in tools:
        known_tool_names.add(tool.name)

    tool_breakpoint = agent_breakpoint.break_point
    # Assert added for mypy to pass, but this is already checked before this function is called
    assert isinstance(tool_breakpoint, ToolBreakpoint)

    # No tool name set, or the named tool exists: nothing to complain about
    if not tool_breakpoint.tool_name or tool_breakpoint.tool_name in known_tool_names:
        return

    raise ValueError(f"Tool '{tool_breakpoint.tool_name}' is not available in the agent's tools")
1✔
370

371

372
def _create_pipeline_snapshot_from_chat_generator(
    *,
    execution_context: "_ExecutionContext",
    agent_name: Optional[str] = None,
    break_point: Optional[AgentBreakpoint] = None,
    parent_snapshot: Optional[PipelineSnapshot] = None,
) -> PipelineSnapshot:
    """
    Build a pipeline snapshot for a chat generator breakpoint or for an exception raised during execution.

    :param execution_context: The current execution context of the agent.
    :param agent_name: The name of the agent component if present in a pipeline. Falls back to "agent" when a new
        breakpoint has to be created and no name is given.
    :param break_point: An optional AgentBreakpoint object. If provided, it will be used instead of creating a new one.
        A scenario where a new breakpoint is created is when an exception occurs during chat generation and we want to
        capture the state at that point.
    :param parent_snapshot: An optional parent PipelineSnapshot to build upon.
    :returns:
        A PipelineSnapshot containing the state of the pipeline and agent at the point of the breakpoint or exception.
    """
    agent_breakpoint = break_point
    if agent_breakpoint is None:
        # No breakpoint supplied: create one pointing at the chat generator
        inner_break_point = Breakpoint(
            component_name="chat_generator",
            visit_count=execution_context.component_visits["chat_generator"],
            snapshot_file_path=_get_output_dir("pipeline_snapshot"),
        )
        agent_breakpoint = AgentBreakpoint(agent_name=agent_name or "agent", break_point=inner_break_point)

    chat_generator_payload = {
        "messages": execution_context.state.data["messages"],
        **execution_context.chat_generator_inputs,
    }
    tool_invoker_payload = {"messages": [], "state": execution_context.state, **execution_context.tool_invoker_inputs}

    agent_snapshot = _create_agent_snapshot(
        component_visits=execution_context.component_visits,
        agent_breakpoint=agent_breakpoint,
        component_inputs={"chat_generator": chat_generator_payload, "tool_invoker": tool_invoker_payload},
    )

    if parent_snapshot is not None:
        # Copy of the parent snapshot with the new agent snapshot attached
        return replace(parent_snapshot, agent_snapshot=agent_snapshot)

    # Create an empty pipeline snapshot if no parent snapshot is provided
    empty_state = PipelineState(inputs={}, component_visits={}, pipeline_outputs={})
    return PipelineSnapshot(
        pipeline_state=empty_state,
        timestamp=agent_snapshot.timestamp,
        break_point=agent_snapshot.break_point,
        agent_snapshot=agent_snapshot,
        original_input_data={},
        ordered_component_names=[],
        include_outputs_from=set(),
    )
1✔
429

430

431
def _create_pipeline_snapshot_from_tool_invoker(
    *,
    execution_context: "_ExecutionContext",
    tool_name: Optional[str] = None,
    agent_name: Optional[str] = None,
    break_point: Optional[AgentBreakpoint] = None,
    parent_snapshot: Optional[PipelineSnapshot] = None,
) -> PipelineSnapshot:
    """
    Build a pipeline snapshot for a tool invoker breakpoint or for an exception raised during execution.

    :param execution_context: The current execution context of the agent.
    :param tool_name: The name of the tool that triggered the breakpoint, if available.
    :param agent_name: The name of the agent component if present in a pipeline. Falls back to "agent" when a new
        breakpoint has to be created and no name is given.
    :param break_point: An optional AgentBreakpoint object. If provided, it will be used instead of creating a new one.
        A scenario where a new breakpoint is created is when an exception occurs during tool execution and we want to
        capture the state at that point.
    :param parent_snapshot: An optional parent PipelineSnapshot to build upon.
    :returns:
        A PipelineSnapshot containing the state of the pipeline and agent at the point of the breakpoint or exception.
    """
    agent_breakpoint = break_point
    if agent_breakpoint is None:
        # No breakpoint supplied: create one pointing at the tool invoker
        inner_break_point = ToolBreakpoint(
            component_name="tool_invoker",
            visit_count=execution_context.component_visits["tool_invoker"],
            tool_name=tool_name,
            snapshot_file_path=_get_output_dir("pipeline_snapshot"),
        )
        agent_breakpoint = AgentBreakpoint(agent_name=agent_name or "agent", break_point=inner_break_point)

    messages = execution_context.state.data["messages"]
    chat_generator_payload = {"messages": messages[:-1], **execution_context.chat_generator_inputs}
    tool_invoker_payload = {
        # tool invoker consumes last msg from the chat_generator, contains tool call
        "messages": messages[-1:],
        "state": execution_context.state,
        **execution_context.tool_invoker_inputs,
    }

    agent_snapshot = _create_agent_snapshot(
        component_visits=execution_context.component_visits,
        agent_breakpoint=agent_breakpoint,
        component_inputs={"chat_generator": chat_generator_payload, "tool_invoker": tool_invoker_payload},
    )

    if parent_snapshot is not None:
        # Copy of the parent snapshot with the new agent snapshot attached
        return replace(parent_snapshot, agent_snapshot=agent_snapshot)

    # Create an empty pipeline snapshot if no parent snapshot is provided
    empty_state = PipelineState(inputs={}, component_visits={}, pipeline_outputs={})
    return PipelineSnapshot(
        pipeline_state=empty_state,
        timestamp=agent_snapshot.timestamp,
        break_point=agent_snapshot.break_point,
        agent_snapshot=agent_snapshot,
        original_input_data={},
        ordered_component_names=[],
        include_outputs_from=set(),
    )
1✔
493

494

495
def _trigger_chat_generator_breakpoint(*, pipeline_snapshot: PipelineSnapshot) -> None:
    """
    Save the snapshot and raise a BreakpointException before the ChatGenerator runs inside an Agent.

    :param pipeline_snapshot: PipelineSnapshot object containing the state of the pipeline and Agent snapshot.
    :raises ValueError: If the snapshot's break point is not an AgentBreakpoint or it carries no AgentSnapshot.
    :raises BreakpointException: Always raised when this function is called, indicating a breakpoint has been triggered.
    """
    outer_break_point = pipeline_snapshot.break_point
    if not isinstance(outer_break_point, AgentBreakpoint):
        raise ValueError("PipelineSnapshot must contain an AgentBreakpoint to trigger a chat generator breakpoint.")

    if not isinstance(pipeline_snapshot.agent_snapshot, AgentSnapshot):
        raise ValueError("PipelineSnapshot must contain an AgentSnapshot to trigger a chat generator breakpoint.")

    break_point = outer_break_point.break_point
    # The snapshot is written first so the exception can carry the resulting file path
    saved_to = _save_pipeline_snapshot(pipeline_snapshot=pipeline_snapshot)

    msg = (
        f"Breaking at {break_point.component_name} visit count "
        f"{pipeline_snapshot.agent_snapshot.component_visits[break_point.component_name]}"
    )
    raise BreakpointException(
        message=msg,
        component=break_point.component_name,
        pipeline_snapshot=pipeline_snapshot,
        pipeline_snapshot_file_path=saved_to,
    )
520

521

522
def _trigger_tool_invoker_breakpoint(*, llm_messages: list[ChatMessage], pipeline_snapshot: PipelineSnapshot) -> None:
    """
    Raise a BreakpointException before the tool invoker runs if the tool breakpoint matches the pending tool calls.

    Returns without doing anything when the agent snapshot's break point is not a ToolBreakpoint or when none of
    the messages contains a matching tool call.

    :param llm_messages: List of ChatMessage objects containing potential tool calls.
    :param pipeline_snapshot: PipelineSnapshot object containing the state of the pipeline and Agent snapshot.
    :raises ValueError: If the pipeline snapshot carries no agent snapshot.
    :raises BreakpointException: If the breakpoint is triggered, indicating a breakpoint has been reached for a tool
        call.
    """
    if not pipeline_snapshot.agent_snapshot:
        raise ValueError("PipelineSnapshot must contain an AgentSnapshot to trigger a tool call breakpoint.")

    tool_breakpoint = pipeline_snapshot.agent_snapshot.break_point.break_point
    if not isinstance(tool_breakpoint, ToolBreakpoint):
        return

    # Without a tool name any tool call triggers the break; otherwise only calls to the named tool do
    if tool_breakpoint.tool_name is None:
        matching_calls = (message.tool_call for message in llm_messages)
    else:
        matching_calls = (
            call.tool_name == tool_breakpoint.tool_name
            for message in llm_messages
            for call in message.tool_calls or []
        )

    if not any(matching_calls):
        return  # No breakpoint triggered

    saved_to = _save_pipeline_snapshot(pipeline_snapshot=pipeline_snapshot)

    msg = (
        f"Breaking at {tool_breakpoint.component_name} visit count "
        f"{pipeline_snapshot.agent_snapshot.component_visits[tool_breakpoint.component_name]}"
    )
    if tool_breakpoint.tool_name:
        msg += f" for tool {tool_breakpoint.tool_name}"
    logger.info(msg)

    raise BreakpointException(
        message=msg,
        component=tool_breakpoint.component_name,
        pipeline_snapshot=pipeline_snapshot,
        pipeline_snapshot_file_path=saved_to,
    )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc