• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 17434976747

03 Sep 2025 01:26PM UTC coverage: 92.056% (-0.02%) from 92.079%
17434976747

Pull #9759

github

web-flow
Merge 7d9036b38 into f48789f5f
Pull Request #9759: feat: Add PipelineTool to streamline using Pipeline as Tools with Agent

12967 of 14086 relevant lines covered (92.06%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.65
haystack/core/pipeline/async_pipeline.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import asyncio
1✔
6
import contextvars
1✔
7
from typing import Any, AsyncIterator, Mapping, Optional
1✔
8

9
from haystack import logging, tracing
1✔
10
from haystack.core.component import Component
1✔
11
from haystack.core.errors import PipelineRuntimeError
1✔
12
from haystack.core.pipeline.base import (
1✔
13
    _COMPONENT_INPUT,
14
    _COMPONENT_OUTPUT,
15
    _COMPONENT_VISITS,
16
    ComponentPriority,
17
    PipelineBase,
18
)
19
from haystack.core.pipeline.utils import _deepcopy_with_exceptions
1✔
20
from haystack.telemetry import pipeline_running
1✔
21

22
logger = logging.getLogger(__name__)
1✔
23

24

25
class AsyncPipeline(PipelineBase):
1✔
26
    """
27
    Asynchronous version of the Pipeline orchestration engine.
28

29
    Manages components in a pipeline allowing for concurrent processing when the pipeline's execution graph permits.
30
    This enables efficient processing of components by minimizing idle time and maximizing resource utilization.
31
    """
32

33
    @staticmethod
    async def _run_component_async(
        component_name: str,
        component: dict[str, Any],
        component_inputs: dict[str, Any],
        component_visits: dict[str, int],
        parent_span: Optional[tracing.Span] = None,
    ) -> Mapping[str, Any]:
        """
        Runs a single component and returns the outputs it produced.

        A component flagged with `__haystack_supports_async__` is awaited through its `run_async` method.
        Any other component has its blocking `run` method handed to the running loop's default executor.

        Once the component has returned, its counter in `component_visits` is incremented in place. The inputs,
        the visit count and the outputs are recorded on the span opened for this component.

        :param component_name: The name of the component.
        :param component: Component metadata; the component object is read from its `"instance"` key.
        :param component_inputs: Keyword arguments the component is called with.
        :param component_visits: Run counters keyed by component name; updated in place.
        :param parent_span: Optional span forwarded to `PipelineBase._create_component_span`.
        :returns: The mapping returned by the component, unmodified.
        :raises PipelineRuntimeError:
            If the component raises an exception or returns something that is not a `Mapping`.
        """
        instance: Component = component["instance"]

        with PipelineBase._create_component_span(
            component_name=component_name, instance=instance, inputs=component_inputs, parent_span=parent_span
        ) as span:
            # The inputs are copied because they may be deleted later on when they are sent to other Components
            span.set_content_tag(_COMPONENT_INPUT, _deepcopy_with_exceptions(component_inputs))
            logger.info("Running component {component_name}", component_name=component_name)

            if getattr(instance, "__haystack_supports_async__", False):

                async def _invoke() -> Any:
                    return await instance.run_async(**component_inputs)  # type: ignore

            else:
                loop = asyncio.get_running_loop()
                # contextvars (e.g. the active tracing Span) do not propagate into the loop's ThreadPoolExecutor,
                # so the current context is captured here and the component is run inside that copy.
                ctx = contextvars.copy_context()

                async def _invoke() -> Any:
                    return await loop.run_in_executor(None, lambda: ctx.run(instance.run, **component_inputs))

            # Both execution paths report failures the same way.
            try:
                outputs = await _invoke()
            except Exception as error:
                raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error

            visit_count = component_visits[component_name] + 1
            component_visits[component_name] = visit_count

            if not isinstance(outputs, Mapping):
                raise PipelineRuntimeError.from_invalid_output(component_name, instance.__class__, outputs)

            span.set_tag(_COMPONENT_VISITS, visit_count)
            span.set_content_tag(_COMPONENT_OUTPUT, _deepcopy_with_exceptions(outputs))

            return outputs
1✔
90

91
    async def run_async_generator(  # noqa: PLR0915,C901  # pylint: disable=too-many-statements
        self, data: dict[str, Any], include_outputs_from: Optional[set[str]] = None, concurrency_limit: int = 4
    ) -> AsyncIterator[dict[str, Any]]:
        """
        Executes the pipeline step by step asynchronously, yielding partial outputs when any component finishes.

        The last item yielded is always the complete pipeline output. If the generator stops early (a component
        fails, the consumer stops iterating, or the surrounding task is cancelled), component tasks that are
        still in flight are cancelled instead of being left running in the background.

        Usage:
        ```python
        import asyncio

        from haystack import AsyncPipeline, Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]

        # Create and connect pipeline components
        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Prepare input data
        question = "Who lives in Paris?"
        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }


        # Process results as they become available
        async def process_results():
            async for partial_output in rag_pipeline.run_async_generator(
                    data=data,
                    include_outputs_from={"retriever", "llm"}
            ):
                # Each partial_output contains the results from a completed component
                if "retriever" in partial_output:
                    print("Retrieved documents:", len(partial_output["retriever"]["documents"]))
                if "llm" in partial_output:
                    print("Generated answer:", partial_output["llm"]["replies"][0])


        asyncio.run(process_results())
        ```

        :param data: Initial input data to the pipeline.
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit:
            The maximum number of components that are allowed to run concurrently.
            Values below 1 are treated as 1.
        :returns: An async iterator containing partial (and final) outputs.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineMaxComponentRuns:
            If a component exceeds the maximum number of allowed executions within the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running.
            Or if a Component fails or returns output in an unsupported type.
        """
        if include_outputs_from is None:
            include_outputs_from = set()

        # 0) Basic pipeline init
        pipeline_running(self)  # telemetry
        self.warm_up()  # optional warm-up (if needed)

        # 1) Prepare ephemeral state
        ready_sem = asyncio.Semaphore(max(1, concurrency_limit))
        pipeline_outputs: dict[str, Any] = {}
        running_tasks: dict[asyncio.Task, str] = {}

        # A set of component names that have been scheduled but not finished:
        scheduled_components: set[str] = set()

        # 2) Convert input data
        prepared_data = self._prepare_component_input_data(data)

        # raises ValueError if input is malformed in some way
        self.validate_input(prepared_data)
        inputs_state: dict[str, dict[str, list[dict[str, Any]]]] = self._convert_to_internal_format(prepared_data)

        # For quick lookup of downstream receivers
        ordered_names = sorted(self.graph.nodes.keys())
        cached_receivers = {n: self._find_receivers_from(n) for n in ordered_names}
        component_visits = dict.fromkeys(ordered_names, 0)
        cached_topological_sort = None

        # We fill the queue once and raise if all components are BLOCKED
        self.validate_pipeline(self._fill_queue(ordered_names, inputs_state, component_visits))

        # Single parent span for entire pipeline execution
        with tracing.tracer.trace(
            "haystack.async_pipeline.run",
            tags={
                "haystack.pipeline.input_data": prepared_data,
                "haystack.pipeline.output_data": pipeline_outputs,
                "haystack.pipeline.metadata": self.metadata,
                "haystack.pipeline.max_runs_per_component": self._max_runs_per_component,
            },
        ) as parent_span:
            # -------------------------------------------------
            # We define some functions here so that they have access to local runtime state
            # (inputs, tasks, scheduled components) via closures.
            # -------------------------------------------------
            async def _drain_tasks(return_when: str) -> AsyncIterator[dict[str, Any]]:
                """
                Wait on the running tasks and yield the partial outputs of those that finished.

                If no tasks are running, does nothing. A failure raised by a finished task propagates from here.

                :param return_when: `asyncio.FIRST_COMPLETED` or `asyncio.ALL_COMPLETED`.
                :returns: An async iterator of partial outputs.
                """
                if not running_tasks:
                    return

                done, _ = await asyncio.wait(running_tasks.keys(), return_when=return_when)
                for finished in done:
                    finished_component_name = running_tasks.pop(finished)
                    partial_result = finished.result()
                    scheduled_components.discard(finished_component_name)
                    if partial_result:
                        yield {finished_component_name: _deepcopy_with_exceptions(partial_result)}

            async def _run_highest_in_isolation(component_name: str) -> AsyncIterator[dict[str, Any]]:
                """
                Runs a component with HIGHEST priority in isolation.

                We need to run components with HIGHEST priority (i.e. components with GreedyVariadic input socket)
                by themselves, without any other components running concurrently. Otherwise, downstream components
                could produce additional inputs for the GreedyVariadic socket.

                :param component_name: The name of the component.
                :returns: An async iterator of partial outputs.
                """
                # 1) Wait for all in-flight tasks to finish
                while running_tasks:
                    async for partial_res in _drain_tasks(asyncio.ALL_COMPLETED):
                        yield partial_res

                if component_name in scheduled_components:
                    # If it's already scheduled for some reason, skip
                    return

                # 2) Run the HIGHEST component by itself
                scheduled_components.add(component_name)
                comp_dict = self._get_component_with_graph_metadata_and_visits(
                    component_name, component_visits[component_name]
                )
                component_inputs = self._consume_component_inputs(component_name, comp_dict, inputs_state)
                component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])

                component_pipeline_outputs = await self._run_component_async(
                    component_name=component_name,
                    component=comp_dict,
                    component_inputs=component_inputs,
                    component_visits=component_visits,
                    parent_span=parent_span,
                )

                # Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
                pruned = self._write_component_outputs(
                    component_name=component_name,
                    component_outputs=component_pipeline_outputs,
                    inputs=inputs_state,
                    receivers=cached_receivers[component_name],
                    include_outputs_from=include_outputs_from,
                )
                if pruned:
                    pipeline_outputs[component_name] = pruned

                scheduled_components.remove(component_name)
                if pruned:
                    yield {component_name: _deepcopy_with_exceptions(pruned)}

            async def _schedule_task(component_name: str) -> None:
                """
                Schedule a component to run.

                We do NOT wait for it to finish here. This allows us to run other components concurrently.
                The component's inputs are consumed immediately; the actual run waits for a free slot
                of the concurrency semaphore.

                :param component_name: The name of the component.
                """
                if component_name in scheduled_components:
                    return  # already scheduled, do nothing

                scheduled_components.add(component_name)

                comp_dict = self._get_component_with_graph_metadata_and_visits(
                    component_name, component_visits[component_name]
                )
                component_inputs = self._consume_component_inputs(component_name, comp_dict, inputs_state)
                component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])

                async def _runner():
                    # Only the component run itself counts against the concurrency limit
                    async with ready_sem:
                        component_pipeline_outputs = await self._run_component_async(
                            component_name=component_name,
                            component=comp_dict,
                            component_inputs=component_inputs,
                            component_visits=component_visits,
                            parent_span=parent_span,
                        )

                    # Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
                    pruned = self._write_component_outputs(
                        component_name=component_name,
                        component_outputs=component_pipeline_outputs,
                        inputs=inputs_state,
                        receivers=cached_receivers[component_name],
                        include_outputs_from=include_outputs_from,
                    )
                    if pruned:
                        pipeline_outputs[component_name] = pruned

                    scheduled_components.remove(component_name)
                    return pruned

                task = asyncio.create_task(_runner())
                running_tasks[task] = component_name

            try:
                # -------------------------------------------------
                # MAIN SCHEDULING LOOP
                # -------------------------------------------------
                while True:
                    # 2) Build the priority queue of candidates
                    priority_queue = self._fill_queue(ordered_names, inputs_state, component_visits)
                    candidate = self._get_next_runnable_component(priority_queue, component_visits)

                    if (candidate is None or candidate[0] == ComponentPriority.BLOCKED) and running_tasks:
                        # We need to wait for one task to finish to make progress and potentially unblock the
                        # priority_queue
                        async for partial_res in _drain_tasks(asyncio.FIRST_COMPLETED):
                            yield partial_res
                        continue

                    if candidate is None:
                        # Nothing left to run and (per the check above) nothing is running anymore: done
                        break

                    priority, comp_name, comp = candidate

                    # If the next component is blocked, we do a check to see if the pipeline is possibly blocked
                    # and raise a warning if it is.
                    if priority == ComponentPriority.BLOCKED and not running_tasks:
                        if self._is_pipeline_possibly_blocked(current_pipeline_outputs=pipeline_outputs):
                            # Pipeline is most likely blocked (most likely a configuration issue) so we raise a
                            # warning.
                            logger.warning(
                                "Cannot run pipeline - the next component that is meant to run is blocked.\n"
                                "Component name: '{component_name}'\n"
                                "Component type: '{component_type}'\n"
                                "This typically happens when the component is unable to receive all of its required "
                                "inputs.\nCheck the connections to this component and ensure all required inputs are "
                                "provided.",
                                component_name=comp_name,
                                component_type=comp["instance"].__class__.__name__,
                            )
                        # We always exit the loop since we cannot run the next component.
                        break

                    if comp_name in scheduled_components:
                        # We need to wait for one task to finish to make progress
                        async for partial_res in _drain_tasks(asyncio.FIRST_COMPLETED):
                            yield partial_res
                        continue

                    if priority == ComponentPriority.HIGHEST:
                        # 1) run alone
                        async for partial_res in _run_highest_in_isolation(comp_name):
                            yield partial_res
                        # then continue the loop
                        continue

                    if priority == ComponentPriority.READY:
                        # 1) schedule this one
                        await _schedule_task(comp_name)

                        # 2) Possibly schedule more READY tasks if concurrency not fully used
                        while len(priority_queue) > 0 and not ready_sem.locked():
                            peek_prio, peek_name = priority_queue.peek()
                            if peek_prio != ComponentPriority.READY:
                                # BLOCKED can't run, HIGHEST must run alone, and DEFER/DEFER_LAST is only
                                # scheduled once it "becomes READY" or nothing else is running. All of these
                                # are handled in a later iteration of the main loop.
                                break
                            priority_queue.pop()
                            await _schedule_task(peek_name)

                    # We only schedule components with priority DEFER or DEFER_LAST when no other tasks are running
                    elif priority in (ComponentPriority.DEFER, ComponentPriority.DEFER_LAST) and not running_tasks:
                        if len(priority_queue) > 0:
                            comp_name, topological_sort = self._tiebreak_waiting_components(
                                component_name=comp_name,
                                priority=priority,
                                priority_queue=priority_queue,
                                topological_sort=cached_topological_sort,
                            )
                            cached_topological_sort = topological_sort

                        await _schedule_task(comp_name)

                    # To make progress, we wait for one task to complete before re-starting the loop
                    async for partial_res in _drain_tasks(asyncio.FIRST_COMPLETED):
                        yield partial_res

                # End main loop

                # 3) Drain leftover tasks
                async for partial_res in _drain_tasks(asyncio.ALL_COMPLETED):
                    yield partial_res

                # 4) Yield final pipeline outputs
                yield _deepcopy_with_exceptions(pipeline_outputs)
            finally:
                # On a normal exit `running_tasks` is already empty. It is non-empty only when we leave early
                # (a component failed, the consumer closed the generator, or we were cancelled); in that case
                # the remaining tasks must not keep running detached from this pipeline run.
                for leftover_task in running_tasks:
                    leftover_task.cancel()
                running_tasks.clear()
1✔
465

466
    async def run_async(
        self, data: dict[str, Any], include_outputs_from: Optional[set[str]] = None, concurrency_limit: int = 4
    ) -> dict[str, Any]:
        """
        Runs the pipeline asynchronously with the given input data and returns its final output.

        This is a convenience wrapper around `run_async_generator`: it consumes every partial output and
        returns the last one yielded, so the pipeline can be awaited from an asynchronous workflow
        without blocking it.

        Usage:
        ```python
        import asyncio

        from haystack import Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.core.pipeline import AsyncPipeline
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]

        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Ask a question
        question = "Who lives in Paris?"

        async def run_inner(data, include_outputs_from):
            return await rag_pipeline.run_async(data=data, include_outputs_from=include_outputs_from)

        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }

        results = asyncio.run(run_inner(data, include_outputs_from={"retriever", "llm"}))

        print(results["llm"]["replies"])
        # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
        # _name=None, _meta={'model': 'gpt-4o-mini-2024-07-18', 'index': 0, 'finish_reason': 'stop', 'usage':
        # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75,
        # 'completion_tokens_details': CompletionTokensDetails(accepted_prediction_tokens=0,
        # audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), 'prompt_tokens_details':
        # PromptTokensDetails(audio_tokens=0, cached_tokens=0)}})]
        ```

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that should be allowed to run concurrently.
        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running.
            Or if a Component fails or returns output in an unsupported type.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline.
        """
        latest: dict[str, Any] = {}
        outputs_stream = self.run_async_generator(
            data=data, concurrency_limit=concurrency_limit, include_outputs_from=include_outputs_from
        )
        # Exhaust the generator; the loop variable ends up bound to the last item it yielded.
        async for latest in outputs_stream:
            pass
        return latest if latest else {}
1✔
582

583
    def run(
        self, data: dict[str, Any], include_outputs_from: Optional[set[str]] = None, concurrency_limit: int = 4
    ) -> dict[str, Any]:
        """
        Provides a synchronous interface to run the pipeline with given input data.

        Internally, the pipeline components are executed asynchronously, but the method itself
        will block until the entire pipeline execution is complete.

        In case you need asynchronous methods, consider using `run_async` or `run_async_generator`.

        Usage:
        ```python
        from haystack import Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.core.pipeline import AsyncPipeline
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]


        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Ask a question
        question = "Who lives in Paris?"

        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }

        results = rag_pipeline.run(data)

        print(results["llm"]["replies"])
        # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
        # _name=None, _meta={'model': 'gpt-4o-mini-2024-07-18', 'index': 0, 'finish_reason': 'stop', 'usage':
        # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75, 'completion_tokens_details':
        # CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0,
        # rejected_prediction_tokens=0), 'prompt_tokens_details': PromptTokensDetails(audio_tokens=0,
        # cached_tokens=0)}})]
        ```

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that should be allowed to run concurrently.

        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running.
            Or if a Component fails or returns output in an unsupported type.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline.
        :raises RuntimeError:
            If called from within an async context. Use `run_async` instead.
        """
        # Only probe for a running loop inside the try/except. The pipeline itself must be started
        # after the handler has finished: an exception raised while another one is being handled is
        # implicitly chained to it, so running the pipeline inside the `except` block would attach the
        # unrelated "no running event loop" RuntimeError as `__context__` of every pipeline error.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            loop_is_running = False
        else:
            loop_is_running = True

        if loop_is_running:
            # Running loop present: do not create the coroutine and do not call asyncio.run()
            raise RuntimeError(
                "Cannot call run() from within an async context. Use 'await pipeline.run_async(...)' instead."
            )

        # No running loop: safe to use asyncio.run()
        return asyncio.run(
            self.run_async(
                data=data, include_outputs_from=include_outputs_from, concurrency_limit=concurrency_limit
            )
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc