deepset-ai / haystack / build 16703692448

03 Aug 2025 09:48AM UTC · coverage: 91.885% (-0.02%) from 91.906%

Pull Request #9665: fix: ensure sentence_transformers_similarity score is a float to not np.float
Merge 517acf5cb into f2012a452

12794 of 13924 relevant lines covered (91.88%) · 0.92 hits per line

Source file: haystack/core/pipeline/async_pipeline.py (file coverage: 66.88%)
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import asyncio
import contextvars
from typing import Any, AsyncIterator, Dict, List, Mapping, Optional, Set

from haystack import logging, tracing
from haystack.core.component import Component
from haystack.core.errors import PipelineRuntimeError
from haystack.core.pipeline.base import (
    _COMPONENT_INPUT,
    _COMPONENT_OUTPUT,
    _COMPONENT_VISITS,
    ComponentPriority,
    PipelineBase,
)
from haystack.core.pipeline.utils import _deepcopy_with_exceptions
from haystack.telemetry import pipeline_running

logger = logging.getLogger(__name__)


class AsyncPipeline(PipelineBase):
    """
    Asynchronous version of the Pipeline orchestration engine.

    Manages components in a pipeline allowing for concurrent processing when the pipeline's execution graph permits.
    This enables efficient processing of components by minimizing idle time and maximizing resource utilization.
    """

    @staticmethod
    async def _run_component_async(
        component_name: str,
        component: Dict[str, Any],
        component_inputs: Dict[str, Any],
        component_visits: Dict[str, int],
        parent_span: Optional[tracing.Span] = None,
    ) -> Mapping[str, Any]:
        """
        Executes a single component asynchronously.

        If the component supports async execution, it is awaited directly; otherwise it is offloaded to the
        event loop's default executor so that it does not block the loop.

        The method also updates the component's `visits` count and returns its raw outputs; the caller is
        responsible for distributing them to downstream inputs and for pruning them into `pipeline_outputs`.

        :param component_name: The name of the component.
        :param component: The component's graph metadata, including its instance under the "instance" key.
        :param component_inputs: Inputs for the component.
        :param component_visits: Mapping from component name to the number of times it has run.
        :param parent_span: The parent tracing span, if any.
        :returns: Outputs from the component that can be yielded from run_async_generator.
        """
        instance: Component = component["instance"]

        with PipelineBase._create_component_span(
            component_name=component_name, instance=instance, inputs=component_inputs, parent_span=parent_span
        ) as span:
            # We deepcopy the inputs, otherwise we might lose that information
            # when we delete them in case they're sent to other Components
            span.set_content_tag(_COMPONENT_INPUT, _deepcopy_with_exceptions(component_inputs))
            logger.info("Running component {component_name}", component_name=component_name)

            if getattr(instance, "__haystack_supports_async__", False):
                try:
                    outputs = await instance.run_async(**component_inputs)  # type: ignore
                except Exception as error:
                    raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error
            else:
                loop = asyncio.get_running_loop()
                # Important: contextvars (e.g. the active tracing Span) don't propagate to the running loop's
                # ThreadPoolExecutor. We use ctx.run(...) to preserve context like the active tracing span.
                ctx = contextvars.copy_context()
                outputs = await loop.run_in_executor(None, lambda: ctx.run(lambda: instance.run(**component_inputs)))

            component_visits[component_name] += 1

            if not isinstance(outputs, Mapping):
                raise PipelineRuntimeError.from_invalid_output(component_name, instance.__class__, outputs)

            span.set_tag(_COMPONENT_VISITS, component_visits[component_name])
            span.set_content_tag(_COMPONENT_OUTPUT, _deepcopy_with_exceptions(outputs))

            return outputs

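    # A minimal standalone sketch of the context-propagation pattern used above (an
    # illustration, not pipeline API): `contextvars.copy_context()` snapshots the
    # caller's context, and `ctx.run(fn)` executes `fn` inside that snapshot on the
    # executor thread, so context-local state such as the active tracing span stays
    # visible there:
    #
    #     ctx = contextvars.copy_context()
    #     loop = asyncio.get_running_loop()
    #     result = await loop.run_in_executor(None, lambda: ctx.run(some_sync_fn))
    #
    # (`some_sync_fn` stands in for any synchronous callable.)
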
    async def run_async_generator(  # noqa: PLR0915,C901  # pylint: disable=too-many-statements
        self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None, concurrency_limit: int = 4
    ) -> AsyncIterator[Dict[str, Any]]:
        """
        Executes the pipeline step by step asynchronously, yielding partial outputs when any component finishes.

        Usage:
        ```python
        import asyncio

        from haystack import AsyncPipeline, Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]

        # Create and connect pipeline components
        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Prepare input data
        question = "Who lives in Paris?"
        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }


        # Process results as they become available
        async def process_results():
            async for partial_output in rag_pipeline.run_async_generator(
                    data=data,
                    include_outputs_from={"retriever", "llm"}
            ):
                # Each partial_output contains the results from a completed component
                if "retriever" in partial_output:
                    print("Retrieved documents:", len(partial_output["retriever"]["documents"]))
                if "llm" in partial_output:
                    print("Generated answer:", partial_output["llm"]["replies"][0])


        asyncio.run(process_results())
        ```

        :param data: Initial input data to the pipeline.
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that are allowed to run concurrently.
        :returns: An async iterator over partial (and final) pipeline outputs.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineMaxComponentRuns:
            If a component exceeds the maximum number of allowed executions within the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running, or if a Component fails or returns output of an unsupported type.
        """
        if include_outputs_from is None:
            include_outputs_from = set()

        # 0) Basic pipeline init
        pipeline_running(self)  # telemetry
        self.warm_up()  # optional warm-up (if needed)

        # 1) Prepare ephemeral state
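        # Note: the semaphore below caps how many components may execute at once;
        # max(1, concurrency_limit) guards against a zero or negative limit.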
        ready_sem = asyncio.Semaphore(max(1, concurrency_limit))
        inputs_state: Dict[str, Dict[str, List[Dict[str, Any]]]] = {}
        pipeline_outputs: Dict[str, Any] = {}
        running_tasks: Dict[asyncio.Task, str] = {}

        # A set of component names that have been scheduled but not finished:
        scheduled_components: Set[str] = set()

        # 2) Convert input data
        prepared_data = self._prepare_component_input_data(data)

        # raises ValueError if input is malformed in some way
        self.validate_input(prepared_data)
        inputs_state = self._convert_to_internal_format(prepared_data)

        # For quick lookup of downstream receivers
        ordered_names = sorted(self.graph.nodes.keys())
        cached_receivers = {n: self._find_receivers_from(n) for n in ordered_names}
        component_visits = dict.fromkeys(ordered_names, 0)
        cached_topological_sort = None

        # We fill the queue once and raise if all components are BLOCKED
        self.validate_pipeline(self._fill_queue(ordered_names, inputs_state, component_visits))

        # Single parent span for entire pipeline execution
        with tracing.tracer.trace(
            "haystack.async_pipeline.run",
            tags={
                "haystack.pipeline.input_data": prepared_data,
                "haystack.pipeline.output_data": pipeline_outputs,
                "haystack.pipeline.metadata": self.metadata,
                "haystack.pipeline.max_runs_per_component": self._max_runs_per_component,
            },
        ) as parent_span:
            # -------------------------------------------------
            # We define some functions here so that they have access to local runtime state
            # (inputs, tasks, scheduled components) via closures.
            # -------------------------------------------------
            async def _run_highest_in_isolation(component_name: str) -> AsyncIterator[Dict[str, Any]]:
                """
                Runs a component with HIGHEST priority in isolation.

                We need to run components with HIGHEST priority (i.e. components with a GreedyVariadic input socket)
                by themselves, without any other components running concurrently. Otherwise, downstream components
                could produce additional inputs for the GreedyVariadic socket.

                :param component_name: The name of the component.
                :returns: An async iterator of partial outputs.
                """
                # 1) Wait for all in-flight tasks to finish
                while running_tasks:
                    done, _pending = await asyncio.wait(running_tasks.keys(), return_when=asyncio.ALL_COMPLETED)
                    for finished in done:
                        finished_component_name = running_tasks.pop(finished)
                        partial_result = finished.result()
                        scheduled_components.discard(finished_component_name)
                        if partial_result:
                            yield_dict = {finished_component_name: _deepcopy_with_exceptions(partial_result)}
                            yield yield_dict  # partial outputs

                if component_name in scheduled_components:
                    # If it's already scheduled for some reason, skip
                    return

                # 2) Run the HIGHEST component by itself
                scheduled_components.add(component_name)
                comp_dict = self._get_component_with_graph_metadata_and_visits(
                    component_name, component_visits[component_name]
                )
                component_inputs = self._consume_component_inputs(component_name, comp_dict, inputs_state)
                component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])
                component_pipeline_outputs = await self._run_component_async(
                    component_name=component_name,
                    component=comp_dict,
                    component_inputs=component_inputs,
                    component_visits=component_visits,
                    parent_span=parent_span,
                )

                # Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
                pruned = self._write_component_outputs(
                    component_name=component_name,
                    component_outputs=component_pipeline_outputs,
                    inputs=inputs_state,
                    receivers=cached_receivers[component_name],
                    include_outputs_from=include_outputs_from,
                )
                if pruned:
                    pipeline_outputs[component_name] = pruned

                scheduled_components.remove(component_name)
                if pruned:
                    yield {component_name: _deepcopy_with_exceptions(pruned)}

            async def _schedule_task(component_name: str) -> None:
                """
                Schedule a component to run.

                We do NOT wait for it to finish here. This allows us to run other components concurrently.

                :param component_name: The name of the component.
                """
                if component_name in scheduled_components:
                    return  # already scheduled, do nothing

                scheduled_components.add(component_name)

                comp_dict = self._get_component_with_graph_metadata_and_visits(
                    component_name, component_visits[component_name]
                )
                component_inputs = self._consume_component_inputs(component_name, comp_dict, inputs_state)
                component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])

                async def _runner():
                    async with ready_sem:
                        component_pipeline_outputs = await self._run_component_async(
                            component_name=component_name,
                            component=comp_dict,
                            component_inputs=component_inputs,
                            component_visits=component_visits,
                            parent_span=parent_span,
                        )

                    # Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
                    pruned = self._write_component_outputs(
                        component_name=component_name,
                        component_outputs=component_pipeline_outputs,
                        inputs=inputs_state,
                        receivers=cached_receivers[component_name],
                        include_outputs_from=include_outputs_from,
                    )
                    if pruned:
                        pipeline_outputs[component_name] = pruned

                    scheduled_components.remove(component_name)
                    return pruned

                task = asyncio.create_task(_runner())
                running_tasks[task] = component_name

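            # The two helpers below await tasks registered in running_tasks, map each
            # finished task back to its component name, and yield any partial outputs.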
            async def _wait_for_one_task_to_complete() -> AsyncIterator[Dict[str, Any]]:
                """
                Wait for exactly one running task to finish, yield partial outputs.

                If no tasks are running, does nothing.
                """
                if running_tasks:
                    done, _ = await asyncio.wait(running_tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
                    for finished in done:
                        finished_component_name = running_tasks.pop(finished)
                        partial_result = finished.result()
                        scheduled_components.discard(finished_component_name)
                        if partial_result:
                            yield {finished_component_name: _deepcopy_with_exceptions(partial_result)}

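            # (Identical to _wait_for_one_task_to_complete above, except it waits with
            # return_when=asyncio.ALL_COMPLETED rather than FIRST_COMPLETED.)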
            async def _wait_for_all_tasks_to_complete() -> AsyncIterator[Dict[str, Any]]:
                """
                Wait for all running tasks to finish, yield partial outputs.
                """
                if running_tasks:
                    done, _ = await asyncio.wait(running_tasks.keys(), return_when=asyncio.ALL_COMPLETED)
                    for finished in done:
                        finished_component_name = running_tasks.pop(finished)
                        partial_result = finished.result()
                        scheduled_components.discard(finished_component_name)
                        if partial_result:
                            yield {finished_component_name: _deepcopy_with_exceptions(partial_result)}

            # -------------------------------------------------
            # MAIN SCHEDULING LOOP
            # -------------------------------------------------
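            # Each iteration: rebuild the priority queue from the current inputs state,
            # pick the next candidate, then either wait on running tasks when blocked,
            # run HIGHEST-priority components in isolation, schedule READY components
            # up to the concurrency limit, or tie-break DEFER/DEFER_LAST components
            # once nothing else is running.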
            while True:
                # Build the priority queue of candidates
                priority_queue = self._fill_queue(ordered_names, inputs_state, component_visits)
                candidate = self._get_next_runnable_component(priority_queue, component_visits)

                if (candidate is None or candidate[0] == ComponentPriority.BLOCKED) and running_tasks:
                    # We need to wait for one task to finish to make progress and potentially unblock the
                    # priority_queue
                    async for partial_res in _wait_for_one_task_to_complete():
                        yield partial_res
                    continue

                if candidate is None and not running_tasks:
                    # done
                    break

                priority, comp_name, comp = candidate  # type: ignore

                # If the next component is blocked, we do a check to see if the pipeline is possibly blocked and raise
                # a warning if it is.
                if priority == ComponentPriority.BLOCKED and not running_tasks:
                    if self._is_pipeline_possibly_blocked(current_pipeline_outputs=pipeline_outputs):
                        # Pipeline is most likely blocked (most likely a configuration issue) so we raise a warning.
                        logger.warning(
                            "Cannot run pipeline - the next component that is meant to run is blocked.\n"
                            "Component name: '{component_name}'\n"
                            "Component type: '{component_type}'\n"
                            "This typically happens when the component is unable to receive all of its required "
                            "inputs.\nCheck the connections to this component and ensure all required inputs are "
                            "provided.",
                            component_name=comp_name,
                            component_type=comp["instance"].__class__.__name__,
                        )
                    # We always exit the loop since we cannot run the next component.
                    break

                if comp_name in scheduled_components:
                    # We need to wait for one task to finish to make progress
                    async for partial_res in _wait_for_one_task_to_complete():
                        yield partial_res
                    continue

                if priority == ComponentPriority.HIGHEST:
                    # run alone
                    async for partial_res in _run_highest_in_isolation(comp_name):
                        yield partial_res
                    # then continue the loop
                    continue

                if priority == ComponentPriority.READY:
                    # 1) schedule this one
                    await _schedule_task(comp_name)

                    # 2) Possibly schedule more READY tasks if concurrency is not fully used
                    while len(priority_queue) > 0 and not ready_sem.locked():
                        peek_prio, peek_name = priority_queue.peek()
                        if peek_prio in (ComponentPriority.BLOCKED, ComponentPriority.HIGHEST):
                            # can't run or must run alone => skip
                            break
                        if peek_prio == ComponentPriority.READY:
                            priority_queue.pop()
                            await _schedule_task(peek_name)
                            # keep adding while concurrency is not locked
                            continue

                        # The next is DEFER/DEFER_LAST => we only schedule it if it "becomes READY"
                        # We'll handle it in the next iteration or with incremental waiting
                        break

                # We only schedule components with priority DEFER or DEFER_LAST when no other tasks are running
                elif priority in (ComponentPriority.DEFER, ComponentPriority.DEFER_LAST) and not running_tasks:
                    if len(priority_queue) > 0:
                        comp_name, topological_sort = self._tiebreak_waiting_components(
                            component_name=comp_name,
                            priority=priority,
                            priority_queue=priority_queue,
                            topological_sort=cached_topological_sort,
                        )
                        cached_topological_sort = topological_sort

                    await _schedule_task(comp_name)

                # To make progress, we wait for one task to complete before re-starting the loop
                async for partial_res in _wait_for_one_task_to_complete():
                    yield partial_res

            # End main loop

            # 3) Drain leftover tasks
            async for partial_res in _wait_for_all_tasks_to_complete():
                yield partial_res

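            # The last item this generator yields is the accumulated pipeline_outputs
            # dict; run_async() below relies on it being the final yield.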
            # 4) Yield final pipeline outputs
            yield _deepcopy_with_exceptions(pipeline_outputs)

    async def run_async(
        self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None, concurrency_limit: int = 4
    ) -> Dict[str, Any]:
        """
        Provides an asynchronous interface to run the pipeline with provided input data.

        This method allows the pipeline to be integrated into an asynchronous workflow, enabling non-blocking
        execution of pipeline components.

        Usage:
        ```python
        import asyncio

        from haystack import Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.core.pipeline import AsyncPipeline
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]

        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Ask a question
        question = "Who lives in Paris?"

        async def run_inner(data, include_outputs_from):
            return await rag_pipeline.run_async(data=data, include_outputs_from=include_outputs_from)

        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }

        results = asyncio.run(run_inner(data, include_outputs_from={"retriever", "llm"}))

        print(results["llm"]["replies"])
        # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
        # _name=None, _meta={'model': 'gpt-4o-mini-2024-07-18', 'index': 0, 'finish_reason': 'stop', 'usage':
        # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75,
        # 'completion_tokens_details': CompletionTokensDetails(accepted_prediction_tokens=0,
        # audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), 'prompt_tokens_details':
        # PromptTokensDetails(audio_tokens=0, cached_tokens=0)}})]
        ```

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that are allowed to run concurrently.
        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running, or if a Component fails or returns output of an unsupported type.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline.
        """
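        # Drive run_async_generator to completion and keep only the last yielded item,
        # which is the final pipeline output dict.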
        final: Dict[str, Any] = {}
        async for partial in self.run_async_generator(
            data=data, concurrency_limit=concurrency_limit, include_outputs_from=include_outputs_from
        ):
            final = partial
        return final or {}

    def run(
        self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None, concurrency_limit: int = 4
    ) -> Dict[str, Any]:
        """
        Provides a synchronous interface to run the pipeline with given input data.

        Internally, the pipeline components are executed asynchronously, but the method itself
        blocks until the entire pipeline execution is complete.

        If you need asynchronous methods, consider using `run_async` or `run_async_generator`.

        Usage:
        ```python
        from haystack import Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.core.pipeline import AsyncPipeline
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]

        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Ask a question
        question = "Who lives in Paris?"

        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }

        results = rag_pipeline.run(data)

        print(results["llm"]["replies"])
        # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
        # _name=None, _meta={'model': 'gpt-4o-mini-2024-07-18', 'index': 0, 'finish_reason': 'stop', 'usage':
        # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75, 'completion_tokens_details':
        # CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0,
        # rejected_prediction_tokens=0), 'prompt_tokens_details': PromptTokensDetails(audio_tokens=0,
        # cached_tokens=0)}})]
        ```

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that are allowed to run concurrently.
        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running, or if a Component fails or returns output of an unsupported type.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline.
        """
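        # Note: asyncio.run() starts a new event loop and raises a RuntimeError if called
        # from a thread that already has a running loop (e.g. inside Jupyter); use
        # run_async() from asynchronous code instead.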
        return asyncio.run(
            self.run_async(data=data, include_outputs_from=include_outputs_from, concurrency_limit=concurrency_limit)
        )