• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 17432604695

03 Sep 2025 11:54AM UTC coverage: 92.051% (-0.03%) from 92.079%
17432604695

Pull #9759

github

web-flow
Merge a59e01427 into f48789f5f
Pull Request #9759: feat: Add PipelineTool to streamline using Pipeline as Tools with Agent

12970 of 14090 relevant lines covered (92.05%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.27
haystack/core/pipeline/pipeline.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
from copy import deepcopy
1✔
6
from typing import Any, Mapping, Optional, Union
1✔
7

8
from haystack import logging, tracing
1✔
9
from haystack.core.component import Component
1✔
10
from haystack.core.errors import BreakpointException, PipelineInvalidPipelineSnapshotError, PipelineRuntimeError
1✔
11
from haystack.core.pipeline.base import (
1✔
12
    _COMPONENT_INPUT,
13
    _COMPONENT_OUTPUT,
14
    _COMPONENT_VISITS,
15
    ComponentPriority,
16
    PipelineBase,
17
)
18
from haystack.core.pipeline.breakpoint import (
1✔
19
    _create_pipeline_snapshot,
20
    _save_pipeline_snapshot,
21
    _trigger_break_point,
22
    _validate_break_point_against_pipeline,
23
    _validate_pipeline_snapshot_against_pipeline,
24
)
25
from haystack.core.pipeline.utils import _deepcopy_with_exceptions
1✔
26
from haystack.dataclasses.breakpoints import AgentBreakpoint, Breakpoint, PipelineSnapshot
1✔
27
from haystack.telemetry import pipeline_running
1✔
28
from haystack.utils import _deserialize_value_with_schema
1✔
29
from haystack.utils.misc import _get_output_dir
1✔
30

31
logger = logging.getLogger(__name__)
1✔
32

33

34
class Pipeline(PipelineBase):
    """
    Synchronous version of the orchestration engine.

    Orchestrates component execution according to the execution graph, one after the other.
    """

    @staticmethod
    def _run_component(
        component_name: str,
        component: dict[str, Any],
        inputs: dict[str, Any],
        component_visits: dict[str, int],
        parent_span: Optional[tracing.Span] = None,
    ) -> Mapping[str, Any]:
        """
        Runs a Component with the given inputs.

        :param component_name: Name of the Component.
        :param component: Component with component metadata.
        :param inputs: Inputs for the Component.
        :param component_visits: Current state of component visits.
        :param parent_span: The parent span to use for the newly created span.
            This is to allow tracing to be correctly linked to the pipeline run.
        :raises PipelineRuntimeError: If Component doesn't return a dictionary.
        :return: The output of the Component.
        """
        component_instance: Component = component["instance"]

        with PipelineBase._create_component_span(
            component_name=component_name, instance=component_instance, inputs=inputs, parent_span=parent_span
        ) as span:
            # Tag a deepcopy of the inputs: the originals may later be deleted once
            # they have been dispatched to other Components, and we would lose them.
            span.set_content_tag(_COMPONENT_INPUT, _deepcopy_with_exceptions(inputs))
            logger.info("Running component {component_name}", component_name=component_name)

            try:
                run_result = component_instance.run(**inputs)
            except BreakpointException as breakpoint_error:
                # Propagate BreakpointException untouched to preserve the original exception context.
                # This matters when Agent components internally use Pipeline._run_component and
                # trigger breakpoints that need to bubble up to the main pipeline.
                raise breakpoint_error
            except Exception as error:
                # Any other failure is wrapped so callers know which component failed.
                raise PipelineRuntimeError.from_exception(component_name, component_instance.__class__, error) from error

            component_visits[component_name] += 1

            if not isinstance(run_result, Mapping):
                raise PipelineRuntimeError.from_invalid_output(component_name, component_instance.__class__, run_result)

            span.set_tag(_COMPONENT_VISITS, component_visits[component_name])
            span.set_content_tag(_COMPONENT_OUTPUT, run_result)

            return run_result
90

91
    def run(  # noqa: PLR0915, PLR0912, C901, pylint: disable=too-many-branches
        self,
        data: dict[str, Any],
        include_outputs_from: Optional[set[str]] = None,
        *,
        break_point: Optional[Union[Breakpoint, AgentBreakpoint]] = None,
        pipeline_snapshot: Optional[PipelineSnapshot] = None,
    ) -> dict[str, Any]:
        """
        Runs the Pipeline with given input data.

        Usage:
        ```python
        from haystack import Pipeline, Document
        from haystack.utils import Secret
        from haystack.document_stores.in_memory import InMemoryDocumentStore
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.components.generators import OpenAIGenerator
        from haystack.components.builders.answer_builder import AnswerBuilder
        from haystack.components.builders.prompt_builder import PromptBuilder

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = \"\"\"
        Given these documents, answer the question.
        Documents:
        {% for doc in documents %}
            {{ doc.content }}
        {% endfor %}
        Question: {{question}}
        Answer:
        \"\"\"

        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = PromptBuilder(template=prompt_template)
        llm = OpenAIGenerator(api_key=Secret.from_token(api_key))

        rag_pipeline = Pipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Ask a question
        question = "Who lives in Paris?"
        results = rag_pipeline.run(
            {
                "retriever": {"query": question},
                "prompt_builder": {"question": question},
            }
        )

        print(results["llm"]["replies"])
        # Jean lives in Paris
        ```

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.

        :param break_point:
            A set of breakpoints that can be used to debug the pipeline execution.

        :param pipeline_snapshot:
            A dictionary containing a snapshot of a previously saved pipeline execution.

        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running.
            Or if a Component fails or returns output in an unsupported type.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline.
        :raises BreakpointException:
            When a break_point is triggered. Contains the component name, state, and partial results.
        """
        pipeline_running(self)

        if break_point and pipeline_snapshot:
            # NOTE(fix): the message previously referred to a nonexistent "pipeline_breakpoint"
            # parameter; the actual parameter is `break_point`.
            msg = (
                "break_point and pipeline_snapshot cannot be provided at the same time. "
                "The pipeline run will be aborted."
            )
            raise PipelineInvalidPipelineSnapshotError(message=msg)

        # make sure all breakpoints are valid, i.e. reference components in the pipeline
        if break_point:
            _validate_break_point_against_pipeline(break_point, self.graph)

        # TODO: Remove this warmup once we can check reliably whether a component has been warmed up or not
        # As of now it's here to make sure we don't have failing tests that assume warm_up() is called in run()
        self.warm_up()

        if include_outputs_from is None:
            include_outputs_from = set()

        pipeline_outputs: dict[str, Any] = {}

        if not pipeline_snapshot:
            # normalize `data`
            data = self._prepare_component_input_data(data)

            # Raise ValueError if input is malformed in some way
            self.validate_input(data)

            # We create a list of components in the pipeline sorted by name, so that the algorithm runs
            # deterministically and independent of insertion order into the pipeline.
            ordered_component_names = sorted(self.graph.nodes.keys())

            # We track component visits to decide if a component can run.
            component_visits = dict.fromkeys(ordered_component_names, 0)

        else:
            # Validate the pipeline snapshot against the current pipeline graph
            _validate_pipeline_snapshot_against_pipeline(pipeline_snapshot, self.graph)

            # Handle resuming the pipeline from a snapshot
            component_visits = pipeline_snapshot.pipeline_state.component_visits
            ordered_component_names = pipeline_snapshot.ordered_component_names
            data = _deserialize_value_with_schema(pipeline_snapshot.pipeline_state.inputs)

            # include_outputs_from from the snapshot when resuming
            include_outputs_from = pipeline_snapshot.include_outputs_from

            # also intermediate_outputs from the snapshot when resuming
            pipeline_outputs = pipeline_snapshot.pipeline_state.pipeline_outputs

        cached_topological_sort = None
        # We need to access a component's receivers multiple times during a pipeline run.
        # We store them here for easy access.
        cached_receivers = {name: self._find_receivers_from(name) for name in ordered_component_names}

        with tracing.tracer.trace(
            "haystack.pipeline.run",
            tags={
                "haystack.pipeline.input_data": data,
                "haystack.pipeline.output_data": pipeline_outputs,
                "haystack.pipeline.metadata": self.metadata,
                "haystack.pipeline.max_runs_per_component": self._max_runs_per_component,
            },
        ) as span:
            inputs = self._convert_to_internal_format(pipeline_inputs=data)
            priority_queue = self._fill_queue(ordered_component_names, inputs, component_visits)

            # check if pipeline is blocked before execution
            self.validate_pipeline(priority_queue)

            while True:
                candidate = self._get_next_runnable_component(priority_queue, component_visits)

                # If there are no runnable components left, we can exit the loop
                if candidate is None:
                    break

                priority, component_name, component = candidate

                # If the next component is blocked, we do a check to see if the pipeline is possibly blocked and raise
                # a warning if it is.
                if priority == ComponentPriority.BLOCKED:
                    if self._is_pipeline_possibly_blocked(current_pipeline_outputs=pipeline_outputs):
                        # Pipeline is most likely blocked (most likely a configuration issue) so we raise a warning.
                        logger.warning(
                            "Cannot run pipeline - the next component that is meant to run is blocked.\n"
                            "Component name: '{component_name}'\n"
                            "Component type: '{component_type}'\n"
                            "This typically happens when the component is unable to receive all of its required "
                            "inputs.\nCheck the connections to this component and ensure all required inputs are "
                            "provided.",
                            component_name=component_name,
                            component_type=component["instance"].__class__.__name__,
                        )
                    # We always exit the loop since we cannot run the next component.
                    break

                if len(priority_queue) > 0 and priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST]:
                    component_name, topological_sort = self._tiebreak_waiting_components(
                        component_name=component_name,
                        priority=priority,
                        priority_queue=priority_queue,
                        topological_sort=cached_topological_sort,
                    )

                    cached_topological_sort = topological_sort
                    component = self._get_component_with_graph_metadata_and_visits(
                        component_name, component_visits[component_name]
                    )

                if pipeline_snapshot:
                    if isinstance(pipeline_snapshot.break_point, AgentBreakpoint):
                        name_to_check = pipeline_snapshot.break_point.agent_name
                    else:
                        name_to_check = pipeline_snapshot.break_point.component_name
                    is_resume = name_to_check == component_name
                else:
                    is_resume = False
                component_inputs = self._consume_component_inputs(
                    component_name=component_name, component=component, inputs=inputs, is_resume=is_resume
                )

                # We need to add missing defaults using default values from input sockets because the run signature
                # might not provide these defaults for components with inputs defined dynamically upon component
                # initialization
                component_inputs = self._add_missing_input_defaults(component_inputs, component["input_sockets"])

                # Scenario 1: Pipeline snapshot is provided to resume the pipeline at a specific component
                # Deserialize the component_inputs if they are passed in the pipeline_snapshot.
                # this check will prevent other component_inputs generated at runtime from being deserialized
                if pipeline_snapshot and component_name in pipeline_snapshot.pipeline_state.inputs.keys():
                    for key, value in component_inputs.items():
                        component_inputs[key] = _deserialize_value_with_schema(value)

                # If we are resuming from an AgentBreakpoint, we inject the agent_snapshot into the Agents inputs
                if (
                    pipeline_snapshot
                    and isinstance(pipeline_snapshot.break_point, AgentBreakpoint)
                    and component_name == pipeline_snapshot.break_point.agent_name
                ):
                    component_inputs["snapshot"] = pipeline_snapshot.agent_snapshot
                    component_inputs["break_point"] = None

                # Scenario 2: A breakpoint is provided to stop the pipeline at a specific component
                if break_point:
                    should_trigger_breakpoint = False
                    should_create_snapshot = False

                    # Scenario 2.1: an AgentBreakpoint is provided to stop the pipeline at a specific component
                    if isinstance(break_point, AgentBreakpoint) and component_name == break_point.agent_name:
                        should_create_snapshot = True
                        component_inputs["break_point"] = break_point

                    # Scenario 2.2: a regular breakpoint is provided to stop the pipeline at a specific component and
                    # visit count
                    elif (
                        isinstance(break_point, Breakpoint)
                        and break_point.component_name == component_name
                        and break_point.visit_count == component_visits[component_name]
                    ):
                        should_trigger_breakpoint = True
                        should_create_snapshot = True

                    if should_create_snapshot:
                        pipeline_snapshot_inputs_serialised = deepcopy(inputs)
                        pipeline_snapshot_inputs_serialised[component_name] = deepcopy(component_inputs)
                        new_pipeline_snapshot = _create_pipeline_snapshot(
                            inputs=pipeline_snapshot_inputs_serialised,
                            break_point=break_point,
                            component_visits=component_visits,
                            original_input_data=data,
                            ordered_component_names=ordered_component_names,
                            include_outputs_from=include_outputs_from,
                            pipeline_outputs=pipeline_outputs,
                        )

                        # add the parent_snapshot to agent inputs if needed
                        if isinstance(break_point, AgentBreakpoint) and component_name == break_point.agent_name:
                            component_inputs["parent_snapshot"] = new_pipeline_snapshot

                        # trigger the breakpoint if needed
                        if should_trigger_breakpoint:
                            _trigger_break_point(
                                pipeline_snapshot=new_pipeline_snapshot, pipeline_outputs=pipeline_outputs
                            )

                try:
                    component_outputs = self._run_component(
                        component_name=component_name,
                        component=component,
                        inputs=component_inputs,  # the inputs to the current component
                        component_visits=component_visits,
                        parent_span=span,
                    )
                except PipelineRuntimeError as error:
                    # Create a snapshot of the last good state of the pipeline before the error occurred.
                    pipeline_snapshot_inputs_serialised = deepcopy(inputs)
                    pipeline_snapshot_inputs_serialised[component_name] = deepcopy(component_inputs)
                    out_dir = _get_output_dir("pipeline_snapshot")
                    break_point = Breakpoint(
                        component_name=component_name,
                        visit_count=component_visits[component_name],
                        snapshot_file_path=out_dir,
                    )
                    last_good_state_snapshot = _create_pipeline_snapshot(
                        inputs=pipeline_snapshot_inputs_serialised,
                        break_point=break_point,
                        component_visits=component_visits,
                        original_input_data=data,
                        ordered_component_names=ordered_component_names,
                        include_outputs_from=include_outputs_from,
                        pipeline_outputs=pipeline_outputs,
                    )
                    # Attach the last good state snapshot to the error before re-raising and saving to disk
                    error.pipeline_snapshot = last_good_state_snapshot
                    try:
                        _save_pipeline_snapshot(pipeline_snapshot=last_good_state_snapshot)
                        logger.info(
                            "Saved a snapshot of the pipeline's last valid state to '{out_path}'. "
                            "Review this snapshot to debug the error and resume the pipeline from here.",
                            out_path=out_dir,
                        )
                    except Exception as save_error:
                        logger.error(
                            "Failed to save a snapshot of the pipeline's last valid state with error: {e}", e=save_error
                        )
                    raise error

                # Updates global input state with component outputs and returns outputs that should go to
                # pipeline outputs.
                component_pipeline_outputs = self._write_component_outputs(
                    component_name=component_name,
                    component_outputs=component_outputs,
                    inputs=inputs,
                    receivers=cached_receivers[component_name],
                    include_outputs_from=include_outputs_from,
                )

                if component_pipeline_outputs:
                    pipeline_outputs[component_name] = deepcopy(component_pipeline_outputs)
                if self._is_queue_stale(priority_queue):
                    priority_queue = self._fill_queue(ordered_component_names, inputs, component_visits)

            if isinstance(break_point, Breakpoint):
                # NOTE(fix): the keyword argument must match the "{break_point}" placeholder for the
                # structured logger to interpolate it; it was previously passed as `pipeline_breakpoint`.
                logger.warning(
                    "The given breakpoint {break_point} was never triggered. This is because:\n"
                    "1. The provided component is not a part of the pipeline execution path.\n"
                    "2. The component did not reach the visit count specified in the break_point",
                    break_point=break_point,
                )

            return pipeline_outputs
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc