deepset-ai / haystack · build 18592817487
17 Oct 2025 12:33PM UTC · coverage: 92.2% (+0.1%) from 92.062%
Pull Request #9859: feat: Add FallbackChatGenerator (github / web-flow: merge f20ff2b98 into a43c47b63)
13346 of 14475 relevant lines covered (92.2%) · 0.92 hits per line

Source file: haystack/core/pipeline/pipeline.py · 92.44% of lines covered

# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

from copy import deepcopy
from typing import Any, Mapping, Optional, Union

from haystack import logging, tracing
from haystack.core.component import Component
from haystack.core.errors import BreakpointException, PipelineInvalidPipelineSnapshotError, PipelineRuntimeError
from haystack.core.pipeline.base import (
    _COMPONENT_INPUT,
    _COMPONENT_OUTPUT,
    _COMPONENT_VISITS,
    ComponentPriority,
    PipelineBase,
)
from haystack.core.pipeline.breakpoint import (
    _create_pipeline_snapshot,
    _save_pipeline_snapshot,
    _trigger_break_point,
    _validate_break_point_against_pipeline,
    _validate_pipeline_snapshot_against_pipeline,
)
from haystack.core.pipeline.utils import _deepcopy_with_exceptions
from haystack.dataclasses.breakpoints import AgentBreakpoint, Breakpoint, PipelineSnapshot
from haystack.telemetry import pipeline_running
from haystack.utils import _deserialize_value_with_schema
from haystack.utils.misc import _get_output_dir

logger = logging.getLogger(__name__)


class Pipeline(PipelineBase):
    """
    Synchronous version of the orchestration engine.

    Orchestrates component execution according to the execution graph, one component after the other.
    """

    @staticmethod
    def _run_component(
        component_name: str,
        component: dict[str, Any],
        inputs: dict[str, Any],
        component_visits: dict[str, int],
        parent_span: Optional[tracing.Span] = None,
    ) -> Mapping[str, Any]:
        """
        Runs a Component with the given inputs.

        :param component_name: Name of the Component.
        :param component: Component with component metadata.
        :param inputs: Inputs for the Component.
        :param component_visits: Current state of component visits.
        :param parent_span: The parent span to use for the newly created span.
            This is to allow tracing to be correctly linked to the pipeline run.
        :raises PipelineRuntimeError: If the Component doesn't return a dictionary.
        :return: The output of the Component.
        """
        instance: Component = component["instance"]

        with PipelineBase._create_component_span(
            component_name=component_name, instance=instance, inputs=inputs, parent_span=parent_span
        ) as span:
            # We deepcopy the inputs, otherwise we might lose that information
            # when we delete them in case they're sent to other Components
            span.set_content_tag(_COMPONENT_INPUT, _deepcopy_with_exceptions(inputs))
            logger.info("Running component {component_name}", component_name=component_name)

            try:
                component_output = instance.run(**inputs)
            except BreakpointException as error:
                # Re-raise BreakpointException to preserve the original exception context.
                # This is important when Agent components internally use Pipeline._run_component
                # and trigger breakpoints that need to bubble up to the main pipeline.
                raise error

            # Any component that internally uses Pipeline._run_component could raise a PipelineRuntimeError with
            # additional context (e.g. Agent raises an agent snapshot), so we re-raise it here instead of wrapping
            # it in another PipelineRuntimeError.
            except PipelineRuntimeError as runtime_error:
                raise runtime_error

            # Catch all other exceptions and wrap them in a PipelineRuntimeError.
            except Exception as error:
                raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error

            component_visits[component_name] += 1

            if not isinstance(component_output, Mapping):
                raise PipelineRuntimeError.from_invalid_output(component_name, instance.__class__, component_output)

            span.set_tag(_COMPONENT_VISITS, component_visits[component_name])
            span.set_content_tag(_COMPONENT_OUTPUT, component_output)

            return component_output

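    # (Editor's illustrative sketch, not part of the original file.) The contract
    # _run_component enforces: a component's run() must return a mapping of output
    # socket names to values. For a hypothetical component instance `comp`:
    #
    #   visits = {"comp": 0}
    #   out = Pipeline._run_component(
    #       component_name="comp",
    #       component={"instance": comp},  # graph metadata dict; other keys omitted here
    #       inputs={"text": "hello"},
    #       component_visits=visits,
    #   )
    #   # out is a Mapping such as {"result": ...}; a non-mapping return value raises
    #   # PipelineRuntimeError, and visits["comp"] is incremented to 1.
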
    def run(  # noqa: PLR0915, PLR0912, C901, pylint: disable=too-many-branches
        self,
        data: dict[str, Any],
        include_outputs_from: Optional[set[str]] = None,
        *,
        break_point: Optional[Union[Breakpoint, AgentBreakpoint]] = None,
        pipeline_snapshot: Optional[PipelineSnapshot] = None,
    ) -> dict[str, Any]:
        """
        Runs the Pipeline with given input data.

        Usage:
        ```python
        from haystack import Pipeline, Document
        from haystack.utils import Secret
        from haystack.document_stores.in_memory import InMemoryDocumentStore
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.components.generators import OpenAIGenerator
        from haystack.components.builders.prompt_builder import PromptBuilder

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = \"\"\"
        Given these documents, answer the question.
        Documents:
        {% for doc in documents %}
            {{ doc.content }}
        {% endfor %}
        Question: {{question}}
        Answer:
        \"\"\"

        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = PromptBuilder(template=prompt_template)
        llm = OpenAIGenerator(api_key=Secret.from_token("<your-api-key>"))

        rag_pipeline = Pipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Ask a question
        question = "Who lives in Paris?"
        results = rag_pipeline.run(
            {
                "retriever": {"query": question},
                "prompt_builder": {"question": question},
            }
        )

        print(results["llm"]["replies"])
        # Jean lives in Paris
        ```

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.

        :param break_point:
            A breakpoint that can be used to debug the pipeline execution: either a
            Breakpoint for a regular component or an AgentBreakpoint for an Agent.

        :param pipeline_snapshot:
            A snapshot of a previously saved pipeline execution that the run should resume from.

        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running,
            or if a Component fails or returns output of an unsupported type.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline.
        :raises BreakpointException:
            When a breakpoint is triggered. Contains the component name, state, and partial results.
        """
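        # (Editor's illustrative sketch, not part of the original file.) The
        # breakpoint/snapshot flow implemented below, assuming a pipeline `pipe`
        # with a component named "llm":
        #
        #   bp = Breakpoint(component_name="llm", visit_count=0, snapshot_file_path="snapshots")
        #   try:
        #       pipe.run(data, break_point=bp)  # stops before "llm" runs; a snapshot is written to "snapshots"
        #   except BreakpointException:
        #       snapshot = ...  # load the PipelineSnapshot that was saved under "snapshots"
        #       results = pipe.run({}, pipeline_snapshot=snapshot)  # resumes at "llm"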
        pipeline_running(self)

        if break_point and pipeline_snapshot:
            msg = (
                "break_point and pipeline_snapshot cannot be provided at the same time. "
                "The pipeline run will be aborted."
            )
            raise PipelineInvalidPipelineSnapshotError(message=msg)

        # make sure all breakpoints are valid, i.e. reference components in the pipeline
        if break_point:
            _validate_break_point_against_pipeline(break_point, self.graph)

        # TODO: Remove this warmup once we can check reliably whether a component has been warmed up or not.
        # As of now it's here to make sure we don't have failing tests that assume warm_up() is called in run().
        self.warm_up()

        if include_outputs_from is None:
            include_outputs_from = set()

        pipeline_outputs: dict[str, Any] = {}

        if not pipeline_snapshot:
            # normalize `data`
            data = self._prepare_component_input_data(data)

            # Raise ValueError if input is malformed in some way
            self.validate_input(data)

            # We create a list of components in the pipeline sorted by name, so that the algorithm runs
            # deterministically and independently of the insertion order into the pipeline.
            ordered_component_names = sorted(self.graph.nodes.keys())

            # We track component visits to decide if a component can run.
            component_visits = dict.fromkeys(ordered_component_names, 0)

        else:
            # Validate the pipeline snapshot against the current pipeline graph
            _validate_pipeline_snapshot_against_pipeline(pipeline_snapshot, self.graph)

            # Handle resuming the pipeline from a snapshot
            component_visits = pipeline_snapshot.pipeline_state.component_visits
            ordered_component_names = pipeline_snapshot.ordered_component_names
            data = _deserialize_value_with_schema(pipeline_snapshot.pipeline_state.inputs)

            # restore include_outputs_from from the snapshot when resuming
            include_outputs_from = pipeline_snapshot.include_outputs_from

            # also restore the intermediate outputs from the snapshot when resuming
            pipeline_outputs = pipeline_snapshot.pipeline_state.pipeline_outputs

        cached_topological_sort = None
        # We need to access a component's receivers multiple times during a pipeline run.
        # We store them here for easy access.
        cached_receivers = {name: self._find_receivers_from(name) for name in ordered_component_names}

        with tracing.tracer.trace(
            "haystack.pipeline.run",
            tags={
                "haystack.pipeline.input_data": data,
                "haystack.pipeline.output_data": pipeline_outputs,
                "haystack.pipeline.metadata": self.metadata,
                "haystack.pipeline.max_runs_per_component": self._max_runs_per_component,
            },
        ) as span:
            inputs = self._convert_to_internal_format(pipeline_inputs=data)
            priority_queue = self._fill_queue(ordered_component_names, inputs, component_visits)

            # check if the pipeline is blocked before execution
            self.validate_pipeline(priority_queue)

            while True:
                candidate = self._get_next_runnable_component(priority_queue, component_visits)

                # If there are no runnable components left, we can exit the loop
                if candidate is None:
                    break

                priority, component_name, component = candidate

                # If the next component is blocked, we check whether the pipeline is possibly blocked and raise
                # a warning if it is.
                if priority == ComponentPriority.BLOCKED:
                    if self._is_pipeline_possibly_blocked(current_pipeline_outputs=pipeline_outputs):
                        # The pipeline is most likely blocked (usually a configuration issue), so we raise a warning.
                        logger.warning(
                            "Cannot run pipeline - the next component that is meant to run is blocked.\n"
                            "Component name: '{component_name}'\n"
                            "Component type: '{component_type}'\n"
                            "This typically happens when the component is unable to receive all of its required "
                            "inputs.\nCheck the connections to this component and ensure all required inputs are "
                            "provided.",
                            component_name=component_name,
                            component_type=component["instance"].__class__.__name__,
                        )
                    # We always exit the loop since we cannot run the next component.
                    break

                if len(priority_queue) > 0 and priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST]:
                    component_name, topological_sort = self._tiebreak_waiting_components(
                        component_name=component_name,
                        priority=priority,
                        priority_queue=priority_queue,
                        topological_sort=cached_topological_sort,
                    )

                    cached_topological_sort = topological_sort
                    component = self._get_component_with_graph_metadata_and_visits(
                        component_name, component_visits[component_name]
                    )

                if pipeline_snapshot:
                    if isinstance(pipeline_snapshot.break_point, AgentBreakpoint):
                        name_to_check = pipeline_snapshot.break_point.agent_name
                    else:
                        name_to_check = pipeline_snapshot.break_point.component_name
                    is_resume = name_to_check == component_name
                else:
                    is_resume = False
                component_inputs = self._consume_component_inputs(
                    component_name=component_name, component=component, inputs=inputs, is_resume=is_resume
                )

                # We need to add missing defaults using default values from input sockets because the run signature
                # might not provide these defaults for components with inputs defined dynamically upon component
                # initialization
                component_inputs = self._add_missing_input_defaults(component_inputs, component["input_sockets"])

                # Scenario 1: A pipeline snapshot is provided to resume the pipeline at a specific component.
                # Deserialize the component_inputs if they are passed in the pipeline_snapshot.
                # This check prevents other component_inputs generated at runtime from being deserialized.
                if pipeline_snapshot:
                    if component_name in pipeline_snapshot.pipeline_state.inputs.keys():
                        for key, value in component_inputs.items():
                            component_inputs[key] = _deserialize_value_with_schema(value)

                    # If we are resuming from an AgentBreakpoint, we inject the agent_snapshot into the Agent's inputs
                    if (
                        isinstance(pipeline_snapshot.break_point, AgentBreakpoint)
                        and component_name == pipeline_snapshot.break_point.agent_name
                    ):
                        component_inputs["snapshot"] = pipeline_snapshot.agent_snapshot
                        component_inputs["break_point"] = None

                # Scenario 2: A break point is provided to stop the pipeline at a specific component
                component_break_point_triggered = (
                    break_point
                    and isinstance(break_point, Breakpoint)
                    and break_point.component_name == component_name
                    and break_point.visit_count == component_visits[component_name]
                )
                agent_break_point_triggered = (
                    break_point
                    and isinstance(break_point, AgentBreakpoint)
                    and component_name == break_point.agent_name
                )
                if break_point and (component_break_point_triggered or agent_break_point_triggered):
                    new_pipeline_snapshot = _create_pipeline_snapshot(
                        inputs=deepcopy(inputs),
                        component_inputs=deepcopy(component_inputs),
                        break_point=break_point,
                        component_visits=component_visits,
                        original_input_data=data,
                        ordered_component_names=ordered_component_names,
                        include_outputs_from=include_outputs_from,
                        pipeline_outputs=pipeline_outputs,
                    )

                    # An AgentBreakpoint is provided to stop the pipeline at an Agent component, so we pass the
                    # break point and snapshot on to the Agent's inputs
                    if agent_break_point_triggered:
                        component_inputs["break_point"] = break_point
                        component_inputs["parent_snapshot"] = new_pipeline_snapshot

                    # trigger the break point if needed
                    if component_break_point_triggered:
                        _trigger_break_point(pipeline_snapshot=new_pipeline_snapshot)

                try:
                    component_outputs = self._run_component(
                        component_name=component_name,
                        component=component,
                        inputs=component_inputs,  # the inputs to the current component
                        component_visits=component_visits,
                        parent_span=span,
                    )
                except PipelineRuntimeError as error:
                    out_dir = _get_output_dir("pipeline_snapshot")
                    break_point = Breakpoint(
                        component_name=component_name,
                        visit_count=component_visits[component_name],
                        snapshot_file_path=out_dir,
                    )

                    # Create a snapshot of the state of the pipeline before the error occurred.
                    pipeline_snapshot = _create_pipeline_snapshot(
                        inputs=deepcopy(inputs),
                        component_inputs=deepcopy(component_inputs),
                        break_point=break_point,
                        component_visits=component_visits,
                        original_input_data=data,
                        ordered_component_names=ordered_component_names,
                        include_outputs_from=include_outputs_from,
                        pipeline_outputs=pipeline_outputs,
                    )

                    # If the pipeline_snapshot already exists, it came from an Agent component.
                    # We take the agent snapshot and attach it to the pipeline snapshot we create here.
                    # We also update the break_point to be an AgentBreakpoint.
                    if error.pipeline_snapshot and error.pipeline_snapshot.agent_snapshot:
                        pipeline_snapshot.agent_snapshot = error.pipeline_snapshot.agent_snapshot
                        pipeline_snapshot.break_point = error.pipeline_snapshot.agent_snapshot.break_point

                    # Attach the pipeline snapshot to the error before re-raising
                    error.pipeline_snapshot = pipeline_snapshot
                    full_file_path = _save_pipeline_snapshot(
                        pipeline_snapshot=pipeline_snapshot, raise_on_failure=False
                    )
                    error.pipeline_snapshot_file_path = full_file_path
                    raise error

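                # (Editor's illustrative note, not part of the original file.) A caller can
                # recover from such a failure by inspecting the auto-saved snapshot, e.g.:
                #
                #   try:
                #       pipe.run(data)
                #   except PipelineRuntimeError as e:
                #       print(e.pipeline_snapshot_file_path)  # where the snapshot was written
                #       # ...later: pipe.run({}, pipeline_snapshot=<snapshot loaded from that file>)
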
                # Update the global input state with the component's outputs and collect the outputs
                # that should go to the pipeline outputs.
                component_pipeline_outputs = self._write_component_outputs(
                    component_name=component_name,
                    component_outputs=component_outputs,
                    inputs=inputs,
                    receivers=cached_receivers[component_name],
                    include_outputs_from=include_outputs_from,
                )

                if component_pipeline_outputs:
                    pipeline_outputs[component_name] = deepcopy(component_pipeline_outputs)
                if self._is_queue_stale(priority_queue):
                    priority_queue = self._fill_queue(ordered_component_names, inputs, component_visits)

            if isinstance(break_point, Breakpoint):
                logger.warning(
                    "The given breakpoint {break_point} was never triggered. This is because:\n"
                    "1. The provided component is not part of the pipeline execution path, or\n"
                    "2. The component did not reach the visit count specified in the breakpoint.",
                    break_point=break_point,
                )

            return pipeline_outputs
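
# (Editor's illustrative usage sketch, not part of the original file.) How
# `include_outputs_from` changes what run() returns, assuming the RAG pipeline
# built in the docstring above:
#
#   results = rag_pipeline.run(
#       {"retriever": {"query": question}, "prompt_builder": {"question": question}},
#       include_outputs_from={"retriever"},
#   )
#   results["llm"]["replies"]          # leaf component output, always returned
#   results["retriever"]["documents"]  # included only because of include_outputs_from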