• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 13658995133

04 Mar 2025 05:04PM UTC coverage: 90.206% (-0.01%) from 90.216%
13658995133

Pull #8916

github

web-flow
Merge 47991bd9a into 830e7497c
Pull Request #8916: feat: csv to document row level conversion

9616 of 10660 relevant lines covered (90.21%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.55
haystack/core/pipeline/descriptions.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
from typing import Dict, List
1✔
6

7
import networkx  # type:ignore
1✔
8

9
from haystack import logging
1✔
10
from haystack.core.component.types import InputSocket, OutputSocket
1✔
11
from haystack.core.type_utils import _type_name
1✔
12

13
logger = logging.getLogger(__name__)
1✔
14

15

16
def find_pipeline_inputs(
    graph: networkx.MultiDiGraph, include_connected_sockets: bool = False
) -> Dict[str, List[InputSocket]]:
    """
    Collect, for every component in the graph, its disconnected/connected input sockets.

    Note that this method returns *ALL* disconnected input sockets, including all such sockets with default values.

    A socket is kept when at least one of the following holds:

    - it is variadic (variadic sockets are always reported, even when they already have senders),
    - `include_connected_sockets` is `True`,
    - it has no senders.

    :param graph:
        The pipeline graph. Each node's data may carry an `"input_sockets"` mapping whose values are the sockets.
    :param include_connected_sockets:
        If `True`, sockets that already have senders are returned as well.
    :returns:
        A dictionary mapping every node name to the list of its selected input sockets. Nodes without an
        `"input_sockets"` entry, or without any selected socket, map to an empty list.
    """
    return {
        name: [
            socket
            for socket in data.get("input_sockets", {}).values()
            if socket.is_variadic or include_connected_sockets or not socket.senders
        ]
        for name, data in graph.nodes(data=True)
    }
32

33

34
def find_pipeline_outputs(
    graph: networkx.MultiDiGraph, include_connected_sockets: bool = False
) -> Dict[str, List[OutputSocket]]:
    """
    Collect, for every component in the graph, its disconnected/connected output sockets.

    The disconnected output sockets define the pipeline output.

    :param graph:
        The pipeline graph. Each node's data may carry an `"output_sockets"` mapping whose values are the sockets.
    :param include_connected_sockets:
        If `True`, sockets that already have receivers are returned as well; otherwise only sockets without
        receivers are returned.
    :returns:
        A dictionary mapping every node name to the list of its selected output sockets. Nodes without an
        `"output_sockets"` entry, or without any selected socket, map to an empty list.
    """
    return {
        name: [
            socket
            for socket in data.get("output_sockets", {}).values()
            if include_connected_sockets or not socket.receivers
        ]
        for name, data in graph.nodes(data=True)
    }
48

49

50
def describe_pipeline_inputs(graph: networkx.MultiDiGraph) -> Dict[str, Dict[str, dict]]:
    """
    Returns a dictionary with the input names and types that this pipeline accepts.

    The sockets are obtained from `find_pipeline_inputs(graph)` with its default arguments. Components for
    which that call yields no sockets are left out of the result.

    :param graph:
        The pipeline graph to inspect.
    :returns:
        A dictionary keyed by component name. Each value maps a socket name to a dictionary with two keys:
        `"type"` (the socket's `type`) and `"is_mandatory"` (the socket's `is_mandatory`).
    """
    return {
        comp: {socket.name: {"type": socket.type, "is_mandatory": socket.is_mandatory} for socket in data}
        for comp, data in find_pipeline_inputs(graph).items()
        if data  # skip components that expose no pipeline-level inputs
    }
60

61

62
def describe_pipeline_inputs_as_string(graph: networkx.MultiDiGraph) -> str:
    """
    Returns a string representation of the input names and types that this pipeline accepts.

    The text starts with a fixed header line, followed by one `- <component>:` line per component and one
    indented `- <socket name>: <type name>` line per socket of that component. Every line, including the
    last one, ends with a newline.

    :param graph:
        The pipeline graph to inspect.
    :returns:
        The multi-line description built from `describe_pipeline_inputs(graph)`.
    """
    # Collect the pieces and join once instead of growing a string with repeated `+=`.
    parts = ["This pipeline expects the following inputs:\n"]
    for comp, sockets in describe_pipeline_inputs(graph).items():
        if not sockets:
            continue
        parts.append(f"- {comp}:\n")
        parts.extend(f"    - {name}: {_type_name(socket['type'])}\n" for name, socket in sockets.items())
    return "".join(parts)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc