• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ICRAR / daliuge / 4911681207

pending completion
4911681207

Pull #231

github

GitHub
Merge 9186e10d1 into e48989cce
Pull Request #231: Liu 355

180 of 229 new or added lines in 17 files covered. (78.6%)

26 existing lines in 5 files now uncovered.

15345 of 19059 relevant lines covered (80.51%)

1.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.71
/daliuge-common/dlg/common/__init__.py
1
#
2
#    ICRAR - International Centre for Radio Astronomy Research
3
#    (c) UWA - The University of Western Australia, 2020
4
#    Copyright by UWA (in the framework of the ICRAR)
5
#    All rights reserved
6
#
7
#    This library is free software; you can redistribute it and/or
8
#    modify it under the terms of the GNU Lesser General Public
9
#    License as published by the Free Software Foundation; either
10
#    version 2.1 of the License, or (at your option) any later version.
11
#
12
#    This library is distributed in the hope that it will be useful,
13
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
14
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15
#    Lesser General Public License for more details.
16
#
17
#    You should have received a copy of the GNU Lesser General Public
18
#    License along with this library; if not, write to the Free Software
19
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
20
#    MA 02111-1307  USA
21
#
22
from enum import Enum
3✔
23
from dataclasses import dataclass, field, asdict
3✔
24
import logging
3✔
25

26
"""Common utilities used by daliuge packages"""
27
from .osutils import terminate_or_kill, wait_or_kill
3✔
28
from .network import check_port, connect_to, portIsClosed, portIsOpen, write_to
3✔
29
from .streams import ZlibCompressedStream, JSONStream
3✔
30

31
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
3✔
32

33

34
class CategoryType(str, Enum):
    """
    Category types that a drop specification can carry.

    The enumeration mixes in ``str``, so every member compares equal to its
    plain string value (for instance ``CategoryType.DATA == "Data"``), which
    lets specs store either the member or the bare string.
    """

    DATA = "Data"
    APPLICATION = "Application"
    CONSTRUCT = "Construct"
    GROUP = "Group"
    UNKNOWN = "Unknown"
    SERVICE = "Service"
    CONTAINER = "Container"
    SOCKET = "Socket"
    CONTROL = "Control"
    OTHER = "Other"
3✔
45

46

47
def b2s(b, enc="utf8"):
    """
    Decode the bytes object ``b`` into a string.

    :param b: object exposing a ``decode`` method (e.g. ``bytes``)
    :param enc: name of the encoding used to decode ``b``
    :return: the decoded string
    """
    text = b.decode(enc)
    return text
3✔
50

51

52
class dropdict(dict):
    """
    An intermediate representation of a DROP that can be easily serialized
    into a transport format such as JSON or XML.

    This dictionary holds all the important information needed to call any given
    DROP constructor. The most essential pieces of information are the
    DROP's OID, and its type (which determines the class to instantiate).
    Depending on the type more fields will be required. This class doesn't
    enforce these requirements though, as it only acts as an information
    container.

    This class also offers a few utility methods to make it look more like an
    actual DROP class. This way, users can use the same set of methods
    both to create DROPs representations (i.e., instances of this class)
    and actual DROP instances.

    Users of this class are, for example, the graph_loader module which deals
    with JSON -> DROP representation transformations, and the different
    repositories where graph templates are expected to be found by the
    DROPManager.
    """

    def __init__(self, init_dict=None):
        """
        Populate the dictionary from ``init_dict``.

        :param init_dict: mapping (or iterable of key/value pairs) holding the
            initial contents. When ``None``, a placeholder specification with
            a ``None`` OID and the ``"Unknown"`` category type is used.
        """
        super().__init__()
        if init_dict is None:
            init_dict = {
                "oid": None,
                "categoryType": "Unknown",
            }

        self.update(init_dict)
        # An "oid" key is always present, even when the caller supplied none.
        self.setdefault("oid", None)

    def _addSomething(self, other, key, name=None):
        """
        Record a link to ``other`` in the list stored under ``key``.

        :param other: mapping with an ``"oid"`` entry identifying the linked drop
        :param key: name of the link list to extend (created when missing)
        :param name: optional port name; when truthy the link is stored as
            ``{oid: name}``, otherwise as the bare OID
        """
        links = self.setdefault(key, [])
        oid = other["oid"]
        # TODO: Returning just the other drop OID instead of the named
        #       port list is not a good solution. Required for the dask
        #       tests.
        link = {oid: name} if name else oid
        # Skip links that are already recorded, either as a bare OID or as an
        # identical named entry (the latter used to be appended again).
        if oid in links or link in links:
            return
        links.append(link)

    def addConsumer(self, other, name=None):
        """Add ``other`` to this drop's ``"consumers"`` links."""
        self._addSomething(other, "consumers", name=name)

    def addStreamingConsumer(self, other, name=None):
        """Add ``other`` to this drop's ``"streamingConsumers"`` links."""
        self._addSomething(other, "streamingConsumers", name=name)

    def addInput(self, other, name=None):
        """Add ``other`` to this drop's ``"inputs"`` links."""
        self._addSomething(other, "inputs", name=name)

    def addStreamingInput(self, other, name=None):
        """Add ``other`` to this drop's ``"streamingInputs"`` links."""
        self._addSomething(other, "streamingInputs", name=name)

    def addOutput(self, other, name=None):
        """Add ``other`` to this drop's ``"outputs"`` links."""
        self._addSomething(other, "outputs", name=name)

    def addProducer(self, other, name=None):
        """Add ``other`` to this drop's ``"producers"`` links."""
        self._addSomething(other, "producers", name=name)
3✔
116

117

118
def _sanitize_links(links):
3✔
119
    """
120
    Links can now be dictionaries, but we only need
121
    the key.
122
    """
123
    if isinstance(links, list):
3✔
124
        nlinks = []
3✔
125
        for l in links:
3✔
126
            if isinstance(l, dict):  # could be a list of dicts
3✔
127
                nlinks.extend(list(l.keys()))
2✔
128
            else:
129
                nlinks.extend(l) if isinstance(l, list) else nlinks.append(l)
2✔
130
        return nlinks
3✔
131
    elif isinstance(links, dict):
×
132
        return list(links.keys()) if isinstance(links, dict) else links
×
133

134

135
def get_roots(pg_spec):
    """
    Returns a set with the OIDs of the dropspecs that are the roots of the given physical
    graph specification.

    :param pg_spec: iterable of dropspec dictionaries. Entries without an
        ``"oid"`` key are ignored.
    :return: set with the OIDs that are neither fed by another drop nor
        listed as a downstream link of another drop
    :raises KeyError: if a dropspec has an ``"oid"`` but neither a
        ``"categoryType"`` nor a ``"type"`` entry
    """

    # We find all the nonroots first, which are easy to spot.
    # The rest are the roots
    all_oids = set()
    nonroots = set()
    for dropspec in pg_spec:
        # Assumed to be reprodata / other non-drop elements
        #
        # TODO (rtobar): Note that this should be a temporary measure.
        # In principle the pg_spec given here should be a graph, which (until
        # recently) consisted on drop specifications only. The fact that repro
        # data is now appended at the end of some graphs highlights the need for
        # a more formal specification of graphs and other pieces of data that we
        # move through the system.
        if "oid" not in dropspec:
            continue

        oid = dropspec["oid"]
        all_oids.add(oid)
        # Newer specs carry "categoryType"; older ones only carry "type".
        ctype = (
            dropspec["categoryType"]
            if "categoryType" in dropspec
            else dropspec["type"]
        )
        if ctype in (
            CategoryType.APPLICATION,
            CategoryType.SOCKET,
            "app",
        ):
            # An application fed by any input cannot be a root ...
            if dropspec.get("inputs") or dropspec.get("streamingInputs"):
                nonroots.add(oid)
            # ... and neither can anything it writes to.
            if dropspec.get("outputs"):
                nonroots.update(_sanitize_links(dropspec["outputs"]))
        elif ctype in (CategoryType.DATA, "data"):
            # The legacy "data" type string is accepted alongside
            # CategoryType.DATA, mirroring the legacy "app" string above.
            if dropspec.get("producers"):
                nonroots.add(oid)
            for link_key in ("consumers", "streamingConsumers"):
                if dropspec.get(link_key):
                    nonroots.update(_sanitize_links(dropspec[link_key]))

    return all_oids - nonroots
3✔
187

188

189
def get_leaves(pg_spec):
    """
    Returns a set with the OIDs of the dropspecs that are the leaves of the given physical
    graph specification.

    :param pg_spec: iterable of dropspec dictionaries. Entries without an
        ``"oid"`` key are ignored.
    :return: set with the OIDs that neither feed another drop nor are listed
        as an upstream link of another drop
    :raises KeyError: if a dropspec has an ``"oid"`` but neither a
        ``"categoryType"`` nor a ``"type"`` entry
    """

    # We find all the nonleaves first, which are easy to spot.
    # The rest are the leaves
    all_oids = set()
    nonleaves = set()
    for dropspec in pg_spec:
        # Entries without an OID are assumed to be reprodata / other non-drop
        # elements appended to the graph; skip them instead of failing with
        # a KeyError, the same way get_roots treats them.
        if "oid" not in dropspec:
            continue

        oid = dropspec["oid"]
        all_oids.add(oid)
        # Newer specs carry "categoryType"; older ones only carry "type".
        ctype = (
            dropspec["categoryType"]
            if "categoryType" in dropspec
            else dropspec["type"]
        )

        is_app = ctype in (CategoryType.APPLICATION, "app")
        is_service = ctype in (CategoryType.SERVICE, "socket")
        if is_app or is_service:
            # Services are never leaves; applications are not leaves as soon
            # as they write to at least one output.
            if is_service or dropspec.get("outputs"):
                nonleaves.add(oid)
            # Whatever feeds this drop cannot be a leaf either.
            for link_key in ("streamingInputs", "inputs"):
                if dropspec.get(link_key):
                    nonleaves.update(_sanitize_links(dropspec[link_key]))
        elif ctype in (CategoryType.DATA, "data"):
            if dropspec.get("producers"):
                nonleaves.update(_sanitize_links(dropspec["producers"]))
            if dropspec.get("consumers") or dropspec.get("streamingConsumers"):
                nonleaves.add(oid)

    return all_oids - nonleaves
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc