• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 12709332184

10 Jan 2025 12:11PM UTC coverage: 91.1% (+0.001%) from 91.099%
12709332184

Pull #8702

github

web-flow
Merge 3e4d6bfca into 08cf09f83
Pull Request #8702: fix: `OpenAIChatGenerator` - do not pass tools to the OpenAI client when none are provided

8660 of 9506 relevant lines covered (91.1%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

42.33
haystack/core/pipeline/pipeline.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
from copy import deepcopy
1✔
6
from typing import Any, Dict, List, Mapping, Optional, Set, Tuple
1✔
7
from warnings import warn
1✔
8

9
import networkx as nx
1✔
10

11
from haystack import logging, tracing
1✔
12
from haystack.core.component import Component
1✔
13
from haystack.core.errors import PipelineMaxComponentRuns, PipelineRuntimeError
1✔
14
from haystack.core.pipeline.base import (
1✔
15
    _dequeue_component,
16
    _dequeue_waiting_component,
17
    _enqueue_component,
18
    _enqueue_waiting_component,
19
)
20
from haystack.telemetry import pipeline_running
1✔
21

22
from .base import PipelineBase, _add_missing_input_defaults, _is_lazy_variadic
1✔
23

24
logger = logging.getLogger(__name__)
1✔
25

26

27
class Pipeline(PipelineBase):
    """
    Synchronous version of the orchestration engine.

    Orchestrates component execution according to the execution graph, one after the other.
    """

    def _run_component(
        self, name: str, inputs: Dict[str, Any], parent_span: Optional[tracing.Span] = None
    ) -> Dict[str, Any]:
        """
        Runs a Component with the given inputs.

        :param name: Name of the Component as defined in the Pipeline.
        :param inputs: Inputs for the Component.
        :param parent_span: The parent span to use for the newly created span.
            This is to allow tracing to be correctly linked to the pipeline run.
        :raises PipelineRuntimeError: If Component doesn't return a dictionary.
        :return: The output of the Component.
        """
        instance: Component = self.graph.nodes[name]["instance"]

        def _socket_spec(sockets_dict: Mapping, edge_attr: str) -> Dict[str, Dict[str, Any]]:
            # Summarize a component's sockets (type + connected edges) for the tracing span.
            spec: Dict[str, Dict[str, Any]] = {}
            for key, socket in sockets_dict.items():
                type_name = socket.type.__name__ if isinstance(socket.type, type) else str(socket.type)
                spec[key] = {"type": type_name, edge_attr: getattr(socket, edge_attr)}
            return spec

        span_tags = {
            "haystack.component.name": name,
            "haystack.component.type": instance.__class__.__name__,
            "haystack.component.input_types": {k: type(v).__name__ for k, v in inputs.items()},
            "haystack.component.input_spec": _socket_spec(instance.__haystack_input__._sockets_dict, "senders"),  # type: ignore
            "haystack.component.output_spec": _socket_spec(instance.__haystack_output__._sockets_dict, "receivers"),  # type: ignore
        }

        with tracing.tracer.trace("haystack.component.run", tags=span_tags, parent_span=parent_span) as span:
            # We deepcopy the inputs otherwise we might lose that information
            # when we delete them in case they're sent to other Components
            span.set_content_tag("haystack.component.input", deepcopy(inputs))
            logger.info("Running component {component_name}", component_name=name)
            outputs: Dict[str, Any] = instance.run(**inputs)
            self.graph.nodes[name]["visits"] += 1

            # After a Component that has variadic inputs is run, we need to reset the variadic inputs that were consumed
            for socket in instance.__haystack_input__._sockets_dict.values():  # type: ignore
                if socket.is_variadic and socket.name in inputs:
                    inputs[socket.name] = []

            if not isinstance(outputs, Mapping):
                raise PipelineRuntimeError(
                    f"Component '{name}' didn't return a dictionary. "
                    "Components must always return dictionaries: check the documentation."
                )
            span.set_tag("haystack.component.visits", self.graph.nodes[name]["visits"])
            span.set_content_tag("haystack.component.output", outputs)

            return outputs
95

96
    def _run_subgraph(  # noqa: PLR0915
        self,
        cycle: List[str],
        component_name: str,
        components_inputs: Dict[str, Dict[str, Any]],
        *,
        include_outputs_from: Optional[Set[str]] = None,
        parent_span: Optional[tracing.Span] = None,
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        Runs a `cycle` in the Pipeline starting from `component_name`.

        This will return once there are no inputs for the Components in `cycle`.

        This is an internal method meant to be used in `Pipeline.run()` only.

        :param cycle:
            List of Components that are part of the cycle being run
        :param component_name:
            Name of the Component that will start execution of the cycle
        :param components_inputs:
            Components inputs, this might include inputs for Components that are not part
            of the cycle but part of the wider Pipeline's graph
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the cycle's output. In case a Component is executed multiple times
            only the last-produced output is included.
        :param parent_span:
            Parent tracing span, so component runs inside the cycle are linked to the pipeline run.
        :returns:
            Outputs of all the Components that are not connected to other Components in `cycle`.
            If `include_outputs_from` is set those Components' outputs will be included.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline
        """
        waiting_queue: List[Tuple[str, Component]] = []
        run_queue: List[Tuple[str, Component]] = []

        # Create the run queue starting with the component that needs to run first
        start_index = cycle.index(component_name)
        for node in cycle[start_index:]:
            run_queue.append((node, self.graph.nodes[node]["instance"]))

        include_outputs_from = set() if include_outputs_from is None else include_outputs_from

        # Snapshots of the two previous waiting-queue states; when two consecutive
        # snapshots are identical no progress was made (see stuck-loop check below).
        before_last_waiting_queue: Optional[Set[str]] = None
        last_waiting_queue: Optional[Set[str]] = None

        subgraph_outputs = {}
        # These are outputs that are sent to other Components but the user explicitly
        # asked to include them in the final output.
        extra_outputs = {}

        # This variable is used to keep track if we still need to run the cycle or not.
        # When a Component doesn't send outputs to another Component
        # that's inside the subgraph, we stop running this subgraph.
        cycle_received_inputs = False

        while not cycle_received_inputs:
            # Here we run the Components
            name, comp = run_queue.pop(0)
            if _is_lazy_variadic(comp) and not all(_is_lazy_variadic(comp) for _, comp in run_queue):
                # We run Components with lazy variadic inputs only if there only Components with
                # lazy variadic inputs left to run
                _enqueue_waiting_component((name, comp), waiting_queue)
                continue

            # As soon as a Component returns only output that is not part of the cycle, we can stop
            if self._component_has_enough_inputs_to_run(name, components_inputs):
                if self.graph.nodes[name]["visits"] > self._max_runs_per_component:
                    msg = f"Maximum run count {self._max_runs_per_component} reached for component '{name}'"
                    raise PipelineMaxComponentRuns(msg)

                res: Dict[str, Any] = self._run_component(name, components_inputs[name], parent_span=parent_span)

                # Delete the inputs that were consumed by the Component and are not received from
                # the user or from Components that are part of this cycle
                sockets = list(components_inputs[name].keys())
                for socket_name in sockets:
                    senders = comp.__haystack_input__._sockets_dict[socket_name].senders  # type: ignore
                    if not senders:
                        # We keep inputs that came from the user
                        continue
                    all_senders_in_cycle = all(sender in cycle for sender in senders)
                    if all_senders_in_cycle:
                        # All senders are in the cycle, we can remove the input.
                        # We'll receive it later at a certain point.
                        del components_inputs[name][socket_name]

                if name in include_outputs_from:
                    # Deepcopy the outputs to prevent downstream nodes from modifying them
                    # We don't care about loops - Always store the last output.
                    extra_outputs[name] = deepcopy(res)

                # Reset the waiting for input previous states, we managed to run a component
                before_last_waiting_queue = None
                last_waiting_queue = None

                # Check if a component doesn't send any output to components that are part of the cycle
                final_output_reached = False
                for output_socket in res.keys():
                    for receiver in comp.__haystack_output__._sockets_dict[output_socket].receivers:  # type: ignore
                        if receiver in cycle:
                            final_output_reached = True
                            break
                    if final_output_reached:
                        break

                if not final_output_reached:
                    # We stop only if the Component we just ran doesn't send any output to sockets that
                    # are part of the cycle
                    cycle_received_inputs = True

                # We manage to run this component that was in the waiting list, we can remove it.
                # This happens when a component was put in the waiting list but we reached it from another edge.
                _dequeue_waiting_component((name, comp), waiting_queue)
                for pair in self._find_components_that_will_receive_no_input(name, res, components_inputs):
                    _dequeue_component(pair, run_queue, waiting_queue)

                # Only distribute to receivers inside the cycle; everything else waits
                # until the cycle is done and is forwarded by the caller (Pipeline.run).
                receivers = [item for item in self._find_receivers_from(name) if item[0] in cycle]

                res = self._distribute_output(receivers, res, components_inputs, run_queue, waiting_queue)

                # We treat a cycle as a completely independent graph, so we keep track of output
                # that is not sent inside the cycle.
                # This output is going to get distributed to the wider graph after we finish running
                # a cycle.
                # All values that are left at this point go outside the cycle.
                if len(res) > 0:
                    subgraph_outputs[name] = res
            else:
                # This component doesn't have enough inputs so we can't run it yet
                _enqueue_waiting_component((name, comp), waiting_queue)

            if len(run_queue) == 0 and len(waiting_queue) > 0:
                # Check if we're stuck in a loop.
                # It's important to check whether previous waitings are None as it could be that no
                # Component has actually been run yet.
                if (
                    before_last_waiting_queue is not None
                    and last_waiting_queue is not None
                    and before_last_waiting_queue == last_waiting_queue
                ):
                    if self._is_stuck_in_a_loop(waiting_queue):
                        # We're stuck! We can't make any progress.
                        msg = (
                            "Pipeline is stuck running in a loop. Partial outputs will be returned. "
                            "Check the Pipeline graph for possible issues."
                        )
                        warn(RuntimeWarning(msg))
                        break

                    (name, comp) = self._find_next_runnable_lazy_variadic_or_default_component(waiting_queue)
                    _add_missing_input_defaults(name, comp, components_inputs)
                    _enqueue_component((name, comp), run_queue, waiting_queue)
                    continue

                before_last_waiting_queue = last_waiting_queue.copy() if last_waiting_queue is not None else None
                last_waiting_queue = {item[0] for item in waiting_queue}

                (name, comp) = self._find_next_runnable_component(components_inputs, waiting_queue)
                _add_missing_input_defaults(name, comp, components_inputs)
                _enqueue_component((name, comp), run_queue, waiting_queue)

        return subgraph_outputs, extra_outputs
×
259

260
    def run(  # noqa: PLR0915, PLR0912
        self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None
    ) -> Dict[str, Any]:
        """
        Runs the Pipeline with given input data.

        Usage:
        ```python
        from haystack import Pipeline, Document
        from haystack.utils import Secret
        from haystack.document_stores.in_memory import InMemoryDocumentStore
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.components.generators import OpenAIGenerator
        from haystack.components.builders.answer_builder import AnswerBuilder
        from haystack.components.builders.prompt_builder import PromptBuilder

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = \"\"\"
        Given these documents, answer the question.
        Documents:
        {% for doc in documents %}
            {{ doc.content }}
        {% endfor %}
        Question: {{question}}
        Answer:
        \"\"\"

        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = PromptBuilder(template=prompt_template)
        llm = OpenAIGenerator(api_key=Secret.from_token(api_key))

        rag_pipeline = Pipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Ask a question
        question = "Who lives in Paris?"
        results = rag_pipeline.run(
            {
                "retriever": {"query": question},
                "prompt_builder": {"question": question},
            }
        )

        print(results["llm"]["replies"])
        # Jean lives in Paris
        ```

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running.
            Or if a Component fails or returns output in an unsupported type.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline.
        """
        pipeline_running(self)

        # Reset the visits count for each component
        self._init_graph()

        # TODO: Remove this warmup once we can check reliably whether a component has been warmed up or not
        # As of now it's here to make sure we don't have failing tests that assume warm_up() is called in run()
        self.warm_up()

        # normalize `data`
        data = self._prepare_component_input_data(data)

        # Raise if input is malformed in some way
        self._validate_input(data)

        # Normalize the input data
        # NOTE(review): method name spelling ("varidiac") comes from PipelineBase — presumably a
        # historical typo kept for compatibility; confirm before renaming.
        components_inputs: Dict[str, Dict[str, Any]] = self._normalize_varidiac_input_data(data)

        # These variables are used to detect when we're stuck in a loop.
        # Stuck loops can happen when one or more components are waiting for input but
        # no other component is going to run.
        # This can happen when a whole branch of the graph is skipped for example.
        # When we find that two consecutive iterations of the loop where the waiting_queue is the same,
        # we know we're stuck in a loop and we can't make any progress.
        #
        # They track the previous two states of the waiting_queue. So if waiting_queue would n,
        # before_last_waiting_queue would be n-2 and last_waiting_queue would be n-1.
        # When we run a component, we reset both.
        before_last_waiting_queue: Optional[Set[str]] = None
        last_waiting_queue: Optional[Set[str]] = None

        # The waiting_for_input list is used to keep track of components that are waiting for input.
        waiting_queue: List[Tuple[str, Component]] = []

        include_outputs_from = set() if include_outputs_from is None else include_outputs_from

        # This is what we'll return at the end
        final_outputs: Dict[Any, Any] = {}

        # Break cycles in case there are, this is a noop if no cycle is found.
        # This will raise if a cycle can't be broken.
        graph_without_cycles, components_in_cycles = self._break_supported_cycles_in_graph()

        # Initial execution order: topological sort of the acyclic graph.
        run_queue: List[Tuple[str, Component]] = []
        for node in nx.topological_sort(graph_without_cycles):
            run_queue.append((node, self.graph.nodes[node]["instance"]))

        # Set defaults inputs for those sockets that don't receive input neither from the user
        # nor from other Components.
        # If they have no default nothing is done.
        # This is important to ensure correct order execution, otherwise some variadic
        # Components that receive input from the user might be run before than they should.
        for name, comp in self.graph.nodes(data="instance"):
            if name not in components_inputs:
                components_inputs[name] = {}
            for socket_name, socket in comp.__haystack_input__._sockets_dict.items():
                if socket_name in components_inputs[name]:
                    continue
                if not socket.senders:
                    value = socket.default_value
                    if socket.is_variadic:
                        # Variadic sockets always receive a list of values.
                        value = [value]
                    components_inputs[name][socket_name] = value

        with tracing.tracer.trace(
            "haystack.pipeline.run",
            tags={
                "haystack.pipeline.input_data": data,
                "haystack.pipeline.output_data": final_outputs,
                "haystack.pipeline.metadata": self.metadata,
                "haystack.pipeline.max_runs_per_component": self._max_runs_per_component,
            },
        ) as span:
            # Cache for extra outputs, if enabled.
            extra_outputs: Dict[Any, Any] = {}

            while len(run_queue) > 0:
                name, comp = run_queue.pop(0)

                if _is_lazy_variadic(comp) and not all(_is_lazy_variadic(comp) for _, comp in run_queue):
                    # We run Components with lazy variadic inputs only if there only Components with
                    # lazy variadic inputs left to run
                    _enqueue_waiting_component((name, comp), waiting_queue)
                    continue
                if self._component_has_enough_inputs_to_run(name, components_inputs) and components_in_cycles.get(
                    name, []
                ):
                    cycles = components_in_cycles.get(name, [])

                    # This component is part of one or more cycles, let's get the first one and run it.
                    # We can reliably pick any of the cycles if there are multiple ones, the way cycles
                    # are run doesn't make a different whether we pick the first or any of the others a
                    # Component is part of.
                    subgraph_output, subgraph_extra_output = self._run_subgraph(
                        cycles[0], name, components_inputs, include_outputs_from=include_outputs_from, parent_span=span
                    )

                    # After a cycle is run the previous run_queue can't be correct anymore cause it's
                    # not modified when running the subgraph.
                    # So we reset it given the output returned by the subgraph.
                    run_queue = []

                    # Reset the waiting for input previous states, we managed to run at least one component
                    before_last_waiting_queue = None
                    last_waiting_queue = None

                    # Merge the extra outputs
                    extra_outputs.update(subgraph_extra_output)

                    # Distribute the cycle's leftover outputs to the wider graph; this also
                    # repopulates run_queue with the components that just received input.
                    for component_name, component_output in subgraph_output.items():
                        receivers = self._find_receivers_from(component_name)
                        component_output = self._distribute_output(
                            receivers, component_output, components_inputs, run_queue, waiting_queue
                        )

                        if len(component_output) > 0:
                            final_outputs[component_name] = component_output

                elif self._component_has_enough_inputs_to_run(name, components_inputs):
                    if self.graph.nodes[name]["visits"] > self._max_runs_per_component:
                        msg = f"Maximum run count {self._max_runs_per_component} reached for component '{name}'"
                        raise PipelineMaxComponentRuns(msg)

                    res: Dict[str, Any] = self._run_component(name, components_inputs[name], parent_span=span)

                    # Delete the inputs that were consumed by the Component and are not received from the user
                    sockets = list(components_inputs[name].keys())
                    for socket_name in sockets:
                        senders = comp.__haystack_input__._sockets_dict[socket_name].senders
                        if senders:
                            # Delete all inputs that are received from other Components
                            del components_inputs[name][socket_name]
                        # We keep inputs that came from the user

                    if name in include_outputs_from:
                        # Deepcopy the outputs to prevent downstream nodes from modifying them
                        # We don't care about loops - Always store the last output.
                        extra_outputs[name] = deepcopy(res)

                    # Reset the waiting for input previous states, we managed to run a component
                    before_last_waiting_queue = None
                    last_waiting_queue = None

                    # We manage to run this component that was in the waiting list, we can remove it.
                    # This happens when a component was put in the waiting list but we reached it from another edge.
                    _dequeue_waiting_component((name, comp), waiting_queue)

                    for pair in self._find_components_that_will_receive_no_input(name, res, components_inputs):
                        _dequeue_component(pair, run_queue, waiting_queue)
                    receivers = self._find_receivers_from(name)
                    res = self._distribute_output(receivers, res, components_inputs, run_queue, waiting_queue)

                    # Whatever was not consumed by a receiver is a leaf output.
                    if len(res) > 0:
                        final_outputs[name] = res
                else:
                    # This component doesn't have enough inputs so we can't run it yet
                    _enqueue_waiting_component((name, comp), waiting_queue)

                if len(run_queue) == 0 and len(waiting_queue) > 0:
                    # Check if we're stuck in a loop.
                    # It's important to check whether previous waitings are None as it could be that no
                    # Component has actually been run yet.
                    if (
                        before_last_waiting_queue is not None
                        and last_waiting_queue is not None
                        and before_last_waiting_queue == last_waiting_queue
                    ):
                        if self._is_stuck_in_a_loop(waiting_queue):
                            # We're stuck! We can't make any progress.
                            msg = (
                                "Pipeline is stuck running in a loop. Partial outputs will be returned. "
                                "Check the Pipeline graph for possible issues."
                            )
                            warn(RuntimeWarning(msg))
                            break

                        (name, comp) = self._find_next_runnable_lazy_variadic_or_default_component(waiting_queue)
                        _add_missing_input_defaults(name, comp, components_inputs)
                        _enqueue_component((name, comp), run_queue, waiting_queue)
                        continue

                    before_last_waiting_queue = last_waiting_queue.copy() if last_waiting_queue is not None else None
                    last_waiting_queue = {item[0] for item in waiting_queue}

                    (name, comp) = self._find_next_runnable_component(components_inputs, waiting_queue)
                    _add_missing_input_defaults(name, comp, components_inputs)
                    _enqueue_component((name, comp), run_queue, waiting_queue)

            # Merge the cached per-component outputs requested via include_outputs_from
            # into the final result, without clobbering leaf outputs already present.
            if len(include_outputs_from) > 0:
                for name, output in extra_outputs.items():
                    inner = final_outputs.get(name)
                    if inner is None:
                        final_outputs[name] = output
                    else:
                        # Let's not override any keys that are already
                        # in the final_outputs as they might be different
                        # from what we cached in extra_outputs, e.g. when loops
                        # are involved.
                        for k, v in output.items():
                            if k not in inner:
                                inner[k] = v

            return final_outputs
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc