
ICRAR / daliuge / build 4908056526 (push, github) - pending completion
Andreas Wicenec: Fixed small issues with existing graphs

20 of 25 new or added lines in 2 files covered (80.0%)
88 existing lines in 5 files now uncovered
15342 of 19053 relevant lines covered (80.52%)
1.65 hits per line

Source File

/daliuge-translator/dlg/dropmake/lg_node.py (80.78% covered)
1
#
2
#    ICRAR - International Centre for Radio Astronomy Research
3
#    (c) UWA - The University of Western Australia, 2015
4
#    Copyright by UWA (in the framework of the ICRAR)
5
#    All rights reserved
6
#
7
#    This library is free software; you can redistribute it and/or
8
#    modify it under the terms of the GNU Lesser General Public
9
#    License as published by the Free Software Foundation; either
10
#    version 2.1 of the License, or (at your option) any later version.
11
#
12
#    This library is distributed in the hope that it will be useful,
13
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
14
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15
#    Lesser General Public License for more details.
16
#
17
#    You should have received a copy of the GNU Lesser General Public
18
#    License along with this library; if not, write to the Free Software
19
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
20
#    MA 02111-1307  USA
21
#
22
"""
2✔
23
The DALiuGE resource manager uses the requested logical graphs, the available resources and
24
the profiling information and turns them into the partitioned physical graph,
25
which will then be deployed and monitored by the Physical Graph Manager
26
"""
27

28
if __name__ == "__main__":
2✔
29
    __package__ = "dlg.dropmake"
×
30

31
import logging
2✔
32
import math
2✔
33
import random
2✔
34
import re
2✔
35

36
from dlg.common import CategoryType
2✔
37
from dlg.common import dropdict
2✔
38
from dlg.dropmake.dm_utils import (
2✔
39
    GraphException,
40
    GInvalidLink,
41
    GInvalidNode,
42
)
43
from dlg.dropmake.utils.bash_parameter import BashCommand
2✔
44
from .definition_classes import Categories, DATA_TYPES, APP_TYPES
2✔
45

46
logger = logging.getLogger(__name__)
2✔
47

48

49
class LGNode:
2✔
50
    def __init__(self, jd, group_q, done_dict, ssid):
2✔
51
        """
52
        jd: json_dict (dict)
53
        group_q: group queue (defaultdict)
54
        done_dict: LGNodes that have been processed (dict)
55
        ssid:   session id (string)
56
        """
57
        self.id = jd["key"]  # node ID
2✔
58
        self.jd = jd  # JSON TODO: this should be removed
2✔
59
        self.group_q = group_q  # the group hierarchy queue
2✔
60
        self.group = None  # used if node belongs to group
2✔
61
        self._children = []  # list of LGNode objects, children of this node
2✔
62
        self._ssid = ssid  # session ID
2✔
63
        self.is_app = self.jd["categoryType"] == CategoryType.APPLICATION
2✔
64
        self.is_data = self.jd["categoryType"] == CategoryType.DATA
2✔
65
        self.weight = 1  # try to find the weight, else default to 1
2✔
66
        self._converted = False
2✔
67
        self._h_level = None  # hierarchy level
2✔
68
        self._g_h = None
2✔
69
        self._dop = None  # degree of parallelism
2✔
70
        self._gaw = None
2✔
71
        self._grpw = None
2✔
72
        self._inputs = []  # list of LGNode objects connected to this node
2✔
73
        self._outputs = []  # list of LGNode objects connected to this node
2✔
74
        self.inputPorts = "inputPorts"
2✔
75
        self.outputPorts = "outputPorts"
2✔
76
        logger.debug("%s input_ports: %s", self.name, self.inputPorts)
2✔
77
        logger.debug("%s output_ports: %s", self.name, self.outputPorts)
2✔
78
        self.dropclass = ""  # e.g. dlg.apps.simple.HelloWorldApp
2✔
79
        self.reprodata = jd.get("reprodata", {}).copy()
2✔
80
        if "isGroup" in jd and jd["isGroup"] is True:
2✔
81
            self.is_group = True
2✔
82
            for wn in group_q[self.id]:
2✔
83
                wn.group = self
2✔
84
                self.add_child(wn)
2✔
85
            group_q.pop(self.id)  # not thread safe
2✔
86
        else:
87
            self.is_group = False
2✔
88

89
        if "group" in jd:
2✔
90
            grp_id = jd["group"]
2✔
91
            if grp_id in done_dict:
2✔
92
                grp_nd = done_dict[grp_id]
2✔
93
                self.group = grp_nd
2✔
94
                grp_nd.add_child(self)
2✔
95
            else:
96
                group_q[grp_id].append(self)
2✔
97

98
        done_dict[self.id] = self
2✔
99

100
    # def __str__(self):
101
    #     return json.dumps(self.jd)
102

103
    @property
2✔
104
    def inputPorts(self):
2✔
105
        return self._inputPorts
×
106

107
    @inputPorts.setter
2✔
108
    def inputPorts(self, value):
2✔
109
        if (
×
110
            "categoryType" in value and value["categoryType"] == "Construct"
111
        ) or ("type" in value and value["type"] == "Construct"):
112
            self._inputPorts = []
×
113
        elif not "inputPorts" in value:
×
114
            self._inputPorts = [
×
115
                f
116
                for f in value["fields"]
117
                if "usage" in f and f["usage"] in ["InputPort", "InOutPort"]
118
            ]
119
            # we need this as long as the fields are still using "name"
120
            if len(self._inputPorts) > 0 and "name" in self._inputPorts[0]:
×
121
                for p in self._inputPorts:
×
122
                    p["name"] = p["name"]
×
123
        else:
124
            self._inputPorts = value["inputPorts"]
×
125

126
    @property
2✔
127
    def outputPorts(self):
2✔
128
        return self._outputPorts
×
129

130
    @outputPorts.setter
2✔
131
    def outputPorts(self, value):
2✔
132
        if (
×
133
            "categoryType" in value and value["categoryType"] == "Construct"
134
        ) or ("type" in value and value["type"] == "Construct"):
135
            self._outputPorts = []
×
136
        elif not "outputPorts" in value:
×
137
            self._outputPorts = [
×
138
                f
139
                for f in value["fields"]
140
                if f["usage"] in ["OutputPort", "InOutPort"]
141
            ]
142
            # we need this as long as the fields are still using "name"
143
            if len(self._outputPorts) > 0 and "name" in self._outputPorts[0]:
×
144
                for p in self._outputPorts:
×
145
                    p["name"] = p["name"]
×
146
        else:
147
            self._outputPorts = value["outputPorts"]
×
148

149
    @property
2✔
150
    def jd(self):
2✔
151
        return self._jd
2✔
152

153
    @jd.setter
2✔
154
    def jd(self, node_json):
2✔
155
        """
156
        Setting the jd property to the original data structure directly loaded
157
        from JSON.
158
        """
159
        if "categoryType" not in node_json:
2✔
160
            if node_json["category"] in APP_TYPES:
2✔
161
                node_json["categoryType"] = CategoryType.APPLICATION
2✔
162
            elif node_json["category"] in DATA_TYPES:
×
163
                node_json["categoryType"] = CategoryType.DATA
×
164
        self._jd = node_json
2✔
165

166
    @property
2✔
167
    def reprodata(self):
2✔
168
        return self._reprodata
×
169

170
    @reprodata.setter
2✔
171
    def reprodata(self, value):
2✔
172
        self._reprodata = value
2✔
173

174
    @property
2✔
175
    def is_group(self):
2✔
176
        return self._is_group
2✔
177

178
    @is_group.setter
2✔
179
    def is_group(self, value):
2✔
180
        self._is_group = value
2✔
181

182
    @property
2✔
183
    def id(self):
2✔
184
        return self._id
2✔
185

186
    @id.setter
2✔
187
    def id(self, value):
2✔
188
        self._id = value
2✔
189

190
    @property
2✔
191
    def nodetype(self):
2✔
192
        return self._nodetype
×
193

194
    @nodetype.setter
2✔
195
    def nodetype(self, value):
2✔
196
        self._nodetype = value
×
197

198
    @property
2✔
199
    def dropclass(self):
2✔
200
        return self._dropclass
2✔
201

202
    @dropclass.setter
2✔
203
    def dropclass(self, default_value):
2✔
204
        self.is_data = False
2✔
205
        self.is_app = False
2✔
206
        keys = []
2✔
207
        value = None
2✔
208
        if default_value is None or len(default_value) == 0:
2✔
209
            default_value = "dlg.apps.simple.SleepApp"
2✔
210
        if self.jd["categoryType"] == CategoryType.DATA:
2✔
211
            self.is_data = True
2✔
212
            keys = [
2✔
213
                "dropclass",
214
                "Data class",
215
                "dataclass",
216
            ]
217
        elif self.jd["categoryType"] == CategoryType.APPLICATION:
2✔
218
            keys = [
2✔
219
                "dropclass",
220
                "Application Class",
221
                "Application class",
222
                "appclass",
223
            ]
224
            self.is_app = True
2✔
225
        elif self.jd["categoryType"] in [
2✔
226
            CategoryType.CONSTRUCT,
227
            CategoryType.CONTROL,
228
        ]:
229
            keys = ["inputApplicationName"]
2✔
230
        elif self.jd["categoryType"] in ["Other"]:
2✔
231
            value = "Other"
2✔
232
        else:
233
            logger.error(
2✔
234
                "Found unknown categoryType: %s", self.jd["categoryType"]
235
            )
236
            # raise ValueError
237
        for key in keys:
2✔
238
            if key in self.jd:
2✔
239
                value = self.jd[key]
2✔
240
                break
2✔
241
            if value is None or value == "":
2✔
242
                value = default_value
2✔
243

244
        self._dropclass = value
2✔
245

246
    @property
2✔
247
    def name(self):
2✔
248
        if self.jd.get("name"):
2✔
249
            # backwards compatibility
250
            # TODO: deprecated
251
            return self.jd.get("name", "")
2✔
252
        else:
253
            return self.jd.get("name", "")
2✔
254

255
    @property
2✔
256
    def category(self):
2✔
257
        return self.jd.get("category", "Unknown")
2✔
258

259
    @property
2✔
260
    def categoryType(self):
2✔
261
        return self.jd.get("categoryType", "Unknown")
2✔
262

263
    @property
2✔
264
    def group(self):
2✔
265
        return self._grp
2✔
266

267
    @group.setter
2✔
268
    def group(self, value):
2✔
269
        self._grp = value
2✔
270

271
    def has_group(self):
2✔
272
        return self.group is not None
2✔
273

274
    def has_converted(self):
2✔
UNCOV
275
        return self._converted
×
276

277
    def complete_conversion(self):
2✔
UNCOV
278
        self._converted = True
×
279

280
    @property
2✔
281
    def gid(self):
2✔
282
        if self.group is None:
2✔
283
            return 0
2✔
284
        else:
285
            return self.group.id
2✔
286

287
    def add_output(self, lg_node):
2✔
288
        if lg_node not in self._outputs:
2✔
289
            self._outputs.append(lg_node)
2✔
290

291
    def add_input(self, lg_node):
2✔
292
        # only add if not already there
293
        # this may happen in nested constructs
294
        if lg_node not in self._inputs:
2✔
295
            self._inputs.append(lg_node)
2✔
296

297
    def add_child(self, lg_node):
2✔
298
        """
299
        Add a group member
300
        """
301
        if (
2✔
302
            lg_node.is_group
303
            and not (lg_node.is_scatter)
304
            and not (lg_node.is_loop)
305
            and not (lg_node.is_groupby)
306
        ):
UNCOV
307
            raise GInvalidNode(
×
308
                "Only Scatters, Loops and GroupBys can be nested, but {0} is neither".format(
309
                    lg_node.id
310
                )
311
            )
312
        self._children.append(lg_node)
2✔
313

314
    @property
2✔
315
    def children(self):
2✔
316
        return self._children
2✔
317

318
    @property
2✔
319
    def outputs(self):
2✔
UNCOV
320
        return self._outputs
×
321

322
    @property
2✔
323
    def inputs(self):
2✔
324
        return self._inputs
2✔
325

326
    @property
2✔
327
    def h_level(self):
2✔
328
        if self._h_level is None:
2✔
329
            _level = 0
2✔
330
            cg = self
2✔
331
            while cg.has_group():
2✔
332
                cg = cg.group
2✔
333
                _level += 1
2✔
334
            if self.is_mpi:
2✔
UNCOV
335
                _level += 1
×
336
            self._h_level = _level
2✔
337
        return self._h_level
2✔
338

339
    @property
2✔
340
    def group_hierarchy(self):
2✔
341
        if self._g_h is None:
2✔
342
            glist = []
2✔
343
            cg = self
2✔
344
            while cg.has_group():
2✔
345
                glist.append(str(cg.gid))
2✔
346
                cg = cg.group
2✔
347
            glist.append("0")
2✔
348
            self._g_h = "/".join(reversed(glist))
2✔
349
        return self._g_h
2✔
350

351
    @property
2✔
352
    def weight(self):
2✔
353
        return self._weight
2✔
354

355
    @weight.setter
2✔
356
    def weight(self, default_value):
2✔
357
        key = []
2✔
358
        if self.is_app:
2✔
359
            key = [
2✔
360
                k
361
                for k in self.jd
362
                if re.match(r"execution[\s\_]time", k.lower())
363
            ]
364
        elif self.is_data:
2✔
365
            key = [
2✔
366
                k for k in self.jd if re.match(r"data[\s\_]volume", k.lower())
367
            ]
368
        try:
2✔
369
            self._weight = int(self.jd[key[0]])
2✔
370
        except (KeyError, ValueError, IndexError):
2✔
371
            self._weight = int(default_value)
2✔
372

373
    @property
2✔
374
    def has_child(self):
2✔
UNCOV
375
        return len(self._children) > 0
×
376

377
    @property
2✔
378
    def has_output(self):
2✔
UNCOV
379
        return len(self._outputs) > 0
×
380

381
    @property
2✔
382
    def is_data(self):
2✔
383
        return self._is_data
2✔
384

385
    @is_data.setter
2✔
386
    def is_data(self, value):
2✔
387
        self._is_data = value
2✔
388

389
    @property
2✔
390
    def is_app(self):
2✔
391
        return self._is_app
2✔
392
        # return self.jd["categoryType"] == CategoryType.APPLICATION
393

394
    @is_app.setter
2✔
395
    def is_app(self, value):
2✔
396
        self._is_app = value
2✔
397

398
    @property
2✔
399
    def is_start_node(self):
2✔
400
        return self.jd["category"] == Categories.START
2✔
401

402
    @property
2✔
403
    def is_end_node(self):
2✔
UNCOV
404
        return self.jd["category"] == Categories.END
×
405

406
    @property
2✔
407
    def is_start(self):
2✔
408
        return not self.has_group()
2✔
409

410
    @property
2✔
411
    def is_dag_root(self):
2✔
412
        leng = len(self.inputs)
×
UNCOV
413
        if leng > 1:
×
414
            return False
×
415
        elif leng == 0:
×
416
            if self.is_start_node:
×
UNCOV
417
                return False
×
418
            else:
UNCOV
419
                return True
×
UNCOV
420
        elif self.is_start:
×
UNCOV
421
            return True
×
422
        else:
UNCOV
423
            return False
×
424

425
    @property
2✔
426
    def is_start_listener(self):
2✔
427
        """
428
        is this a socket listener node
429
        """
430
        return len(self.inputs) == 1 and self.is_start_node and self.is_data
2✔
431

432
    @property
2✔
433
    def is_group_start(self):
2✔
434
        """
435
        is this a node starting a group (usually inside a loop)
436
        """
437
        result = False
2✔
438
        if self.has_group() and (
2✔
439
            "group_start" in self.jd
440
            or "Group start" in self.jd
441
            or "Group Start" in self.jd
442
        ):
443
            gs = (
2✔
444
                self.jd.get("group_start", False)
445
                if "group_start" in self.jd
446
                else self.jd.get("Group start", False)
447
            )
448
            if type(gs) == type(True):
2✔
449
                result = gs
2✔
450
            elif type(gs) in [type(1), type(1.0)]:
2✔
451
                result = 1 == gs
2✔
452
            elif type(gs) == type("s"):
2✔
453
                result = gs.lower() in ("true", "1")
2✔
454
        return result
2✔
455

456
    @property
2✔
457
    def is_group_end(self):
2✔
458
        """
459
        is this a node ending a group (usually inside a loop)
460
        """
461
        result = False
2✔
462
        if self.has_group() and (
2✔
463
            "group_end" in self.jd
464
            or "Group end" in self.jd
465
            or "Group End" in self.jd
466
        ):
467
            ge = (
2✔
468
                self.jd.get("group_end", False)
469
                if "group_end" in self.jd
470
                else self.jd.get("Group end", False)
471
            )
472
            if type(ge) == type(True):
2✔
473
                result = ge
2✔
474
            elif type(ge) in [type(1), type(1.0)]:
2✔
UNCOV
475
                result = 1 == ge
×
476
            elif type(ge) == type("s"):
2✔
477
                result = ge.lower() in ("true", "1")
2✔
478
        return result
2✔
479

480
    @property
2✔
481
    def is_scatter(self):
2✔
482
        return self.is_group and self._jd["category"] == Categories.SCATTER
2✔
483

484
    @property
2✔
485
    def is_gather(self):
2✔
486
        return self._jd["category"] == Categories.GATHER
2✔
487

488
    @property
2✔
489
    def is_loop(self):
2✔
490
        return self.is_group and self._jd["category"] == Categories.LOOP
2✔
491

492
    @property
2✔
493
    def is_service(self):
2✔
494
        """
495
        Determines whether a node is the parent service node (not the input application)
496
        """
497
        return self._jd["category"] == Categories.SERVICE
2✔
498

499
    @property
2✔
500
    def is_groupby(self):
2✔
501
        return self._jd["category"] == Categories.GROUP_BY
2✔
502

503
    @property
2✔
504
    def is_branch(self):
2✔
505
        # This is the only special thing required for a branch
506
        return self._jd["category"] == Categories.BRANCH
2✔
507

508
    @property
2✔
509
    def is_mpi(self):
2✔
510
        return self._jd["category"] == Categories.MPI
2✔
511

512
    @property
2✔
513
    def group_keys(self):
2✔
514
        """
515
        Return:
516
            None or a list of keys (each key is an integer)
517
        """
518
        if not self.is_groupby:
2✔
519
            return None
2✔
520
        val = str(self.jd.get("group_key", "None"))
2✔
521
        if val in ["None", "-1", ""]:
2✔
522
            return None
2✔
523
        else:
UNCOV
524
            try:
×
UNCOV
525
                return [int(x) for x in val.split(",")]
×
UNCOV
526
            except ValueError as ve:
×
UNCOV
527
                raise GraphException(
×
528
                    "group_key must be an integer or comma-separated integers: {0}".format(
529
                        ve
530
                    )
531
                )
532

533
    @property
2✔
534
    def inputPorts(self):
2✔
535
        return self._inputPorts
2✔
536

537
    @inputPorts.setter
2✔
538
    def inputPorts(self, port="inputPorts"):
2✔
539
        self._inputPorts = self._getPortName(ports="inputPorts", index=0)
2✔
540

541
    @property
2✔
542
    def outputPorts(self):
2✔
543
        return self._outputPorts
2✔
544

545
    @outputPorts.setter
2✔
546
    def outputPorts(self, port="outputPorts"):
2✔
547
        self._outputPorts = self._getPortName(ports="outputPorts", index=0)
2✔
548

549
    @property
2✔
550
    def gather_width(self):
2✔
551
        """
552
        Gather width
553
        """
554
        if self.is_gather:
2✔
555
            if self._gaw is None:
2✔
556
                try:
2✔
557
                    self._gaw = int(self.jd["num_of_inputs"])
2✔
UNCOV
558
                except:
×
UNCOV
559
                    self._gaw = 1
×
560
            return self._gaw
2✔
561
        else:
562
            """
563
            TODO: use OO style to replace all type-related statements!
564
            """
UNCOV
565
            return None
×
566

567
    @property
2✔
568
    def groupby_width(self):
2✔
569
        """
570
        GroupBy count
571
        """
572
        if self.is_groupby:
2✔
573
            if self._grpw is None:
2✔
574
                tlgn = self.inputs[0]
2✔
575
                re_dop = 1
2✔
576
                cg = tlgn.group  # exclude its own group
2✔
577
                while cg.has_group():
2✔
578
                    re_dop *= cg.group.dop
2✔
579
                    cg = cg.group
2✔
580
                self._grpw = re_dop
2✔
581
            return self._grpw
2✔
582
        else:
UNCOV
583
            return None
×
584

585
    @property
2✔
586
    def group_by_scatter_layers(self):
2✔
587
        """
588
        Return:
589
            scatter layers info associated with this group by logical node
590
            A tuple of three items:
591
                (1) DoP
592
                (2) layer indexes (list) from inner scatter to outer scatter
593
                (3) layers (list)
594
        """
595
        if not self.is_groupby:
2✔
UNCOV
596
            return None
×
597

598
        tlgn = self.inputs[0]
2✔
599
        grpks = self.group_keys
2✔
600
        ret_dop = 1
2✔
601
        layer_index = []  # from inner to outer
2✔
602
        layers = []  # from inner to outer
2✔
603
        c = 0
2✔
604
        if tlgn.group.is_groupby:
2✔
605
            # group by followed by another group by
UNCOV
606
            if grpks is None or len(grpks) < 1:
×
UNCOV
607
                raise GInvalidNode(
×
608
                    "Must specify group_key for Group By '{0}'".format(
609
                        self.name
610
                    )
611
                )
612
            # find the "root" groupby and get all of its scatters
UNCOV
613
            inputgrp = self
×
UNCOV
614
            while (inputgrp is not None) and inputgrp.inputs[
×
615
                0
616
            ].group.is_groupby:
617
                inputgrp = inputgrp.inputs[0].group
×
618
            # inputgrp now is the "root" groupby that follows Scatter immediately
619
            # move it to Scatter
620
            inputgrp = inputgrp.inputs[0].group
×
621
            # go thru all the scatters
UNCOV
622
            while (inputgrp is not None) and inputgrp.is_scatter:
×
623
                if inputgrp.id in grpks:
×
UNCOV
624
                    ret_dop *= inputgrp.dop
×
625
                    layer_index.append(c)
×
626
                    layers.append(inputgrp)
×
627
                inputgrp = inputgrp.group
×
628
                c += 1
×
629
        else:
630
            if grpks is None or len(grpks) < 1:
2✔
631
                ret_dop = tlgn.group.dop
2✔
632
                layer_index.append(0)
2✔
633
                layers.append(tlgn.group)
2✔
634
            else:
UNCOV
635
                if len(grpks) == 1:
×
UNCOV
636
                    if grpks[0] == tlgn.group.id:
×
UNCOV
637
                        ret_dop = tlgn.group.dop
×
638
                        layer_index.append(0)
×
639
                        layers.append(tlgn.group)
×
640
                    else:
641
                        raise GInvalidNode(
×
642
                            "Wrong single group_key for {0}".format(self.name)
643
                        )
644
                else:
UNCOV
645
                    inputgrp = tlgn.group
×
646
                    # find the "groupby column list" from all layers of scatter loops
UNCOV
647
                    while (inputgrp is not None) and inputgrp.is_scatter:
×
648
                        if inputgrp.id in grpks:
×
UNCOV
649
                            ret_dop *= inputgrp.dop
×
650
                            layer_index.append(c)
×
651
                            layers.append(inputgrp)
×
652
                        inputgrp = inputgrp.group
×
653
                        c += 1
×
654

655
        return ret_dop, layer_index, layers
2✔
656

657
    @property
2✔
658
    def dop(self):
2✔
659
        """
660
        Degree of Parallelism:  integer
661
        default:    1
662
        """
663
        if self._dop is None:
2✔
664
            if self.is_group:
2✔
665
                if self.is_scatter:
2✔
666
                    for kw in [
2✔
667
                        "num_of_copies",
668
                        "num_of_splits",
669
                        "Number of copies",
670
                    ]:
671
                        if kw in self.jd:
2✔
672
                            self._dop = int(self.jd[kw])
2✔
673
                            break
2✔
674
                    if self._dop is None:
2✔
UNCOV
675
                        self._dop = 4  # dummy impl. TODO: Why is this here?
×
676
                elif self.is_gather:
2✔
677
                    try:
2✔
678
                        tlgn = self.inputs[0]
2✔
UNCOV
679
                    except IndexError:
×
UNCOV
680
                        raise GInvalidLink(
×
681
                            "Gather '{0}' does not have input!".format(self.id)
682
                        )
683
                    if tlgn.is_groupby:
2✔
684
                        tt = tlgn.dop
2✔
685
                    else:
686
                        tt = self.dop_diff(tlgn)
2✔
687
                    self._dop = int(math.ceil(tt / float(self.gather_width)))
2✔
688
                elif self.is_groupby:
2✔
689
                    self._dop = self.group_by_scatter_layers[0]
2✔
690
                elif self.is_loop:
2✔
691
                    for key in [
2✔
692
                        "num_of_iter",
693
                        "Number of Iterations",
694
                        "Number of loops",
695
                    ]:
696
                        if key in self.jd:
2✔
697
                            self._dop = int(self.jd.get(key, 1))
2✔
698
                            break
2✔
UNCOV
699
                elif self.is_service:
×
UNCOV
700
                    self._dop = 1  # TODO: number of compute nodes
×
701
                else:
702
                    raise GInvalidNode(
×
703
                        "Unrecognised (Group) Logical Graph Node: '{0}'".format(
704
                            self._jd["category"]
705
                        )
706
                    )
707
            elif self.is_mpi:
2✔
708
                self._dop = int(self.jd["num_of_procs"])
2✔
709
            else:
UNCOV
710
                self._dop = 1
×
711
        return self._dop
2✔
712

713
    def dop_diff(self, that_lgn):
2✔
714
        """
715
        TODO: This does not belong in the LGNode class
716

717
        dop difference between inner node/group and outer group
718
        e.g. for each outer group, how many instances of inner nodes/groups
719
        """
720
        # if (self.is_group() or that_lgn.is_group()):
721
        #     raise GraphException("Cannot compute dop diff between groups.")
722
        # don't check h_related for efficiency since it should have been checked
723
        # if (self.h_related(that_lgn)):
724
        il = self.h_level
2✔
725
        al = that_lgn.h_level
2✔
726
        if il == al:
2✔
727
            return 1
2✔
728
        elif il > al:
2✔
729
            oln = that_lgn
2✔
730
            iln = self
2✔
731
        else:
732
            iln = that_lgn
2✔
733
            oln = self
2✔
734
        re_dop = 1
2✔
735
        cg = iln
2✔
736
        init_cond = cg.gid != oln.gid and cg.has_group()
2✔
737
        while init_cond or cg.is_mpi:
2✔
738
            if cg.is_mpi:
2✔
UNCOV
739
                re_dop *= cg.dop
×
740
            # else:
741
            if init_cond:
2✔
742
                re_dop *= cg.group.dop
2✔
743
            cg = cg.group
2✔
744
            if cg is None:
2✔
UNCOV
745
                break
×
746
            init_cond = cg.gid != oln.gid and cg.has_group()
2✔
747
        return re_dop
2✔
748
        # else:
749
        #     pass
750
        # raise GInvalidLink("{0} and {1} are not hierarchically related".format(self.id, that_lgn.id))
751

752
    def h_related(self, that_lgn):
2✔
753
        """
754
        TODO: This does not belong in the LGNode class
755
        """
756
        that_gh = that_lgn.group_hierarchy
2✔
757
        this_gh = self.group_hierarchy
2✔
758
        if len(that_gh) + len(this_gh) <= 1:
2✔
759
            # at least one is at the root level
UNCOV
760
            return True
×
761

762
        return that_gh.find(this_gh) > -1 or this_gh.find(that_gh) > -1
2✔
763

764
    def make_oid(self, iid="0"):
2✔
765
        """
766
        return:
767
            ssid_id_iid (string), where
768
            ssid:   session id
769
            id:     logical graph node key
770
            iid:    instance id (for the physical graph node)
771
        """
772
        # TODO: This is rather ugly, but a quick and dirty fix. The iid is the rank data we need
773
        rank = [int(x) for x in iid.split("/")]
2✔
774
        return "{0}_{1}_{2}".format(self._ssid, self.id, iid), rank
2✔
775

776
    def _update_key_value_attributes(self, kwargs):
2✔
777
        # get the arguments from new fields dictionary in a backwards compatible way
778
        if "fields" in self.jd:
2✔
779
            self.jd.update({"nodeAttributes": {}})
2✔
780
            kwargs.update({"nodeAttributes": {}})
2✔
781
            for je in self.jd["fields"]:
2✔
782
                # The field to be used is not the text, but the name field
783
                self.jd[je["name"]] = je["value"]
2✔
784
                kwargs[je["name"]] = je["value"]
2✔
785
                self.jd["nodeAttributes"].update({je["name"]: je})
2✔
786
                kwargs["nodeAttributes"].update({je["name"]: je})
2✔
787
        kwargs[
2✔
788
            "applicationArgs"
789
        ] = {}  # make sure the dict always exists downstream
790
        if "applicationArgs" in self.jd:  # and fill it if provided
2✔
UNCOV
791
            for je in self.jd["applicationArgs"]:
×
UNCOV
792
                j = {je["name"]: {k: je[k] for k in je if k not in ["name"]}}
×
UNCOV
793
                self.jd.update(j)
×
794
                kwargs["applicationArgs"].update(j)
×
795
        if "nodeAttributes" not in kwargs:
2✔
796
            kwargs.update({"nodeAttributes": {}})
×
797
        for k, na in kwargs["nodeAttributes"].items():
2✔
798
            if (
2✔
799
                "parameterType" in na
800
                and na["parameterType"] == "ApplicationArgument"
801
            ):
802
                kwargs["applicationArgs"].update({k: na})
2✔
803
        # NOTE: drop Argxx keywords
804

805
    def _getPortName(
2✔
806
        self, ports: str = "outputPorts", index: int = 0, portId=None
807
    ):
808
        """
809
        Return name of port if it exists
810
        """
811
        port_selector = {
2✔
812
            "inputPorts": ["InputPort", "InputOutput"],
813
            "outputPorts": ["OutputPort", "InputOutput"],
814
        }
815
        ports_dict = {}
2✔
816
        name = None
2✔
817
        # if portId is None and index >= 0:
818
        if index >= 0:
2✔
819
            if ports in port_selector:
2✔
820
                for field in self.jd["fields"]:
2✔
821
                    if "usage" not in field:  # fixes manual graphs
2✔
822
                        continue
2✔
823
                    if field["usage"] in port_selector[ports]:
2✔
824
                        if portId is None:
2✔
825
                            name = field["name"]
2✔
826
                        elif field["id"] == portId:
2✔
827
                            name = field["name"]
2✔
828
                        # can't be sure that name is unique
829
                        if name not in ports_dict:
2✔
830
                            ports_dict[name] = [field["id"]]
2✔
831
                        else:
832
                            ports_dict[name].append(field["id"])
2✔
833
        else:
834
            # TODO: This is not really correct, but maybe not needed at all?
UNCOV
835
            for port in port_selector[ports]:
×
UNCOV
836
                name = [
×
837
                    p["name"]
838
                    for p in self.jd[port]
839
                    if port in self.jd and p["Id"] == portId
840
                ]
UNCOV
841
                name = name[0] if len(name) > 0 else None
×
UNCOV
842
                if name is not None:
×
UNCOV
843
                    break
×
844
        return name if index >= 0 else ports_dict
2✔
845

846
    def _create_groupby_drops(self, drop_spec):
2✔
847
        drop_spec.update(
2✔
848
            {
849
                "dropclass": "dlg.apps.simple.SleepApp",
850
                "categoryType": "Application",
851
            }
852
        )
853
        sij = self.inputs[0]
2✔
854
        if not sij.is_data:
2✔
UNCOV
855
            raise GInvalidNode(
×
856
                "GroupBy should be connected to a DataDrop, not '%s'"
857
                % sij.category
858
            )
859
        dw = sij.weight * self.groupby_width
2✔
860

861
        # additional generated drop
862
        dropSpec_grp = dropdict(
2✔
863
            {
864
                "oid": "{0}-grp-data".format(drop_spec["oid"]),
865
                "categoryType": CategoryType.DATA,
866
                "dropclass": "dlg.data.drops.memory.InMemoryDROP",
867
                "name": "grpdata",
868
                "weight": dw,
869
                "rank": drop_spec["rank"],
870
                "reprodata": self.jd.get("reprodata", {}),
871
            }
872
        )
873
        kwargs = {}
2✔
874
        kwargs["grp-data_drop"] = dropSpec_grp
2✔
875
        kwargs[
2✔
876
            "weight"
877
        ] = 1  # barrier literally takes no time for its own computation
878
        kwargs["sleep_time"] = 1
2✔
879
        drop_spec.update(kwargs)
2✔
880
        drop_spec.addOutput(dropSpec_grp, name="grpdata")
2✔
881
        dropSpec_grp.addProducer(drop_spec, name="grpdata")
2✔
882
        return drop_spec
2✔
883

884
    def _create_gather_drops(self, drop_spec):
2✔
885
        drop_spec.update(
2✔
886
            {
887
                "dropclass": "dlg.apps.simple.SleepApp",
888
                "categoryType": "Application",
889
            }
890
        )
891
        gi = self.inputs[0]
2✔
892
        if gi.is_groupby:
2✔
893
            gii = gi.inputs[0]
2✔
894
            dw = (
2✔
895
                int(gii.jd["data_volume"])
896
                * gi.groupby_width
897
                * self.gather_width
898
            )
899
        else:  # data
900
            dw = gi.weight * self.gather_width
2✔
901

902
            # additional generated drop
903
        dropSpec_gather = dropdict(
2✔
904
            {
905
                "oid": "{0}-gather-data".format(drop_spec["oid"]),
906
                "categoryType": CategoryType.DATA,
907
                "dropclass": "dlg.data.drops.memory.InMemoryDROP",
908
                "name": "gthrdt",
909
                "weight": dw,
910
                "rank": drop_spec["rank"],
911
                "reprodata": self.jd.get("reprodata", {}),
912
            }
913
        )
914
        kwargs = {}
2✔
915
        kwargs["gather-data_drop"] = dropSpec_gather
2✔
916
        kwargs["weight"] = 1
2✔
917
        kwargs["sleep_time"] = 1
2✔
918
        drop_spec.update(kwargs)
2✔
919
        drop_spec.addOutput(dropSpec_gather, name="gthrdata")
2✔
920
        dropSpec_gather.addProducer(drop_spec, name="gthrdata")
2✔
921
        return drop_spec
2✔
922

923
    def _create_listener_drops(self, drop_spec):
2✔
924
        # create socket listener DROP first
UNCOV
925
        drop_spec.update(
×
926
            {
927
                "oid": drop_spec["oid"],
928
                "categoryType": CategoryType.DATA,
929
                "dropclass": "dlg.data.drops.memory.InMemoryDROP",
930
            }
931
        )
932

933
        # additional generated drop
UNCOV
934
        dropSpec_socket = dropdict(
×
935
            {
936
                "oid": "{0}-s".format(drop_spec["oid"]),
937
                "categoryType": CategoryType.APPLICATION,
938
                "category": "PythonApp",
939
                "dropclass": "dlg.apps.simple.SleepApp",
940
                "name": "lstnr",
941
                "weigth": 5,
942
                "sleep_time": 1,
943
                "reprodata": self.jd.get("reprodata", {}),
944
            }
945
        )
946
        # tw -- task weight
UNCOV
947
        dropSpec_socket["autostart"] = 1
×
UNCOV
948
        drop_spec.update({"listener_drop": dropSpec_socket})
×
UNCOV
949
        dropSpec_socket.addOutput(
×
950
            drop_spec, name=self._getPortName(ports="outputPorts")
951
        )
952
        return drop_spec
×
953

954
    def _create_app_drop(self, drop_spec):
2✔
955
        # default generic component becomes "sleep and copy"
956
        if "appclass" in self.jd:
2✔
NEW
957
            app_class = self.jd["appclass"]
×
958
        elif self.dropclass is None or self.dropclass == "":
2✔
959
            logger.debug("No dropclass found in: %s", self)
2✔
960
            app_class = "dlg.apps.simple.SleepApp"
2✔
961
        else:
962
            app_class = self.dropclass
2✔
963
        if self.dropclass == "dlg.apps.simple.SleepApp":
2✔
964
            if self.category == "BashShellApp":
2✔
965
                app_class = "dlg.apps.bash_shell_app.BashShellApp"
2✔
966
            elif self.category == "Docker":
2✔
967
                app_class = "dlg.apps.dockerapp.DockerApp"
2✔
968
            else:
969
                logger.debug("Might be a problem with this node: %s", self.jd)
2✔
970

971
        self.dropclass = app_class
2✔
972
        execTime = self.weight
2✔
973
        self.jd["dropclass"] = app_class
2✔
974
        self.dropclass = app_class
2✔
975
        logger.debug("Creating app drop using class: %s", app_class)
2✔
976
        if self.dropclass is None or self.dropclass == "":
2✔
NEW
977
            logger.warning(f"Something wrong with this node: {self.jd}")
×
978
        if self.weight is not None:
2✔
979
            execTime = self.weight
2✔
980
            if execTime < 0:
2✔
UNCOV
981
                raise GraphException(
×
982
                    "Execution_time must be greater"
983
                    " than 0 for Node '%s'" % self.name
984
                )
985
        else:
UNCOV
986
            execTime = random.randint(3, 8)
×
987
        kwargs = {}
2✔
988
        kwargs["weight"] = execTime
2✔
989
        if app_class == "dlg.apps.simple.SleepApp":
2✔
990
            kwargs["sleep_time"] = execTime
2✔
991

992
        drop_spec.update(
2✔
993
            {
994
                "dropclass": app_class,
995
            }
996
        )
997
        kwargs["num_cpus"] = int(self.jd.get("num_cpus", 1))
2✔
998
        if "mkn" in self.jd:
2✔
UNCOV
999
            kwargs["mkn"] = self.jd["mkn"]
×
1000
        self._update_key_value_attributes(kwargs)
2✔
1001
        drop_spec.update(kwargs)
2✔
1002
        return drop_spec
2✔
1003

1004
    def _create_data_drop(self, drop_spec):
2✔
1005
        # backwards compatibility
1006
        if "dataclass" in self.jd:
2✔
UNCOV
1007
            self.dropclass = self.jd["dataclass"]
×
1008
        if (
2✔
1009
            not hasattr(self, "dropclass")
1010
            or self.dropclass == "dlg.apps.simple.SleepApp"
1011
        ):
1012
            if self.category == "File":
2✔
1013
                self.dropclass = "dlg.data.drops.file.FileDROP"
2✔
1014
            elif self.category == "Memory":
2✔
1015
                self.dropclass = "dlg.data.drops.memory.InMemoryDROP"
2✔
1016
            elif self.category == "SharedMemory":
2✔
1017
                self.dropclass = "dlg.data.drops.memory.SharedMemoryDROP"
2✔
1018
            else:
UNCOV
1019
                raise TypeError("Unknown data class for drop: %s" % self.jd)
×
1020
        logger.debug("Creating data drop using class: %s", self.dropclass)
2✔
1021
        kwargs = {}
2✔
1022
        kwargs["dropclass"] = self.dropclass
2✔
1023
        kwargs["weight"] = self.weight
2✔
1024
        if self.is_start_listener:
2✔
UNCOV
1025
            drop_spec = self._create_listener_drops(drop_spec)
×
1026
        drop_spec.update(kwargs)
2✔
1027
        return drop_spec
2✔
1028

1029
    def make_single_drop(self, iid="0", **kwargs):
2✔
1030
        """
1031
        make only one drop from an LG node
1032
        one-one mapping
1033

1034
        Dummy implementation as of 09/12/15
1035
        """
1036
        if self.is_loop:
2✔
UNCOV
1037
            return {}
×
1038

1039
        oid, rank = self.make_oid(iid)
2✔
1040
        # default spec
1041
        drop_spec = dropdict(
2✔
1042
            {
1043
                "oid": oid,
1044
                "name": self.name,
1045
                "categoryType": self.categoryType,
1046
                "category": self.category,
1047
                "dropclass": self.dropclass,
1048
                "storage": self.category,
1049
                "rank": rank,
1050
                "reprodata": self.jd.get("reprodata", {}),
1051
            }
1052
        )
1053
        drop_spec.update(kwargs)
2✔
1054
        if self.is_data:
2✔
1055
            drop_spec = self._create_data_drop(drop_spec)
2✔
1056
        elif self.is_app:
2✔
1057
            drop_spec = self._create_app_drop(drop_spec)
2✔
1058
        elif self.category == Categories.GROUP_BY:
2✔
1059
            drop_spec = self._create_groupby_drops(drop_spec)
2✔
1060
        elif self.category == Categories.GATHER:
2✔
1061
            drop_spec = self._create_gather_drops(drop_spec)
2✔
1062
        elif self.is_service or self.is_branch:
2✔
1063
            kwargs["categoryType"] = "Application"
2✔
1064
            self.jd["categoryType"] = "Application"
2✔
1065
            drop_spec = self._create_app_drop(drop_spec)
2✔
1066
        kwargs["iid"] = iid
2✔
1067
        kwargs["lg_key"] = self.id
2✔
1068
        if self.is_branch:
2✔
1069
            kwargs["categoryType"] = "Application"
2✔
1070
        kwargs["name"] = self.name
2✔
1071
        # Behaviour is that child-nodes inherit reproducibility data from their parents.
1072
        if self._reprodata is not None:
2✔
1073
            kwargs["reprodata"] = self._reprodata.copy()
2✔
1074
        drop_spec.update(kwargs)
2✔
1075
        return drop_spec
2✔
1076

1077
    @staticmethod
2✔
1078
    def str_to_bool(value, default_value=False):
2✔
UNCOV
1079
        res = True if value in ["1", "true", "True", "yes"] else default_value
×
UNCOV
1080
        return res
×
1081

1082
    @staticmethod
2✔
1083
    def _mkn_substitution(mkn, value):
2✔
UNCOV
1084
        if "%m" in value:
×
UNCOV
1085
            value = value.replace("%m", str(mkn[0]))
×
UNCOV
1086
        if "%k" in value:
×
UNCOV
1087
            value = value.replace("%k", str(mkn[1]))
×
UNCOV
1088
        if "%n" in value:
×
UNCOV
1089
            value = value.replace("%n", str(mkn[2]))
×
1090

UNCOV
1091
        return value
×