haystack/core/pipeline/async_pipeline.py
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional, Set

from haystack import logging, tracing
from haystack.core.component import Component
from haystack.core.errors import PipelineMaxComponentRuns, PipelineRuntimeError
from haystack.core.pipeline.base import (
    _COMPONENT_INPUT,
    _COMPONENT_OUTPUT,
    _COMPONENT_VISITS,
    ComponentPriority,
    PipelineBase,
)
from haystack.core.pipeline.utils import _deepcopy_with_exceptions
from haystack.telemetry import pipeline_running

logger = logging.getLogger(__name__)


class AsyncPipeline(PipelineBase):
    """
    Asynchronous version of the Pipeline orchestration engine.

    Manages components in a pipeline, allowing for concurrent processing when the pipeline's execution graph permits.
    This enables efficient processing of components by minimizing idle time and maximizing resource utilization.
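
    A minimal usage sketch (illustrative component names and inputs; see `run_async` and
    `run_async_generator` for complete, runnable examples):

    ```python
    import asyncio

    pipeline = AsyncPipeline()
    # ... add and connect components, e.g. pipeline.add_component("retriever", retriever) ...
    results = asyncio.run(pipeline.run_async(data={"retriever": {"query": "Who lives in Paris?"}}))
    ```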
    """

    @staticmethod
    async def _run_component_async(  # pylint: disable=too-many-positional-arguments
        component_name: str,
        component: Dict[str, Any],
        component_inputs: Dict[str, Any],
        component_visits: Dict[str, int],
        max_runs_per_component: int = 100,
        parent_span: Optional[tracing.Span] = None,
    ) -> Dict[str, Any]:
        """
        Executes a single component asynchronously.

        If the component supports async execution, its `run_async` coroutine is awaited directly;
        otherwise, its synchronous `run` method is offloaded to an executor so it does not block the event loop.

        The method also updates the component's `visits` count and returns its raw outputs;
        the caller distributes these outputs to downstream inputs and prunes them for `pipeline_outputs`.

        :param component_name: The name of the component.
        :param component: The component's metadata dict from the pipeline graph (includes the component instance).
        :param component_inputs: Inputs for the component.
        :param component_visits: Mapping from component name to the number of times it has run.
        :param max_runs_per_component: Maximum number of times a component may run in this pipeline.
        :param parent_span: The parent tracing span, if any.
        :returns: Outputs from the component that can be yielded from `run_async_generator`.
        """
        if component_visits[component_name] > max_runs_per_component:
            raise PipelineMaxComponentRuns(f"Max runs for '{component_name}' reached.")

        instance: Component = component["instance"]
        with PipelineBase._create_component_span(
            component_name=component_name, instance=instance, inputs=component_inputs, parent_span=parent_span
        ) as span:
            span.set_content_tag(_COMPONENT_INPUT, _deepcopy_with_exceptions(component_inputs))
            logger.info("Running component {component_name}", component_name=component_name)

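            # Components that opt in via `__haystack_supports_async__` expose a native
            # `run_async` coroutine that is awaited directly; all others are assumed
            # synchronous, and their blocking `run` is offloaded to the event loop's
            # default executor so it cannot stall the loop.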
            if getattr(instance, "__haystack_supports_async__", False):
                try:
                    outputs = await instance.run_async(**component_inputs)  # type: ignore
                except Exception as error:
                    raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error
            else:
                loop = asyncio.get_running_loop()
                outputs = await loop.run_in_executor(None, lambda: instance.run(**component_inputs))

            component_visits[component_name] += 1

            if not isinstance(outputs, dict):
                raise PipelineRuntimeError.from_invalid_output(component_name, instance.__class__, outputs)

            span.set_tag(_COMPONENT_VISITS, component_visits[component_name])
            span.set_content_tag(_COMPONENT_OUTPUT, _deepcopy_with_exceptions(outputs))

            return outputs

    async def run_async_generator(  # noqa: PLR0915,C901
        self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None, concurrency_limit: int = 4
    ) -> AsyncIterator[Dict[str, Any]]:
        """
        Executes the pipeline step by step asynchronously, yielding partial outputs when any component finishes.

        Usage:
        ```python
        import asyncio

        from haystack import AsyncPipeline, Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]

        # Create and connect pipeline components
        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Prepare input data
        question = "Who lives in Paris?"
        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }


        # Process results as they become available
        async def process_results():
            async for partial_output in rag_pipeline.run_async_generator(
                    data=data,
                    include_outputs_from={"retriever", "llm"}
            ):
                # Each partial_output contains the results from a completed component
                if "retriever" in partial_output:
                    print("Retrieved documents:", len(partial_output["retriever"]["documents"]))
                if "llm" in partial_output:
                    print("Generated answer:", partial_output["llm"]["replies"][0])


        asyncio.run(process_results())
        ```

        :param data: Initial input data to the pipeline.
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that are allowed to run concurrently.
        :return: An async iterator yielding partial (and final) outputs.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineMaxComponentRuns:
            If a component exceeds the maximum number of allowed executions within the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running, or if a Component fails or returns output of
            an unsupported type.
        """
        if include_outputs_from is None:
            include_outputs_from = set()

        # 0) Basic pipeline init
        pipeline_running(self)  # telemetry
        self.warm_up()  # optional warm-up (if needed)

        # 1) Prepare ephemeral state
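        # Semaphore capping how many components may execute concurrently; max(1, ...)
        # guards against a zero or negative concurrency_limit.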
        ready_sem = asyncio.Semaphore(max(1, concurrency_limit))
        inputs_state: Dict[str, Dict[str, List[Dict[str, Any]]]] = {}
        pipeline_outputs: Dict[str, Any] = {}
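        # Maps each in-flight asyncio.Task to the name of the component it is running.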
        running_tasks: Dict[asyncio.Task, str] = {}

        # A set of component names that have been scheduled but not finished:
        scheduled_components: Set[str] = set()

        # 2) Convert input data
        prepared_data = self._prepare_component_input_data(data)

        # raises ValueError if input is malformed in some way
        self._validate_input(prepared_data)
        inputs_state = self._convert_to_internal_format(prepared_data)

        # For quick lookup of downstream receivers
        ordered_names = sorted(self.graph.nodes.keys())
        cached_receivers = {n: self._find_receivers_from(n) for n in ordered_names}
        component_visits = dict.fromkeys(ordered_names, 0)
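        # Topological sort used to tie-break DEFER/DEFER_LAST components; computed lazily
        # on first use and cached for subsequent iterations.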
        cached_topological_sort = None

        # We fill the queue once and raise if all components are BLOCKED
        self.validate_pipeline(self._fill_queue(ordered_names, inputs_state, component_visits))

        # Single parent span for entire pipeline execution
        with tracing.tracer.trace(
            "haystack.async_pipeline.run",
            tags={
                "haystack.pipeline.input_data": prepared_data,
                "haystack.pipeline.output_data": pipeline_outputs,
                "haystack.pipeline.metadata": self.metadata,
                "haystack.pipeline.max_runs_per_component": self._max_runs_per_component,
            },
        ) as parent_span:
            # -------------------------------------------------
            # We define some functions here so that they have access to local runtime state
            # (inputs, tasks, scheduled components) via closures.
            # -------------------------------------------------
            async def _run_highest_in_isolation(component_name: str) -> AsyncIterator[Dict[str, Any]]:
                """
                Runs a component with HIGHEST priority in isolation.

                Components with HIGHEST priority (i.e. components with a GreedyVariadic input socket) must run
                in isolation because otherwise components that are still running could produce additional
                inputs for the GreedyVariadic socket.

                :param component_name: The name of the component.
                :return: An async iterator of partial outputs.
                """
                # 1) Wait for all in-flight tasks to finish
                while running_tasks:
                    done, _pending = await asyncio.wait(running_tasks.keys(), return_when=asyncio.ALL_COMPLETED)
                    for finished in done:
                        finished_component_name = running_tasks.pop(finished)
                        partial_result = finished.result()
                        scheduled_components.discard(finished_component_name)
                        if partial_result:
                            yield_dict = {finished_component_name: _deepcopy_with_exceptions(partial_result)}
                            yield yield_dict  # partial outputs

                if component_name in scheduled_components:
                    # If it's already scheduled for some reason, skip
                    return

                # 2) Run the HIGHEST component by itself
                scheduled_components.add(component_name)
                comp_dict = self._get_component_with_graph_metadata_and_visits(
                    component_name, component_visits[component_name]
                )
                component_inputs = self._consume_component_inputs(component_name, comp_dict, inputs_state)
                component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])
                component_pipeline_outputs = await self._run_component_async(
                    component_name=component_name,
                    component=comp_dict,
                    component_inputs=component_inputs,
                    component_visits=component_visits,
                    max_runs_per_component=self._max_runs_per_component,
                    parent_span=parent_span,
                )

                # Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
                pruned = self._write_component_outputs(
                    component_name=component_name,
                    component_outputs=component_pipeline_outputs,
                    inputs=inputs_state,
                    receivers=cached_receivers[component_name],
                    include_outputs_from=include_outputs_from,
                )
                if pruned:
                    pipeline_outputs[component_name] = pruned

                scheduled_components.remove(component_name)
                if pruned:
                    yield {component_name: _deepcopy_with_exceptions(pruned)}

            async def _schedule_task(component_name: str) -> None:
                """
                Schedule a component to run.

                We do NOT wait for it to finish here. This allows us to run other components concurrently.

                :param component_name: The name of the component.
                """

                if component_name in scheduled_components:
                    return  # already scheduled, do nothing

                scheduled_components.add(component_name)

                comp_dict = self._get_component_with_graph_metadata_and_visits(
                    component_name, component_visits[component_name]
                )
                component_inputs = self._consume_component_inputs(component_name, comp_dict, inputs_state)
                component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])

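                # The semaphore is acquired inside the task body, so scheduling itself never
                # blocks: tasks beyond the concurrency limit are created eagerly and wait
                # their turn inside `_runner`.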
                async def _runner():
                    async with ready_sem:
                        component_pipeline_outputs = await self._run_component_async(
                            component_name=component_name,
                            component=comp_dict,
                            component_inputs=component_inputs,
                            component_visits=component_visits,
                            max_runs_per_component=self._max_runs_per_component,
                            parent_span=parent_span,
                        )

                    # Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
                    pruned = self._write_component_outputs(
                        component_name=component_name,
                        component_outputs=component_pipeline_outputs,
                        inputs=inputs_state,
                        receivers=cached_receivers[component_name],
                        include_outputs_from=include_outputs_from,
                    )
                    if pruned:
                        pipeline_outputs[component_name] = pruned

                    scheduled_components.remove(component_name)
                    return pruned

                task = asyncio.create_task(_runner())
                running_tasks[task] = component_name

            async def _wait_for_one_task_to_complete() -> AsyncIterator[Dict[str, Any]]:
                """
                Wait for at least one running task to finish, then yield partial outputs.

                If no tasks are running, does nothing.
                """
                if running_tasks:
                    done, _ = await asyncio.wait(running_tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
                    for finished in done:
                        finished_component_name = running_tasks.pop(finished)
                        partial_result = finished.result()
                        scheduled_components.discard(finished_component_name)
                        if partial_result:
                            yield {finished_component_name: _deepcopy_with_exceptions(partial_result)}

            async def _wait_for_all_tasks_to_complete() -> AsyncIterator[Dict[str, Any]]:
                """
                Wait for all running tasks to finish, then yield partial outputs.
                """
                if running_tasks:
                    done, _ = await asyncio.wait(running_tasks.keys(), return_when=asyncio.ALL_COMPLETED)
                    for finished in done:
                        finished_component_name = running_tasks.pop(finished)
                        partial_result = finished.result()
                        scheduled_components.discard(finished_component_name)
                        if partial_result:
                            yield {finished_component_name: _deepcopy_with_exceptions(partial_result)}

            # -------------------------------------------------
            # MAIN SCHEDULING LOOP
            # -------------------------------------------------
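            # Each iteration rebuilds the priority queue, since finished tasks may have
            # produced new inputs. HIGHEST-priority components run in isolation, READY
            # components are scheduled up to the concurrency limit, and DEFER/DEFER_LAST
            # components are scheduled only when nothing else is running.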
            while True:
                # Build the priority queue of candidates
                priority_queue = self._fill_queue(ordered_names, inputs_state, component_visits)
                candidate = self._get_next_runnable_component(priority_queue, component_visits)

                if candidate is None and running_tasks:
                    # We need to wait for one task to finish to make progress and potentially unblock the priority_queue
                    async for partial_res in _wait_for_one_task_to_complete():
                        yield partial_res
                    continue

                if candidate is None and not running_tasks:
                    # done
                    break

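                # A candidate unpacks as (priority, component name, component entry);
                # the entry itself is not needed here.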
                priority, comp_name, _ = candidate  # type: ignore

                if comp_name in scheduled_components:
                    # We need to wait for one task to finish to make progress
                    async for partial_res in _wait_for_one_task_to_complete():
                        yield partial_res
                    continue

                if priority == ComponentPriority.HIGHEST:
                    # 1) run alone
                    async for partial_res in _run_highest_in_isolation(comp_name):
                        yield partial_res
                    # then continue the loop
                    continue

                if priority == ComponentPriority.READY:
                    # 1) schedule this one
                    await _schedule_task(comp_name)

                    # 2) Possibly schedule more READY tasks if concurrency not fully used
                    while len(priority_queue) > 0 and not ready_sem.locked():
                        peek_prio, peek_name = priority_queue.peek()
                        if peek_prio in (ComponentPriority.BLOCKED, ComponentPriority.HIGHEST):
                            # can't run or must run alone => skip
                            break
                        if peek_prio == ComponentPriority.READY:
                            priority_queue.pop()
                            await _schedule_task(peek_name)
                            # keep adding while concurrency is not locked
                            continue

                        # The next is DEFER/DEFER_LAST => we only schedule it if it "becomes READY"
                        # We'll handle it in the next iteration or with incremental waiting
                        break

                # We only schedule components with priority DEFER or DEFER_LAST when no other tasks are running
                elif priority in (ComponentPriority.DEFER, ComponentPriority.DEFER_LAST) and not running_tasks:
                    if len(priority_queue) > 0:
                        comp_name, topological_sort = self._tiebreak_waiting_components(
                            component_name=comp_name,
                            priority=priority,
                            priority_queue=priority_queue,
                            topological_sort=cached_topological_sort,
                        )
                        cached_topological_sort = topological_sort

                    await _schedule_task(comp_name)

                # To make progress, we wait for one task to complete before re-starting the loop
                async for partial_res in _wait_for_one_task_to_complete():
                    yield partial_res

            # End main loop

            # 3) Drain leftover tasks
            async for partial_res in _wait_for_all_tasks_to_complete():
                yield partial_res

            # 4) Yield final pipeline outputs
            yield _deepcopy_with_exceptions(pipeline_outputs)

    async def run_async(
        self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None, concurrency_limit: int = 4
    ) -> Dict[str, Any]:
        """
        Provides an asynchronous interface to run the pipeline with the provided input data.

        This method allows the pipeline to be integrated into an asynchronous workflow, enabling non-blocking
        execution of pipeline components.

        Usage:
        ```python
        import asyncio

        from haystack import Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.core.pipeline import AsyncPipeline
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]

        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Ask a question
        question = "Who lives in Paris?"

        async def run_inner(data, include_outputs_from):
            return await rag_pipeline.run_async(data=data, include_outputs_from=include_outputs_from)

        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }

        results = asyncio.run(run_inner(data, include_outputs_from={"retriever", "llm"}))

        print(results["llm"]["replies"])
        # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
        # _name=None, _meta={'model': 'gpt-4o-mini-2024-07-18', 'index': 0, 'finish_reason': 'stop', 'usage':
        # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75,
        # 'completion_tokens_details': CompletionTokensDetails(accepted_prediction_tokens=0,
        # audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), 'prompt_tokens_details':
        # PromptTokensDetails(audio_tokens=0, cached_tokens=0)}})]
        ```

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that should be allowed to run concurrently.
        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running, or if a Component fails or returns output of
            an unsupported type.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline.
        """
        final: Dict[str, Any] = {}
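        # `run_async_generator` yields partial outputs as components finish and yields the
        # aggregated pipeline outputs last, so the most recent item is the final result.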
        async for partial in self.run_async_generator(
            data=data, concurrency_limit=concurrency_limit, include_outputs_from=include_outputs_from
        ):
            final = partial
        return final or {}

    def run(
        self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None, concurrency_limit: int = 4
    ) -> Dict[str, Any]:
        """
        Provides a synchronous interface to run the pipeline with the given input data.

        Internally, the pipeline components are executed asynchronously, but the method itself
        blocks until the entire pipeline execution is complete.

        If you need asynchronous methods, consider using `run_async` or `run_async_generator`.

        Usage:
        ```python
        from haystack import Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.core.pipeline import AsyncPipeline
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]


        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Ask a question
        question = "Who lives in Paris?"

        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }

        results = rag_pipeline.run(data)

        print(results["llm"]["replies"])
        # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
        # _name=None, _meta={'model': 'gpt-4o-mini-2024-07-18', 'index': 0, 'finish_reason': 'stop', 'usage':
        # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75, 'completion_tokens_details':
        # CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0,
        # rejected_prediction_tokens=0), 'prompt_tokens_details': PromptTokensDetails(audio_tokens=0,
        # cached_tokens=0)}})]
        ```

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that should be allowed to run concurrently.

        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running, or if a Component fails or returns output of
            an unsupported type.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline.
        """
        return asyncio.run(
×
660
            self.run_async(data=data, include_outputs_from=include_outputs_from, concurrency_limit=concurrency_limit)
661
        )