• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 15848630494

24 Jun 2025 11:00AM UTC coverage: 90.175% (-0.007%) from 90.182%
15848630494

Pull #9526

github

web-flow
Merge 552c3e435 into d14f5dca0
Pull Request #9526: fix: Update the de/serialization with schema utils

11610 of 12875 relevant lines covered (90.17%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.74
haystack/core/pipeline/async_pipeline.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import asyncio
1✔
6
import contextvars
1✔
7
from typing import Any, AsyncIterator, Dict, List, Optional, Set
1✔
8

9
from haystack import logging, tracing
1✔
10
from haystack.core.component import Component
1✔
11
from haystack.core.errors import PipelineMaxComponentRuns, PipelineRuntimeError
1✔
12
from haystack.core.pipeline.base import (
1✔
13
    _COMPONENT_INPUT,
14
    _COMPONENT_OUTPUT,
15
    _COMPONENT_VISITS,
16
    ComponentPriority,
17
    PipelineBase,
18
)
19
from haystack.core.pipeline.utils import _deepcopy_with_exceptions
1✔
20
from haystack.telemetry import pipeline_running
1✔
21

22
logger = logging.getLogger(__name__)
1✔
23

24

25
class AsyncPipeline(PipelineBase):
    """
    Asynchronous version of the Pipeline orchestration engine.

    Manages components in a pipeline allowing for concurrent processing when the pipeline's execution graph permits.
    This enables efficient processing of components by minimizing idle time and maximizing resource utilization.
    """

    @staticmethod
    async def _run_component_async(  # pylint: disable=too-many-positional-arguments
        component_name: str,
        component: Dict[str, Any],
        component_inputs: Dict[str, Any],
        component_visits: Dict[str, int],
        max_runs_per_component: int = 100,
        parent_span: Optional[tracing.Span] = None,
    ) -> Dict[str, Any]:
        """
        Executes a single component asynchronously.

        If the component supports async execution, its `run_async` method is awaited directly; otherwise its
        synchronous `run` method is offloaded to the running loop's default executor.

        On success the component's counter in `component_visits` is incremented. The returned value is the raw
        output of the component: distributing it to downstream inputs and pruning it for the pipeline outputs is
        left to the caller.

        :param component_name: The name of the component.
        :param component: The component dictionary; its `"instance"` key holds the component instance to run.
        :param component_inputs: Inputs for the component.
        :param component_visits: Per-component run counters; the entry for `component_name` is updated in place.
        :param max_runs_per_component: Raise once the component's visit count exceeds this value.
        :param parent_span: Optional tracing span under which the component span is created.
        :returns: The raw outputs of the component.
        :raises PipelineMaxComponentRuns: If the component's visit count exceeds `max_runs_per_component`.
        :raises PipelineRuntimeError: If the component raises, or returns something that is not a dictionary.
        """
        if component_visits[component_name] > max_runs_per_component:
            raise PipelineMaxComponentRuns(f"Max runs for '{component_name}' reached.")

        instance: Component = component["instance"]
        with PipelineBase._create_component_span(
            component_name=component_name, instance=instance, inputs=component_inputs, parent_span=parent_span
        ) as span:
            span.set_content_tag(_COMPONENT_INPUT, _deepcopy_with_exceptions(component_inputs))
            logger.info("Running component {component_name}", component_name=component_name)

            if getattr(instance, "__haystack_supports_async__", False):
                try:
                    outputs = await instance.run_async(**component_inputs)  # type: ignore
                except Exception as error:
                    raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error
            else:
                loop = asyncio.get_running_loop()
                # Important: contextvars (e.g. active tracing Span) don’t propagate to running loop's ThreadPoolExecutor
                # We use ctx.run(...) to preserve context like the active tracing span
                ctx = contextvars.copy_context()
                try:
                    outputs = await loop.run_in_executor(
                        None, lambda: ctx.run(lambda: instance.run(**component_inputs))
                    )
                except Exception as error:
                    # Wrap sync failures exactly like async ones so callers see a single, documented error type.
                    raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error

            component_visits[component_name] += 1

            if not isinstance(outputs, dict):
                raise PipelineRuntimeError.from_invalid_output(component_name, instance.__class__, outputs)

            span.set_tag(_COMPONENT_VISITS, component_visits[component_name])
            span.set_content_tag(_COMPONENT_OUTPUT, _deepcopy_with_exceptions(outputs))

            return outputs

    async def run_async_generator(  # noqa: PLR0915,C901
        self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None, concurrency_limit: int = 4
    ) -> AsyncIterator[Dict[str, Any]]:
        """
        Executes the pipeline step by step asynchronously, yielding partial outputs when any component finishes.

        Usage:
        ```python
        from haystack import Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack import AsyncPipeline
        import asyncio

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]

        # Create and connect pipeline components
        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Prepare input data
        question = "Who lives in Paris?"
        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }


        # Process results as they become available
        async def process_results():
            async for partial_output in rag_pipeline.run_async_generator(
                    data=data,
                    include_outputs_from={"retriever", "llm"}
            ):
                # Each partial_output contains the results from a completed component
                if "retriever" in partial_output:
                    print("Retrieved documents:", len(partial_output["retriever"]["documents"]))
                if "llm" in partial_output:
                    print("Generated answer:", partial_output["llm"]["replies"][0])


        asyncio.run(process_results())
        ```

        :param data: Initial input data to the pipeline.
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that are allowed to run concurrently.
        :returns:
            An async iterator of partial outputs, each of the form `{component_name: outputs}`, followed by
            a final item containing the complete pipeline outputs. If iteration stops early (because a
            component raised or the consumer stopped iterating), any components still in flight are cancelled.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineMaxComponentRuns:
            If a component exceeds the maximum number of allowed executions within the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running.
            Or if a Component fails or returns output in an unsupported type.
        """
        if include_outputs_from is None:
            include_outputs_from = set()

        # 0) Basic pipeline init
        pipeline_running(self)  # telemetry
        self.warm_up()  # optional warm-up (if needed)

        # 1) Prepare ephemeral state
        ready_sem = asyncio.Semaphore(max(1, concurrency_limit))
        pipeline_outputs: Dict[str, Any] = {}
        running_tasks: Dict[asyncio.Task, str] = {}

        # A set of component names that have been scheduled but not finished:
        scheduled_components: Set[str] = set()

        # 2) Convert input data
        prepared_data = self._prepare_component_input_data(data)

        # raises ValueError if input is malformed in some way
        self.validate_input(prepared_data)
        inputs_state: Dict[str, Dict[str, List[Dict[str, Any]]]] = self._convert_to_internal_format(prepared_data)

        # For quick lookup of downstream receivers
        ordered_names = sorted(self.graph.nodes.keys())
        cached_receivers = {n: self._find_receivers_from(n) for n in ordered_names}
        component_visits = dict.fromkeys(ordered_names, 0)
        cached_topological_sort = None

        # We fill the queue once and raise if all components are BLOCKED
        self.validate_pipeline(self._fill_queue(ordered_names, inputs_state, component_visits))

        # Single parent span for entire pipeline execution
        with tracing.tracer.trace(
            "haystack.async_pipeline.run",
            tags={
                "haystack.pipeline.input_data": prepared_data,
                "haystack.pipeline.output_data": pipeline_outputs,
                "haystack.pipeline.metadata": self.metadata,
                "haystack.pipeline.max_runs_per_component": self._max_runs_per_component,
            },
        ) as parent_span:
            # -------------------------------------------------
            # We define some functions here so that they have access to local runtime state
            # (inputs, tasks, scheduled components) via closures.
            # -------------------------------------------------
            async def _wait_for_tasks(return_when: str) -> AsyncIterator[Dict[str, Any]]:
                """
                Wait on the running tasks and yield the partial outputs of the ones that finished.

                If no tasks are running, does nothing.

                :param return_when: `asyncio.FIRST_COMPLETED` to wait for at least one task,
                    or `asyncio.ALL_COMPLETED` to wait for all of them.
                :returns: An async iterator of `{component_name: outputs}` dictionaries, one per finished
                    task whose pruned outputs are non-empty.
                """
                if not running_tasks:
                    return
                done, _ = await asyncio.wait(running_tasks.keys(), return_when=return_when)
                for finished in done:
                    finished_component_name = running_tasks.pop(finished)
                    # Re-raises the component's exception, if it failed
                    partial_result = finished.result()
                    scheduled_components.discard(finished_component_name)
                    if partial_result:
                        yield {finished_component_name: _deepcopy_with_exceptions(partial_result)}

            async def _run_highest_in_isolation(component_name: str) -> AsyncIterator[Dict[str, Any]]:
                """
                Runs a component with HIGHEST priority in isolation.

                We need to run components with HIGHEST priority (i.e. components with GreedyVariadic input socket)
                because otherwise, downstream components could produce additional inputs for the GreedyVariadic socket.

                :param component_name: The name of the component.
                :returns: An async iterator of partial outputs.
                """
                # 1) Wait for all in-flight tasks to finish
                async for drained in _wait_for_tasks(asyncio.ALL_COMPLETED):
                    yield drained  # partial outputs

                if component_name in scheduled_components:
                    # If it's already scheduled for some reason, skip
                    return

                # 2) Run the HIGHEST component by itself
                scheduled_components.add(component_name)
                comp_dict = self._get_component_with_graph_metadata_and_visits(
                    component_name, component_visits[component_name]
                )
                component_inputs = self._consume_component_inputs(component_name, comp_dict, inputs_state)
                component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])
                component_pipeline_outputs = await self._run_component_async(
                    component_name=component_name,
                    component=comp_dict,
                    component_inputs=component_inputs,
                    component_visits=component_visits,
                    max_runs_per_component=self._max_runs_per_component,
                    parent_span=parent_span,
                )

                # Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
                pruned = self._write_component_outputs(
                    component_name=component_name,
                    component_outputs=component_pipeline_outputs,
                    inputs=inputs_state,
                    receivers=cached_receivers[component_name],
                    include_outputs_from=include_outputs_from,
                )
                if pruned:
                    pipeline_outputs[component_name] = pruned

                scheduled_components.remove(component_name)
                if pruned:
                    yield {component_name: _deepcopy_with_exceptions(pruned)}

            async def _schedule_task(component_name: str) -> None:
                """
                Schedule a component to run.

                We do NOT wait for it to finish here. This allows us to run other components concurrently.
                The component's inputs are consumed immediately; the run itself is gated by `ready_sem`.

                :param component_name: The name of the component.
                """
                if component_name in scheduled_components:
                    return  # already scheduled, do nothing

                scheduled_components.add(component_name)

                comp_dict = self._get_component_with_graph_metadata_and_visits(
                    component_name, component_visits[component_name]
                )
                component_inputs = self._consume_component_inputs(component_name, comp_dict, inputs_state)
                component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])

                async def _runner():
                    async with ready_sem:
                        component_pipeline_outputs = await self._run_component_async(
                            component_name=component_name,
                            component=comp_dict,
                            component_inputs=component_inputs,
                            component_visits=component_visits,
                            max_runs_per_component=self._max_runs_per_component,
                            parent_span=parent_span,
                        )

                    # Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
                    pruned = self._write_component_outputs(
                        component_name=component_name,
                        component_outputs=component_pipeline_outputs,
                        inputs=inputs_state,
                        receivers=cached_receivers[component_name],
                        include_outputs_from=include_outputs_from,
                    )
                    if pruned:
                        pipeline_outputs[component_name] = pruned

                    scheduled_components.remove(component_name)
                    return pruned

                task = asyncio.create_task(_runner())
                running_tasks[task] = component_name

            try:
                # -------------------------------------------------
                # MAIN SCHEDULING LOOP
                # -------------------------------------------------
                while True:
                    # 2) Build the priority queue of candidates
                    priority_queue = self._fill_queue(ordered_names, inputs_state, component_visits)
                    candidate = self._get_next_runnable_component(priority_queue, component_visits)
                    if candidate is None and running_tasks:
                        # We need to wait for one task to finish to make progress
                        # and potentially unblock the priority_queue
                        async for partial_res in _wait_for_tasks(asyncio.FIRST_COMPLETED):
                            yield partial_res
                        continue

                    if candidate is None and not running_tasks:
                        # done
                        break

                    priority, comp_name, _ = candidate  # type: ignore

                    if comp_name in scheduled_components:
                        # We need to wait for one task to finish to make progress
                        async for partial_res in _wait_for_tasks(asyncio.FIRST_COMPLETED):
                            yield partial_res
                        continue

                    if priority == ComponentPriority.HIGHEST:
                        # 1) run alone
                        async for partial_res in _run_highest_in_isolation(comp_name):
                            yield partial_res
                        # then continue the loop
                        continue

                    if priority == ComponentPriority.READY:
                        # 1) schedule this one
                        await _schedule_task(comp_name)

                        # 2) Possibly schedule more READY tasks if concurrency not fully used
                        while len(priority_queue) > 0 and not ready_sem.locked():
                            peek_prio, peek_name = priority_queue.peek()
                            if peek_prio in (ComponentPriority.BLOCKED, ComponentPriority.HIGHEST):
                                # can't run or must run alone => skip
                                break
                            if peek_prio == ComponentPriority.READY:
                                priority_queue.pop()
                                await _schedule_task(peek_name)
                                # keep adding while concurrency is not locked
                                continue

                            # The next is DEFER/DEFER_LAST => we only schedule it if it "becomes READY"
                            # We'll handle it in the next iteration or with incremental waiting
                            break

                    # We only schedule components with priority DEFER or DEFER_LAST when no other tasks are running
                    elif priority in (ComponentPriority.DEFER, ComponentPriority.DEFER_LAST) and not running_tasks:
                        if len(priority_queue) > 0:
                            comp_name, topological_sort = self._tiebreak_waiting_components(
                                component_name=comp_name,
                                priority=priority,
                                priority_queue=priority_queue,
                                topological_sort=cached_topological_sort,
                            )
                            cached_topological_sort = topological_sort

                        await _schedule_task(comp_name)

                    # To make progress, we wait for one task to complete before re-starting the loop
                    async for partial_res in _wait_for_tasks(asyncio.FIRST_COMPLETED):
                        yield partial_res

                # End main loop

                # 3) Drain leftover tasks
                async for partial_res in _wait_for_tasks(asyncio.ALL_COMPLETED):
                    yield partial_res

                # 4) Yield final pipeline outputs
                yield _deepcopy_with_exceptions(pipeline_outputs)
            finally:
                # On normal completion `running_tasks` is empty. If we get here early (a component raised, or the
                # consumer stopped iterating), cancel in-flight tasks so they don't keep running detached from
                # a pipeline run that no longer exists.
                for leftover_task in running_tasks:
                    leftover_task.cancel()

    async def run_async(
        self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None, concurrency_limit: int = 4
    ) -> Dict[str, Any]:
        """
        Provides an asynchronous interface to run the pipeline with provided input data.

        This method allows the pipeline to be integrated into an asynchronous workflow, enabling non-blocking
        execution of pipeline components. It consumes `run_async_generator` and returns only its final item.

        Usage:
        ```python
        import asyncio

        from haystack import Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.core.pipeline import AsyncPipeline
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]

        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Ask a question
        question = "Who lives in Paris?"

        async def run_inner(data, include_outputs_from):
            return await rag_pipeline.run_async(data=data, include_outputs_from=include_outputs_from)

        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }

        results = asyncio.run(run_inner(data, include_outputs_from={"retriever", "llm"}))

        print(results["llm"]["replies"])
        # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
        # _name=None, _meta={'model': 'gpt-4o-mini-2024-07-18', 'index': 0, 'finish_reason': 'stop', 'usage':
        # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75,
        # 'completion_tokens_details': CompletionTokensDetails(accepted_prediction_tokens=0,
        # audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), 'prompt_tokens_details':
        # PromptTokensDetails(audio_tokens=0, cached_tokens=0)}})]
        ```

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that should be allowed to run concurrently.
        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running.
            Or if a Component fails or returns output in an unsupported type.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline.
        """
        final: Dict[str, Any] = {}
        # The generator's last item is the complete pipeline output; earlier items are partial results.
        async for partial in self.run_async_generator(
            data=data, concurrency_limit=concurrency_limit, include_outputs_from=include_outputs_from
        ):
            final = partial
        return final or {}

    def run(
        self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None, concurrency_limit: int = 4
    ) -> Dict[str, Any]:
        """
        Provides a synchronous interface to run the pipeline with given input data.

        Internally, the pipeline components are executed asynchronously, but the method itself
        will block until the entire pipeline execution is complete.

        In case you need asynchronous methods, consider using `run_async` or `run_async_generator`.
        This method uses `asyncio.run`, so it cannot be called from code that is already running inside an
        event loop; use `run_async` there instead.

        Usage:
        ```python
        from haystack import Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.core.pipeline import AsyncPipeline
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]


        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Ask a question
        question = "Who lives in Paris?"

        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }

        results = rag_pipeline.run(data)

        print(results["llm"]["replies"])
        # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
        # _name=None, _meta={'model': 'gpt-4o-mini-2024-07-18', 'index': 0, 'finish_reason': 'stop', 'usage':
        # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75, 'completion_tokens_details':
        # CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0,
        # rejected_prediction_tokens=0), 'prompt_tokens_details': PromptTokensDetails(audio_tokens=0,
        # cached_tokens=0)}})]
        ```

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that should be allowed to run concurrently.

        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running.
            Or if a Component fails or returns output in an unsupported type.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline.
        :raises RuntimeError:
            If called while an asyncio event loop is already running in the current thread.
        """
        return asyncio.run(
            self.run_async(data=data, include_outputs_from=include_outputs_from, concurrency_limit=concurrency_limit)
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc