haystack/core/pipeline/async_pipeline.py
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import asyncio
from copy import deepcopy
from typing import Any, AsyncIterator, Dict, List, Optional, Set

from haystack import logging, tracing
from haystack.core.component import Component
from haystack.core.errors import PipelineMaxComponentRuns, PipelineRuntimeError
from haystack.core.pipeline.base import ComponentPriority, PipelineBase
from haystack.telemetry import pipeline_running

logger = logging.getLogger(__name__)


class AsyncPipeline(PipelineBase):
    """
    Asynchronous version of the Pipeline orchestration engine.

    Manages components in a pipeline allowing for concurrent processing when the pipeline's execution graph permits.
    This enables efficient processing of components by minimizing idle time and maximizing resource utilization.
    """

    async def run_async_generator(  # noqa: PLR0915,C901
        self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None, concurrency_limit: int = 4
    ) -> AsyncIterator[Dict[str, Any]]:
        """
        Executes the pipeline step by step asynchronously, yielding partial outputs when any component finishes.

        Usage:
        ```python
        import asyncio

        from haystack import AsyncPipeline, Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]

        # Create and connect pipeline components
        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Prepare input data
        question = "Who lives in Paris?"
        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }


        # Process results as they become available
        async def process_results():
            async for partial_output in rag_pipeline.run_async_generator(
                    data=data,
                    include_outputs_from={"retriever", "llm"}
            ):
                # Each partial_output contains the results from a completed component
                if "retriever" in partial_output:
                    print("Retrieved documents:", len(partial_output["retriever"]["documents"]))
                if "llm" in partial_output:
                    print("Generated answer:", partial_output["llm"]["replies"][0])


        asyncio.run(process_results())
        ```

        :param data: Initial input data to the pipeline.
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that are allowed to run concurrently.
        :returns: An async iterator containing partial (and final) outputs.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineMaxComponentRuns:
            If a component exceeds the maximum number of allowed executions within the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running,
            or if a Component fails or returns output of an unsupported type.
        """
        if include_outputs_from is None:
            include_outputs_from = set()

        # 0) Basic pipeline init
        pipeline_running(self)  # telemetry
        self.warm_up()  # optional warm-up (if needed)
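        # warm_up() gives components a chance to initialize heavy resources (e.g. load models)
        # once, before any of them is scheduled.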

        # 1) Prepare ephemeral state
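        # The semaphore caps how many scheduled components may execute at once; max(1, ...)
        # keeps the pipeline runnable even if a non-positive concurrency_limit is passed in.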
        ready_sem = asyncio.Semaphore(max(1, concurrency_limit))
        inputs_state: Dict[str, Dict[str, List[Dict[str, Any]]]] = {}
        pipeline_outputs: Dict[str, Any] = {}
        running_tasks: Dict[asyncio.Task, str] = {}

        # A set of component names that have been scheduled but not finished:
        scheduled_components: Set[str] = set()

        # 2) Convert input data
        prepared_data = self._prepare_component_input_data(data)

        # raises ValueError if input is malformed in some way
        self._validate_input(prepared_data)
        inputs_state = self._convert_to_internal_format(prepared_data)

        # For quick lookup of downstream receivers
        ordered_names = sorted(self.graph.nodes.keys())
        cached_receivers = {n: self._find_receivers_from(n) for n in ordered_names}
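        # Receivers are computed once here so that _write_component_outputs does not have to
        # re-derive them on every component run.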
        component_visits = dict.fromkeys(ordered_names, 0)
        cached_topological_sort = None

        # We fill the queue once and raise if all components are BLOCKED
        self.validate_pipeline(self._fill_queue(ordered_names, inputs_state, component_visits))

        # Single parent span for entire pipeline execution
        with tracing.tracer.trace(
            "haystack.async_pipeline.run",
            tags={
                "haystack.pipeline.input_data": prepared_data,
                "haystack.pipeline.output_data": pipeline_outputs,
                "haystack.pipeline.metadata": self.metadata,
                "haystack.pipeline.max_runs_per_component": self._max_runs_per_component,
            },
        ) as parent_span:
            # -------------------------------------------------
            # We define some functions here so that they have access to local runtime state
            # (inputs, tasks, scheduled components) via closures.
            # -------------------------------------------------
            async def _run_component_async(component_name: str, component_inputs: Dict[str, Any]) -> Dict[str, Any]:
                """
                Executes a single component asynchronously.

                If the component supports async execution, it is awaited directly;
                otherwise it is offloaded to an executor.

                The method also updates the `visits` count of the component, writes outputs to `inputs_state`,
                and returns pruned outputs that get stored in `pipeline_outputs`.

                :param component_name: The name of the component.
                :param component_inputs: Inputs for the component.
                :returns: Outputs from the component that can be yielded from run_async_generator.
                """
                if component_visits[component_name] > self._max_runs_per_component:
                    raise PipelineMaxComponentRuns(f"Max runs for '{component_name}' reached.")

                instance: Component = self.get_component(component_name)
                with tracing.tracer.trace(
                    "haystack.component.run",
                    tags={
                        "haystack.component.name": component_name,
                        "haystack.component.type": instance.__class__.__name__,
                        "haystack.component.input_types": {k: type(v).__name__ for k, v in component_inputs.items()},
                        "haystack.component.input_spec": {
                            key: {
                                "type": (value.type.__name__ if isinstance(value.type, type) else str(value.type)),
                                "senders": value.senders,
                            }
                            for key, value in instance.__haystack_input__._sockets_dict.items()  # type: ignore
                        },
                        "haystack.component.output_spec": {
                            key: {
                                "type": (value.type.__name__ if isinstance(value.type, type) else str(value.type)),
                                "receivers": value.receivers,
                            }
                            for key, value in instance.__haystack_output__._sockets_dict.items()  # type: ignore
                        },
                    },
                    parent_span=parent_span,
                ) as span:
                    span.set_content_tag("haystack.component.input", deepcopy(component_inputs))
                    logger.info("Running component {component_name}", component_name=component_name)

                    if getattr(instance, "__haystack_supports_async__", False):
                        try:
                            outputs = await instance.run_async(**component_inputs)  # type: ignore
                        except Exception as error:
                            raise PipelineRuntimeError.from_exception(
                                component_name, instance.__class__, error
                            ) from error
                    else:
                        loop = asyncio.get_running_loop()
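                        # Passing None as the executor runs the blocking, synchronous run() in the
                        # loop's default ThreadPoolExecutor, keeping the event loop responsive.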
                        outputs = await loop.run_in_executor(None, lambda: instance.run(**component_inputs))

                    component_visits[component_name] += 1

                    if not isinstance(outputs, dict):
                        raise PipelineRuntimeError.from_invalid_output(component_name, instance.__class__, outputs)

                    span.set_tag("haystack.component.visits", component_visits[component_name])
                    span.set_content_tag("haystack.component.output", deepcopy(outputs))

                    # Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
                    pruned = self._write_component_outputs(
                        component_name=component_name,
                        component_outputs=outputs,
                        inputs=inputs_state,
                        receivers=cached_receivers[component_name],
                        include_outputs_from=include_outputs_from,
                    )
                    if pruned:
                        pipeline_outputs[component_name] = pruned

                    return pruned

            async def _run_highest_in_isolation(component_name: str) -> AsyncIterator[Dict[str, Any]]:
                """
                Runs a component with HIGHEST priority in isolation.

                Components with HIGHEST priority (i.e. components with a GreedyVariadic input socket) must run
                in isolation; otherwise, downstream components could produce additional inputs for the
                GreedyVariadic socket.

                :param component_name: The name of the component.
                :returns: An async iterator of partial outputs.
                """
                # 1) Wait for all in-flight tasks to finish
                while running_tasks:
                    done, _pending = await asyncio.wait(running_tasks.keys(), return_when=asyncio.ALL_COMPLETED)
                    for finished in done:
                        finished_component_name = running_tasks.pop(finished)
                        partial_result = finished.result()
                        scheduled_components.discard(finished_component_name)
                        if partial_result:
                            yield_dict = {finished_component_name: deepcopy(partial_result)}
                            yield yield_dict  # partial outputs

                if component_name in scheduled_components:
                    # If it's already scheduled for some reason, skip
                    return

                # 2) Run the HIGHEST component by itself
                scheduled_components.add(component_name)
                comp_dict = self._get_component_with_graph_metadata_and_visits(
                    component_name, component_visits[component_name]
                )
                component_inputs = self._consume_component_inputs(component_name, comp_dict, inputs_state)
                component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])
                result = await _run_component_async(component_name, component_inputs)
                scheduled_components.remove(component_name)
                if result:
                    yield {component_name: deepcopy(result)}

            async def _schedule_task(component_name: str) -> None:
                """
                Schedule a component to run.

                We do NOT wait for it to finish here. This allows us to run other components concurrently.

                :param component_name: The name of the component.
                """

                if component_name in scheduled_components:
                    return  # already scheduled, do nothing

                scheduled_components.add(component_name)

                comp_dict = self._get_component_with_graph_metadata_and_visits(
                    component_name, component_visits[component_name]
                )
                component_inputs = self._consume_component_inputs(component_name, comp_dict, inputs_state)
                component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])

                async def _runner():
                    async with ready_sem:
                        result = await _run_component_async(component_name, component_inputs)

                    scheduled_components.remove(component_name)
                    return result

                task = asyncio.create_task(_runner())
                running_tasks[task] = component_name
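                # Mapping each task back to its component name lets the wait helpers below
                # attribute a finished task's partial result to the right component.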

            async def _wait_for_one_task_to_complete() -> AsyncIterator[Dict[str, Any]]:
                """
                Wait until at least one running task finishes, yield the partial outputs of all that completed.

                If no tasks are running, does nothing.
                """
                if running_tasks:
                    done, _ = await asyncio.wait(running_tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
                    for finished in done:
                        finished_component_name = running_tasks.pop(finished)
                        partial_result = finished.result()
                        scheduled_components.discard(finished_component_name)
                        if partial_result:
                            yield {finished_component_name: deepcopy(partial_result)}

            async def _wait_for_all_tasks_to_complete() -> AsyncIterator[Dict[str, Any]]:
                """
                Wait for all running tasks to finish, yield partial outputs.
                """
                if running_tasks:
                    done, _ = await asyncio.wait(running_tasks.keys(), return_when=asyncio.ALL_COMPLETED)
                    for finished in done:
                        finished_component_name = running_tasks.pop(finished)
                        partial_result = finished.result()
                        scheduled_components.discard(finished_component_name)
                        if partial_result:
                            yield {finished_component_name: deepcopy(partial_result)}

            # -------------------------------------------------
            # MAIN SCHEDULING LOOP
            # -------------------------------------------------
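            # Each iteration recomputes component priorities, schedules whatever is READY
            # (up to the concurrency limit), and then waits for at least one task to finish
            # so that every pass makes progress.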
            while True:
                # Build the priority queue of candidates
                priority_queue = self._fill_queue(ordered_names, inputs_state, component_visits)
                candidate = self._get_next_runnable_component(priority_queue, component_visits)
                if candidate is None and running_tasks:
                    # We need to wait for one task to finish to make progress and potentially unblock the priority_queue
                    async for partial_res in _wait_for_one_task_to_complete():
                        yield partial_res
                    continue

                if candidate is None and not running_tasks:
                    # done
                    break

                priority, comp_name, _ = candidate  # type: ignore

                if comp_name in scheduled_components:
                    # We need to wait for one task to finish to make progress
                    async for partial_res in _wait_for_one_task_to_complete():
                        yield partial_res
                    continue

                if priority == ComponentPriority.HIGHEST:
                    # Run alone, then continue the loop
                    async for partial_res in _run_highest_in_isolation(comp_name):
                        yield partial_res
                    continue

                if priority == ComponentPriority.READY:
                    # Schedule this component first
                    await _schedule_task(comp_name)

                    # Possibly schedule more READY tasks if concurrency is not fully used
                    while len(priority_queue) > 0 and not ready_sem.locked():
                        peek_prio, peek_name = priority_queue.peek()
                        if peek_prio in (ComponentPriority.BLOCKED, ComponentPriority.HIGHEST):
                            # can't run or must run alone => skip
                            break
                        if peek_prio == ComponentPriority.READY:
                            priority_queue.pop()
                            await _schedule_task(peek_name)
                            # keep adding while concurrency is not locked
                            continue

                        # The next is DEFER/DEFER_LAST => we only schedule it if it "becomes READY".
                        # We'll handle it in the next iteration or with incremental waiting.
                        break

                # We only schedule components with priority DEFER or DEFER_LAST when no other tasks are running
                elif priority in (ComponentPriority.DEFER, ComponentPriority.DEFER_LAST) and not running_tasks:
                    if len(priority_queue) > 0:
                        comp_name, topological_sort = self._tiebreak_waiting_components(
                            component_name=comp_name,
                            priority=priority,
                            priority_queue=priority_queue,
                            topological_sort=cached_topological_sort,
                        )
                        cached_topological_sort = topological_sort
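                        # The topological sort is cached so that repeated tie-breaks across
                        # iterations of the main loop stay cheap.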

                    await _schedule_task(comp_name)

                # To make progress, we wait for one task to complete before re-starting the loop
                async for partial_res in _wait_for_one_task_to_complete():
                    yield partial_res

            # End main loop

            # 3) Drain leftover tasks
            async for partial_res in _wait_for_all_tasks_to_complete():
                yield partial_res

            # 4) Yield final pipeline outputs
            yield deepcopy(pipeline_outputs)
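            # The last item yielded is the complete pipeline output; run_async below relies on
            # this by keeping only the final item produced by this generator.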

    async def run_async(
        self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None, concurrency_limit: int = 4
    ) -> Dict[str, Any]:
        """
        Provides an asynchronous interface to run the pipeline with the provided input data.

        This method allows the pipeline to be integrated into an asynchronous workflow, enabling non-blocking
        execution of pipeline components.

        Usage:
        ```python
        import asyncio

        from haystack import Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.core.pipeline import AsyncPipeline
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]

        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Ask a question
        question = "Who lives in Paris?"

        async def run_inner(data, include_outputs_from):
            return await rag_pipeline.run_async(data=data, include_outputs_from=include_outputs_from)

        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }

        results = asyncio.run(run_inner(data, include_outputs_from={"retriever", "llm"}))

        print(results["llm"]["replies"])
        # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
        # _name=None, _meta={'model': 'gpt-4o-mini-2024-07-18', 'index': 0, 'finish_reason': 'stop', 'usage':
        # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75,
        # 'completion_tokens_details': CompletionTokensDetails(accepted_prediction_tokens=0,
        # audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), 'prompt_tokens_details':
        # PromptTokensDetails(audio_tokens=0, cached_tokens=0)}})]
        ```

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that should be allowed to run concurrently.
        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running,
            or if a Component fails or returns output of an unsupported type.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline.
        """
        final: Dict[str, Any] = {}
        async for partial in self.run_async_generator(
            data=data, concurrency_limit=concurrency_limit, include_outputs_from=include_outputs_from
        ):
            final = partial
        return final or {}

    def run(
        self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None, concurrency_limit: int = 4
    ) -> Dict[str, Any]:
        """
        Provides a synchronous interface to run the pipeline with the given input data.

        Internally, the pipeline components are executed asynchronously, but the method itself
        will block until the entire pipeline execution is complete.

        In case you need asynchronous methods, consider using `run_async` or `run_async_generator`.

        Usage:
        ```python
        from haystack import Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.core.pipeline import AsyncPipeline
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]

        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Ask a question
        question = "Who lives in Paris?"

        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }

        results = rag_pipeline.run(data)

        print(results["llm"]["replies"])
        # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
        # _name=None, _meta={'model': 'gpt-4o-mini-2024-07-18', 'index': 0, 'finish_reason': 'stop', 'usage':
        # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75, 'completion_tokens_details':
        # CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0,
        # rejected_prediction_tokens=0), 'prompt_tokens_details': PromptTokensDetails(audio_tokens=0,
        # cached_tokens=0)}})]
        ```

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that should be allowed to run concurrently.

        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running,
            or if a Component fails or returns output of an unsupported type.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline.
        """
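        # asyncio.run() raises a RuntimeError when called from a thread that already has a
        # running event loop (e.g. in a Jupyter notebook); use run_async or run_async_generator
        # in that case.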
        return asyncio.run(
            self.run_async(data=data, include_outputs_from=include_outputs_from, concurrency_limit=concurrency_limit)
        )