deepset-ai / haystack · build 13246115546
10 Feb 2025 04:53PM UTC · coverage: 91.84% (-0.4%) from 92.249%

Pull Request #8815: feat: Add new component CSVDocumentSplitter to recursively split CSV documents
Merge dc1295781 into b6ebd3cd7

9285 of 10110 relevant lines covered (91.84%)
0.92 hits per line
Source File: haystack/core/pipeline/async_pipeline.py (64.08% covered)

# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import asyncio
from copy import deepcopy
from typing import Any, AsyncIterator, Dict, List, Optional, Set

from haystack import logging, tracing
from haystack.core.component import Component
from haystack.core.errors import PipelineMaxComponentRuns, PipelineRuntimeError
from haystack.core.pipeline.base import ComponentPriority, PipelineBase
from haystack.telemetry import pipeline_running

logger = logging.getLogger(__name__)


class AsyncPipeline(PipelineBase):
    """
    Asynchronous version of the orchestration engine.

    Orchestrates component execution and runs components concurrently if the execution graph allows it.
    """

    async def run_async_generator(  # noqa: PLR0915,C901
        self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None, concurrency_limit: int = 4
    ) -> AsyncIterator[Dict[str, Any]]:
        """
        Execute this pipeline asynchronously, yielding partial outputs when any component finishes.

        :param data: Initial input data to the pipeline.
        :param concurrency_limit: The maximum number of components that are allowed to run concurrently.
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :return: An async iterator of partial (and final) outputs.
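
        Usage (a minimal illustrative sketch, assuming `pipeline` is an already-connected AsyncPipeline
        and `data` a valid input dictionary; each yielded item maps component names to partial outputs,
        and the final yielded item is the aggregated pipeline output):

        ```python
        async for partial_outputs in pipeline.run_async_generator(data=data, concurrency_limit=4):
            print(partial_outputs)
        ```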
        """
40
        if include_outputs_from is None:
1✔
41
            include_outputs_from = set()
1✔
42

43
        # 0) Basic pipeline init
44
        pipeline_running(self)  # telemetry
1✔
45
        self.warm_up()  # optional warm-up (if needed)
1✔
46

47
        # 1) Prepare ephemeral state
48
        ready_sem = asyncio.Semaphore(max(1, concurrency_limit))
1✔
49
        inputs_state: Dict[str, Dict[str, List[Dict[str, Any]]]] = {}
1✔
50
        pipeline_outputs: Dict[str, Any] = {}
1✔
51
        running_tasks: Dict[asyncio.Task, str] = {}
1✔
52

53
        # A set of component names that have been scheduled but not finished:
54
        scheduled_components: Set[str] = set()
1✔
55

56
        # 2) Convert input data
57
        prepared_data = self._prepare_component_input_data(data)
1✔
58
        self._validate_input(prepared_data)
1✔
59
        inputs_state = self._convert_to_internal_format(prepared_data)
1✔
60

61
        # For quick lookup of downstream receivers
62
        ordered_names = sorted(self.graph.nodes.keys())
1✔
63
        cached_receivers = {n: self._find_receivers_from(n) for n in ordered_names}
1✔
64
        component_visits = {component_name: 0 for component_name in ordered_names}
1✔
65

66
        # We fill the queue once and raise if all components are BLOCKED
67
        self.validate_pipeline(self._fill_queue(ordered_names, inputs_state, component_visits))
1✔
68

69
        # Single parent span for entire pipeline execution
70
        with tracing.tracer.trace(
1✔
71
            "haystack.async_pipeline.run",
72
            tags={
73
                "haystack.pipeline.input_data": data,
74
                "haystack.pipeline.output_data": pipeline_outputs,
75
                "haystack.pipeline.metadata": self.metadata,
76
                "haystack.pipeline.max_runs_per_component": self._max_runs_per_component,
77
            },
78
        ) as parent_span:
79
            # -------------------------------------------------
80
            # We define some functions here so that they have access to local runtime state
81
            # (inputs, tasks, scheduled components) via closures.
82
            # -------------------------------------------------
83
            async def _run_component_async(component_name: str, component_inputs: Dict[str, Any]) -> Dict[str, Any]:
                """
                Runs one component.

                If the component supports async execution, it is awaited directly; otherwise it is
                offloaded to an executor. Updates the visit count, writes outputs to `inputs_state`,
                and returns pruned outputs that get stored in `pipeline_outputs`.

                :param component_name: The name of the component.
                :param component_inputs: Inputs for the component.
                :returns: Outputs from the component that can be yielded from run_async_generator.
                """
                if component_visits[component_name] > self._max_runs_per_component:
                    raise PipelineMaxComponentRuns(f"Max runs for '{component_name}' reached.")

                instance: Component = self.get_component(component_name)
                with tracing.tracer.trace(
                    "haystack.component.run",
                    tags={
                        "haystack.component.name": component_name,
                        "haystack.component.type": instance.__class__.__name__,
                        "haystack.component.input_types": {k: type(v).__name__ for k, v in component_inputs.items()},
                        "haystack.component.input_spec": {
                            key: {
                                "type": (value.type.__name__ if isinstance(value.type, type) else str(value.type)),
                                "senders": value.senders,
                            }
                            for key, value in instance.__haystack_input__._sockets_dict.items()  # type: ignore
                        },
                        "haystack.component.output_spec": {
                            key: {
                                "type": (value.type.__name__ if isinstance(value.type, type) else str(value.type)),
                                "receivers": value.receivers,
                            }
                            for key, value in instance.__haystack_output__._sockets_dict.items()  # type: ignore
                        },
                    },
                    parent_span=parent_span,
                ) as span:
                    span.set_content_tag("haystack.component.input", deepcopy(component_inputs))
                    logger.info("Running component {name}", name=component_name)

                    if getattr(instance, "__haystack_supports_async__", False):
                        outputs = await instance.run_async(**component_inputs)  # type: ignore
                    else:
                        loop = asyncio.get_running_loop()
                        outputs = await loop.run_in_executor(None, lambda: instance.run(**component_inputs))

                component_visits[component_name] += 1

                if not isinstance(outputs, dict):
                    raise PipelineRuntimeError(
                        f"Component '{component_name}' returned an invalid output type. "
                        f"Expected a dict, but got {type(outputs).__name__} instead."
                    )

                span.set_tag("haystack.component.visits", component_visits[component_name])
                span.set_content_tag("haystack.component.outputs", deepcopy(outputs))

                # Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
                pruned = self._write_component_outputs(
                    component_name=component_name,
                    component_outputs=outputs,
                    inputs=inputs_state,
                    receivers=cached_receivers[component_name],
                    include_outputs_from=include_outputs_from,
                )
                if pruned:
                    pipeline_outputs[component_name] = pruned

                return pruned

            async def _run_highest_in_isolation(component_name: str) -> AsyncIterator[Dict[str, Any]]:
                """
                Runs a component with HIGHEST priority in isolation.

                Components with HIGHEST priority (i.e. components with a GreedyVariadic input socket) must run
                in isolation because otherwise, downstream components could produce additional inputs for the
                GreedyVariadic socket.

                :param component_name: The name of the component.
                :return: An async iterator of partial outputs.
                """
                # 1) Wait for all in-flight tasks to finish
                while running_tasks:
                    done, _pending = await asyncio.wait(running_tasks.keys(), return_when=asyncio.ALL_COMPLETED)
                    for finished in done:
                        finished_component_name = running_tasks.pop(finished)
                        partial_result = finished.result()
                        scheduled_components.discard(finished_component_name)
                        if partial_result:
                            yield_dict = {finished_component_name: deepcopy(partial_result)}
                            yield yield_dict  # partial outputs

                if component_name in scheduled_components:
                    # If it's already scheduled for some reason, skip
                    return

                # 2) Run the HIGHEST component by itself
                scheduled_components.add(component_name)
                comp_dict = self._get_component_with_graph_metadata_and_visits(
                    component_name, component_visits[component_name]
                )
                component_inputs = self._consume_component_inputs(component_name, comp_dict, inputs_state)
                component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])
                result = await _run_component_async(component_name, component_inputs)
                scheduled_components.remove(component_name)
                if result:
                    yield {component_name: deepcopy(result)}

            async def _schedule_task(component_name: str) -> None:
                """
                Schedule a component to run.

                We do NOT wait for it to finish here. This allows us to run other components concurrently.

                :param component_name: The name of the component.
                """

                if component_name in scheduled_components:
                    return  # already scheduled, do nothing

                scheduled_components.add(component_name)

                comp_dict = self._get_component_with_graph_metadata_and_visits(
                    component_name, component_visits[component_name]
                )
                component_inputs = self._consume_component_inputs(component_name, comp_dict, inputs_state)
                component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])

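                # Note: the concurrency semaphore is acquired inside _runner (below), not here, so
                # _schedule_task itself never blocks; at most `concurrency_limit` scheduled components
                # execute at the same time.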
                async def _runner():
                    async with ready_sem:
                        result = await _run_component_async(component_name, component_inputs)

                    scheduled_components.remove(component_name)
                    return result

                task = asyncio.create_task(_runner())
                running_tasks[task] = component_name

            async def _wait_for_one_task_to_complete() -> AsyncIterator[Dict[str, Any]]:
                """
                Wait for exactly one running task to finish, yield partial outputs.

                If no tasks are running, does nothing.
                """
                if running_tasks:
                    done, _ = await asyncio.wait(running_tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
                    for finished in done:
                        finished_component_name = running_tasks.pop(finished)
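                        # Task.result() re-raises any exception raised inside the component task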
                        partial_result = finished.result()
                        scheduled_components.discard(finished_component_name)
                        if partial_result:
                            yield {finished_component_name: deepcopy(partial_result)}

            async def _wait_for_all_tasks_to_complete() -> AsyncIterator[Dict[str, Any]]:
                """
                Wait for all running tasks to finish, yield partial outputs.
                """
                if running_tasks:
                    done, _ = await asyncio.wait(running_tasks.keys(), return_when=asyncio.ALL_COMPLETED)
                    for finished in done:
                        finished_component_name = running_tasks.pop(finished)
                        partial_result = finished.result()
                        scheduled_components.discard(finished_component_name)
                        if partial_result:
                            yield {finished_component_name: deepcopy(partial_result)}

            # -------------------------------------------------
            # MAIN SCHEDULING LOOP
            # -------------------------------------------------
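            # Each iteration picks the next runnable component from the priority queue:
            #   - HIGHEST components run alone, after all in-flight tasks have drained,
            #   - READY components are scheduled concurrently up to `concurrency_limit`,
            #   - DEFER / DEFER_LAST components are scheduled only when nothing else is running,
            #   - if nothing is runnable, we wait for a running task to finish (or exit when none remain).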
            while True:
                # 2) Build the priority queue of candidates
                priority_queue = self._fill_queue(ordered_names, inputs_state, component_visits)
                candidate = self._get_next_runnable_component(priority_queue, component_visits)
                if candidate is None and running_tasks:
                    # We need to wait for one task to finish to make progress and potentially unblock the priority_queue
                    async for partial_result in _wait_for_one_task_to_complete():
                        yield partial_result
                    continue

                if candidate is None and not running_tasks:
                    # done
                    break

                priority, component_name, _ = candidate  # type: ignore

                if component_name in scheduled_components:
                    # We need to wait for one task to finish to make progress
                    async for partial_result in _wait_for_one_task_to_complete():
                        yield partial_result
                    continue

                if priority == ComponentPriority.HIGHEST:
                    # 1) run alone
                    async for partial_result in _run_highest_in_isolation(component_name):
                        yield partial_result
                    # then continue the loop
                    continue

                if priority == ComponentPriority.READY:
                    # 1) schedule this one
                    await _schedule_task(component_name)

                    # 2) Possibly schedule more READY tasks if concurrency not fully used
                    while len(priority_queue) > 0 and not ready_sem.locked():
                        peek_prio, peek_name = priority_queue.peek()
                        if peek_prio in (ComponentPriority.BLOCKED, ComponentPriority.HIGHEST):
                            # can't run or must run alone => skip
                            break
                        if peek_prio == ComponentPriority.READY:
                            priority_queue.pop()
                            await _schedule_task(peek_name)
                            # keep adding while concurrency is not locked
                            continue

                        # The next is DEFER/DEFER_LAST => we only schedule it if it "becomes READY"
                        # We'll handle it in the next iteration or with incremental waiting
                        break

                # We only schedule components with priority DEFER or DEFER_LAST when no other tasks are running
                elif priority in (ComponentPriority.DEFER, ComponentPriority.DEFER_LAST) and not running_tasks:
                    await _schedule_task(component_name)

                # To make progress, we wait for one task to complete before re-starting the loop
                async for partial_result in _wait_for_one_task_to_complete():
                    yield partial_result

            # End main loop

            # 3) Drain leftover tasks
            async for partial_result in _wait_for_all_tasks_to_complete():
                yield partial_result

            # 4) Yield final pipeline outputs
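            # Unlike the per-component partials yielded above, this is the aggregated dict keyed by component name.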
            yield deepcopy(pipeline_outputs)

    async def run_async(
        self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None, concurrency_limit: int = 4
    ) -> Dict[str, Any]:
        """
        Runs the Pipeline with given input data.

        Usage:
        ```python
        import asyncio

        from haystack import AsyncPipeline, Document
        from haystack.utils import Secret
        from haystack.document_stores.in_memory import InMemoryDocumentStore
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.components.generators import OpenAIGenerator
        from haystack.components.builders.answer_builder import AnswerBuilder
        from haystack.components.builders.prompt_builder import PromptBuilder

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = \"\"\"
        Given these documents, answer the question.
        Documents:
        {% for doc in documents %}
            {{ doc.content }}
        {% endfor %}
        Question: {{question}}
        Answer:
        \"\"\"

        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = PromptBuilder(template=prompt_template)
        llm = OpenAIGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY"))

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Ask a question
        question = "Who lives in Paris?"


        async def run_inner(data, include_outputs_from):
            return await rag_pipeline.run_async(data=data, include_outputs_from=include_outputs_from)

        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }
        async_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(async_loop)
        results = async_loop.run_until_complete(run_inner(data, include_outputs_from=None))
        async_loop.close()

        print(results["llm"]["replies"])
        # Jean lives in Paris
        ```

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that should be allowed to run concurrently.
        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running.
            Or if a Component fails or returns output in an unsupported type.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline.
        """
        final: Dict[str, Any] = {}
        async for partial in self.run_async_generator(
            data=data, concurrency_limit=concurrency_limit, include_outputs_from=include_outputs_from
        ):
            final = partial
        return final or {}

    def run(
        self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None, concurrency_limit: int = 4
    ) -> Dict[str, Any]:
        """
        Runs the pipeline with given input data.

        This method is synchronous, but it runs components asynchronously internally.
        Check out `run_async` or `run_async_generator` if you are looking for async methods.

        Usage:
        ```python
        from haystack import AsyncPipeline, Document
        from haystack.utils import Secret
        from haystack.document_stores.in_memory import InMemoryDocumentStore
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.components.generators import OpenAIGenerator
        from haystack.components.builders.answer_builder import AnswerBuilder
        from haystack.components.builders.prompt_builder import PromptBuilder

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = \"\"\"
        Given these documents, answer the question.
        Documents:
        {% for doc in documents %}
            {{ doc.content }}
        {% endfor %}
        Question: {{question}}
        Answer:
        \"\"\"

        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = PromptBuilder(template=prompt_template)
        llm = OpenAIGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY"))

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Ask a question
        question = "Who lives in Paris?"

        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }

        results = rag_pipeline.run(data)

        print(results["llm"]["replies"])
        # Jean lives in Paris
        ```

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that should be allowed to run concurrently.
        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running.
            Or if a Component fails or returns output in an unsupported type.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline.
        """
        return asyncio.run(
            self.run_async(data=data, include_outputs_from=include_outputs_from, concurrency_limit=concurrency_limit)
        )