• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ICRAR / daliuge / 4911681207

pending completion
4911681207

Pull #231

github

GitHub
Merge 9186e10d1 into e48989cce
Pull Request #231: Liu 355

180 of 229 new or added lines in 17 files covered. (78.6%)

26 existing lines in 5 files now uncovered.

15345 of 19059 relevant lines covered (80.51%)

1.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.31
/daliuge-translator/dlg/dropmake/lg_node.py
1
#
2
#    ICRAR - International Centre for Radio Astronomy Research
3
#    (c) UWA - The University of Western Australia, 2015
4
#    Copyright by UWA (in the framework of the ICRAR)
5
#    All rights reserved
6
#
7
#    This library is free software; you can redistribute it and/or
8
#    modify it under the terms of the GNU Lesser General Public
9
#    License as published by the Free Software Foundation; either
10
#    version 2.1 of the License, or (at your option) any later version.
11
#
12
#    This library is distributed in the hope that it will be useful,
13
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
14
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15
#    Lesser General Public License for more details.
16
#
17
#    You should have received a copy of the GNU Lesser General Public
18
#    License along with this library; if not, write to the Free Software
19
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
20
#    MA 02111-1307  USA
21
#
22
"""
2✔
23
The DALiuGE resource manager uses the requested logical graphs, the available resources and
24
the profiling information, and turns them into the partitioned physical graph,
25
which will then be deployed and monitored by the Physical Graph Manager
26
"""
27

28
if __name__ == "__main__":
    # When this file is executed directly as a script, set __package__ so the
    # relative import of ``.definition_classes`` below can still be resolved.
    __package__ = "dlg.dropmake"
30

31
import logging
2✔
32
import math
2✔
33
import random
2✔
34
import re
2✔
35

36
from dlg.common import CategoryType
2✔
37
from dlg.common import dropdict
2✔
38
from dlg.dropmake.dm_utils import (
2✔
39
    GraphException,
40
    GInvalidLink,
41
    GInvalidNode,
42
)
43
from dlg.dropmake.utils.bash_parameter import BashCommand
2✔
44
from .definition_classes import Categories, DATA_TYPES, APP_TYPES
2✔
45

46
logger = logging.getLogger(__name__)
2✔
47

48

49
class LGNode:
2✔
50
    def __init__(self, jd, group_q, done_dict, ssid):
        """
        Build a logical-graph node from its JSON dictionary and wire it into
        the group hierarchy.

        jd: json_dict (dict) -- the node as loaded from the logical graph JSON
        group_q: group queue (defaultdict) -- maps a group id to the nodes that
            referenced that group before the group node itself was processed
        done_dict: LGNode that have been processed (Dict), keyed by node id;
            this node is registered in it at the end of construction
        ssid:   session id (string)
        """
        self.id = jd["key"]  # node ID
        self.jd = jd  # JSON TODO: this should be removed
        self.group_q = group_q  # the group hierarchy queue
        self.group = None  # used if node belongs to group
        self._children = []  # list of LGNode objects, children of this node
        self._ssid = ssid  # session ID
        # Preliminary flags; the dropclass setter below resets and recomputes them.
        self.is_app = self.jd["categoryType"] == CategoryType.APPLICATION
        self.is_data = self.jd["categoryType"] == CategoryType.DATA
        # The weight setter reads is_app / is_data, so it must run after them.
        self.weight = 1  # try to find the weights, else set to 1
        self._converted = False
        self._h_level = None  # cached hierarchy level
        self._g_h = None  # cached group hierarchy string
        self._dop = None  # degree of parallelism
        self._gaw = None  # cached gather width
        self._grpw = None  # cached groupby width
        self._inputs = []  # list of upstream LGNode objects connected to this node
        self._outputs = []  # list of downstream LGNode objects connected to this node
        # The effective inputPorts/outputPorts setters ignore the assigned value
        # and look the port name up in jd["fields"] via _getPortName.
        self.inputPorts = "inputPorts"
        self.outputPorts = "outputPorts"
        logger.debug("%s input_ports: %s", self.name, self.inputPorts)
        logger.debug("%s output_ports: %s", self.name, self.outputPorts)
        self.dropclass = ""  # e.g. dlg.apps.simple.HelloWorldAPP
        self.reprodata = jd.get("reprodata", {}).copy()
        if "isGroup" in jd and jd["isGroup"] is True:
            self.is_group = True
            # Adopt every node that was queued while waiting for this group.
            for wn in group_q[self.id]:
                wn.group = self
                self.add_child(wn)
            group_q.pop(self.id)  # not thread safe
        else:
            self.is_group = False

        if "group" in jd:
            grp_id = jd["group"]
            if grp_id in done_dict:
                # The group node already exists: attach to it right away.
                grp_nd = done_dict[grp_id]
                self.group = grp_nd
                grp_nd.add_child(self)
            else:
                # Group not built yet: queue this node until it is.
                group_q[grp_id].append(self)

        done_dict[self.id] = self
99

100
    # def __str__(self):
101
    #     return json.dumps(self.jd)
102

103
    @property
2✔
104
    def inputPorts(self):
2✔
105
        return self._inputPorts
×
106

107
    @inputPorts.setter
2✔
108
    def inputPorts(self, value):
2✔
109
        if (
×
110
            "categoryType" in value and value["categoryType"] == "Construct"
111
        ) or ("type" in value and value["type"] == "Construct"):
112
            self._inputPorts = []
×
113
        elif not "inputPorts" in value:
×
114
            self._inputPorts = [
×
115
                f
116
                for f in value["fields"]
117
                if "usage" in f and f["usage"] in ["InputPort", "InOutPort"]
118
            ]
119
            # we need this as long as the fields are still using "name"
120
            if len(self._inputPorts) > 0 and "name" in self._inputPorts[0]:
×
121
                for p in self._inputPorts:
×
122
                    p["name"] = p["name"]
×
123
        else:
124
            self._inputPorts = value["inputPorts"]
×
125

126
    @property
2✔
127
    def outputPorts(self):
2✔
128
        return self._outputPorts
×
129

130
    @outputPorts.setter
2✔
131
    def outputPorts(self, value):
2✔
132
        if (
×
133
            "categoryType" in value and value["categoryType"] == "Construct"
134
        ) or ("type" in value and value["type"] == "Construct"):
135
            self._outputPorts = []
×
136
        elif not "outputPorts" in value:
×
137
            self._outputPorts = [
×
138
                f
139
                for f in value["fields"]
140
                if f["usage"] in ["OutputPort", "InOutPort"]
141
            ]
142
            # we need this as long as the fields are still using "name"
143
            if len(self._outputPorts) > 0 and "name" in self._outputPorts[0]:
×
144
                for p in self._outputPorts:
×
145
                    p["name"] = p["name"]
×
146
        else:
147
            self._outputPorts = value["outputPorts"]
×
148

149
    @property
2✔
150
    def jd(self):
2✔
151
        return self._jd
2✔
152

153
    @jd.setter
2✔
154
    def jd(self, node_json):
2✔
155
        """
156
        Setting he jd property to the original data structure directly loaded
157
        from JSON.
158
        """
159
        if "categoryType" not in node_json:
2✔
160
            if node_json["category"] in APP_TYPES:
2✔
161
                node_json["categoryType"] = CategoryType.APPLICATION
2✔
162
            elif node_json["category"] in DATA_TYPES:
×
163
                node_json["categoryType"] = CategoryType.DATA
×
164
        self._jd = node_json
2✔
165

166
    @property
2✔
167
    def reprodata(self):
2✔
168
        return self._reprodata
×
169

170
    @reprodata.setter
2✔
171
    def reprodata(self, value):
2✔
172
        self._reprodata = value
2✔
173

174
    @property
2✔
175
    def is_group(self):
2✔
176
        return self._is_group
2✔
177

178
    @is_group.setter
2✔
179
    def is_group(self, value):
2✔
180
        self._is_group = value
2✔
181

182
    @property
2✔
183
    def id(self):
2✔
184
        return self._id
2✔
185

186
    @id.setter
2✔
187
    def id(self, value):
2✔
188
        self._id = value
2✔
189

190
    @property
2✔
191
    def nodetype(self):
2✔
UNCOV
192
        return self._nodetype
×
193

194
    @nodetype.setter
2✔
195
    def nodetype(self, value):
2✔
UNCOV
196
        self._nodetype = value
×
197

198
    @property
    def dropclass(self):
        """Class path recorded for this node (e.g. ``dlg.apps.simple.SleepApp``)."""
        return self._dropclass

    @dropclass.setter
    def dropclass(self, default_value):
        """
        Resolve the drop class from the node JSON.

        Also recomputes ``is_data`` / ``is_app`` from ``categoryType``.
        For Data, Application, Construct and Control nodes, the first of the
        category-specific keys present in ``jd`` supplies the value. If none of
        those keys is present, or the value found is ``None`` or ``""``,
        ``default_value`` is used; an empty ``default_value`` itself falls back
        to ``"dlg.apps.simple.SleepApp"``.
        ``Other`` nodes always get ``"Other"``; an unknown categoryType is
        logged and the drop class becomes ``None``.
        """
        self.is_data = False
        self.is_app = False
        keys = []
        value = None
        if default_value is None or len(default_value) == 0:
            default_value = "dlg.apps.simple.SleepApp"
        category_type = self.jd["categoryType"]
        if category_type == CategoryType.DATA:
            self.is_data = True
            keys = [
                "dropclass",
                "Data class",
                "dataclass",
            ]
        elif category_type == CategoryType.APPLICATION:
            keys = [
                "dropclass",
                "Application Class",
                "Application class",
                "appclass",
            ]
            self.is_app = True
        elif category_type in [
            CategoryType.CONSTRUCT,
            CategoryType.CONTROL,
        ]:
            keys = ["inputApplicationName"]
        elif category_type in ["Other"]:
            value = "Other"
        else:
            logger.error("Found unknown categoryType: %s", category_type)
            # raise ValueError
        for key in keys:
            if key in self.jd:
                value = self.jd[key]
                break
        # Apply the default after the search. The check used to sit inside the
        # loop, so an empty value under the first matching key (which breaks out
        # of the loop) never received the default.
        if keys and (value is None or value == ""):
            value = default_value

        self._dropclass = value
245

246
    @property
2✔
247
    def name(self):
2✔
248
        if self.jd.get("name"):
2✔
249
            # backwards compatibility
250
            # TODO: deprecated
251
            return self.jd.get("name", "")
2✔
252
        else:
253
            return self.jd.get("name", "")
2✔
254

255
    @property
2✔
256
    def category(self):
2✔
257
        return self.jd.get("category", "Unknown")
2✔
258

259
    @property
2✔
260
    def categoryType(self):
2✔
261
        return self.jd.get("categoryType", "Unknown")
2✔
262

263
    @property
2✔
264
    def group(self):
2✔
265
        return self._grp
2✔
266

267
    @group.setter
2✔
268
    def group(self, value):
2✔
269
        self._grp = value
2✔
270

271
    def has_group(self):
2✔
272
        return self.group is not None
2✔
273

274
    def has_converted(self):
2✔
275
        return self._converted
×
276

277
    def complete_conversion(self):
2✔
278
        self._converted = True
×
279

280
    @property
2✔
281
    def gid(self):
2✔
282
        if self.group is None:
2✔
283
            return 0
2✔
284
        else:
285
            return self.group.id
2✔
286

287
    def add_output(self, lg_node):
2✔
288
        if lg_node not in self._outputs:
2✔
289
            self._outputs.append(lg_node)
2✔
290

291
    def add_input(self, lg_node):
2✔
292
        # only add if not already there
293
        # this may happen in nested constructs
294
        if lg_node not in self._inputs:
2✔
295
            self._inputs.append(lg_node)
2✔
296

297
    def add_child(self, lg_node):
2✔
298
        """
299
        Add a group member
300
        """
301
        if (
2✔
302
            lg_node.is_group
303
            and not (lg_node.is_scatter)
304
            and not (lg_node.is_loop)
305
            and not (lg_node.is_groupby)
306
        ):
307
            raise GInvalidNode(
×
308
                "Only Scatters, Loops and GroupBys can be nested, but {0} is neither".format(
309
                    lg_node.id
310
                )
311
            )
312
        self._children.append(lg_node)
2✔
313

314
    @property
2✔
315
    def children(self):
2✔
316
        return self._children
2✔
317

318
    @property
2✔
319
    def outputs(self):
2✔
320
        return self._outputs
×
321

322
    @property
2✔
323
    def inputs(self):
2✔
324
        return self._inputs
2✔
325

326
    @property
2✔
327
    def h_level(self):
2✔
328
        if self._h_level is None:
2✔
329
            _level = 0
2✔
330
            cg = self
2✔
331
            while cg.has_group():
2✔
332
                cg = cg.group
2✔
333
                _level += 1
2✔
334
            if self.is_mpi:
2✔
335
                _level += 1
×
336
            self._h_level = _level
2✔
337
        return self._h_level
2✔
338

339
    @property
2✔
340
    def group_hierarchy(self):
2✔
341
        if self._g_h is None:
2✔
342
            glist = []
2✔
343
            cg = self
2✔
344
            while cg.has_group():
2✔
345
                glist.append(str(cg.gid))
2✔
346
                cg = cg.group
2✔
347
            glist.append("0")
2✔
348
            self._g_h = "/".join(reversed(glist))
2✔
349
        return self._g_h
2✔
350

351
    @property
2✔
352
    def weight(self):
2✔
353
        return self._weight
2✔
354

355
    @weight.setter
2✔
356
    def weight(self, default_value):
2✔
357
        key = []
2✔
358
        if self.is_app:
2✔
359
            key = [
2✔
360
                k
361
                for k in self.jd
362
                if re.match(r"execution[\s\_]time", k.lower())
363
            ]
364
        elif self.is_data:
2✔
365
            key = [
2✔
366
                k for k in self.jd if re.match(r"data[\s\_]volume", k.lower())
367
            ]
368
        try:
2✔
369
            self._weight = int(self.jd[key[0]])
2✔
370
        except (KeyError, ValueError, IndexError):
2✔
371
            self._weight = int(default_value)
2✔
372

373
    @property
2✔
374
    def has_child(self):
2✔
375
        return len(self._children) > 0
×
376

377
    @property
2✔
378
    def has_output(self):
2✔
379
        return len(self._outputs) > 0
×
380

381
    @property
2✔
382
    def is_data(self):
2✔
383
        return self._is_data
2✔
384

385
    @is_data.setter
2✔
386
    def is_data(self, value):
2✔
387
        self._is_data = value
2✔
388

389
    @property
2✔
390
    def is_app(self):
2✔
391
        return self._is_app
2✔
392
        # return self.jd["categoryType"] == CategoryType.APPLICATION
393

394
    @is_app.setter
2✔
395
    def is_app(self, value):
2✔
396
        self._is_app = value
2✔
397

398
    @property
2✔
399
    def is_start_node(self):
2✔
400
        return self.jd["category"] == Categories.START
2✔
401

402
    @property
2✔
403
    def is_end_node(self):
2✔
404
        return self.jd["category"] == Categories.END
×
405

406
    @property
2✔
407
    def is_start(self):
2✔
408
        return not self.has_group()
2✔
409

410
    @property
2✔
411
    def is_dag_root(self):
2✔
412
        leng = len(self.inputs)
×
413
        if leng > 1:
×
414
            return False
×
415
        elif leng == 0:
×
416
            if self.is_start_node:
×
417
                return False
×
418
            else:
419
                return True
×
420
        elif self.is_start:
×
421
            return True
×
422
        else:
423
            return False
×
424

425
    @property
2✔
426
    def is_start_listener(self):
2✔
427
        """
428
        is this a socket listener node
429
        """
430
        return len(self.inputs) == 1 and self.is_start_node and self.is_data
2✔
431

432
    @property
2✔
433
    def is_group_start(self):
2✔
434
        """
435
        is this a node starting a group (usually inside a loop)
436
        """
437
        result = False
2✔
438
        if self.has_group() and (
2✔
439
            "group_start" in self.jd
440
            or "Group start" in self.jd
441
            or "Group Start" in self.jd
442
        ):
443
            gs = (
2✔
444
                self.jd.get("group_start", False)
445
                if "group_start" in self.jd
446
                else self.jd.get("Group start", False)
447
            )
448
            if type(gs) == type(True):
2✔
449
                result = gs
2✔
450
            elif type(gs) in [type(1), type(1.0)]:
2✔
451
                result = 1 == gs
2✔
452
            elif type(gs) == type("s"):
2✔
453
                result = gs.lower() in ("true", "1")
2✔
454
        return result
2✔
455

456
    @property
2✔
457
    def is_group_end(self):
2✔
458
        """
459
        is this a node ending a group (usually inside a loop)
460
        """
461
        result = False
2✔
462
        if self.has_group() and (
2✔
463
            "group_end" in self.jd
464
            or "Group end" in self.jd
465
            or "Group End" in self.jd
466
        ):
467
            ge = (
2✔
468
                self.jd.get("group_end", False)
469
                if "group_end" in self.jd
470
                else self.jd.get("Group end", False)
471
            )
472
            if type(ge) == type(True):
2✔
473
                result = ge
2✔
474
            elif type(ge) in [type(1), type(1.0)]:
2✔
475
                result = 1 == ge
×
476
            elif type(ge) == type("s"):
2✔
477
                result = ge.lower() in ("true", "1")
2✔
478
        return result
2✔
479

480
    @property
    def is_scatter(self):
        """True for a group node whose category is Scatter."""
        return self.is_group and self._jd["category"] == Categories.SCATTER

    @property
    def is_gather(self):
        """True when the node's category is Gather."""
        return self._jd["category"] == Categories.GATHER

    @property
    def is_loop(self):
        """True for a group node whose category is Loop."""
        return self.is_group and self._jd["category"] == Categories.LOOP

    @property
    def is_service(self):
        """
        True for the parent Service node itself (not its input application).
        """
        return self._jd["category"] == Categories.SERVICE

    @property
    def is_groupby(self):
        """True when the node's category is GroupBy."""
        return self._jd["category"] == Categories.GROUP_BY

    @property
    def is_branch(self):
        """True when the node's category is Branch."""
        # This is the only special thing required for a branch
        return self._jd["category"] == Categories.BRANCH

    @property
    def is_mpi(self):
        """True when the node's category is MPI."""
        return self._jd["category"] == Categories.MPI
511

512
    @property
    def group_keys(self):
        """
        Return:
            None or a list of keys (each key is an integer)

        Only GroupBy nodes have keys. ``group_key`` is read as a string; the
        values "None", "-1" and "" (or a missing key) mean no keys, anything
        else must be an integer or comma-separated integers.

        Raises:
            GraphException: if ``group_key`` cannot be parsed as integers.
        """
        if not self.is_groupby:
            return None
        val = str(self.jd.get("group_key", "None"))
        if val in ["None", "-1", ""]:
            return None
        try:
            return [int(x) for x in val.split(",")]
        except ValueError as ve:
            # chain the original parsing error so the offending token is kept
            raise GraphException(
                "group_key must be an integer or comma-separated integers: {0}".format(
                    ve
                )
            ) from ve
532

533
    @property
2✔
534
    def inputPorts(self):
2✔
535
        return self._inputPorts
2✔
536

537
    @inputPorts.setter
2✔
538
    def inputPorts(self, port="inputPorts"):
2✔
539
        self._inputPorts = self._getPortName(ports="inputPorts", index=0)
2✔
540

541
    @property
2✔
542
    def outputPorts(self):
2✔
543
        return self._outputPorts
2✔
544

545
    @outputPorts.setter
2✔
546
    def outputPorts(self, port="outputPorts"):
2✔
547
        self._outputPorts = self._getPortName(ports="outputPorts", index=0)
2✔
548

549
    @property
    def gather_width(self):
        """
        Gather width: ``int(jd["num_of_inputs"])`` for a Gather node, cached,
        or 1 when that value is missing or cannot be converted. ``None`` for
        every other node.
        """
        if not self.is_gather:
            # TODO: use OO style to replace all type-related statements!
            return None
        if self._gaw is None:
            try:
                self._gaw = int(self.jd["num_of_inputs"])
            except (KeyError, TypeError, ValueError, OverflowError):
                # Narrowed from a bare ``except:``, which also swallowed
                # KeyboardInterrupt/SystemExit.
                self._gaw = 1
        return self._gaw
566

567
    @property
    def groupby_width(self):
        """
        GroupBy count: the product of the DoPs of all groups enclosing the
        group of this node's first input (that group itself excluded).
        Cached; ``None`` for non-GroupBy nodes.
        """
        if not self.is_groupby:
            return None
        if self._grpw is None:
            width = 1
            grp = self.inputs[0].group  # exclude its own group
            while grp.has_group():
                grp = grp.group
                width *= grp.dop
            self._grpw = width
        return self._grpw
584

585
    @property
    def group_by_scatter_layers(self):
        """
        Return:
            scatter layers info associated with this group by logical node
            A tuple of three items:
                (1) DoP (product of the DoPs of the selected layers)
                (2) layer indexes (list) from inner scatter to outer scatter
                (3) layers (list)
            or None if this node is not a GroupBy.

        Raises:
            GInvalidNode: when a chained GroupBy has no group_key, or when a
            single group_key does not match the input's enclosing group.
        """
        if not self.is_groupby:
            return None

        tlgn = self.inputs[0]  # the (first) input feeding this GroupBy
        grpks = self.group_keys
        ret_dop = 1
        layer_index = []  # from inner to outer
        layers = []  # from inner to outer
        c = 0  # layer counter while walking outwards through scatters
        if tlgn.group.is_groupby:
            # group by followed by another group by
            if grpks is None or len(grpks) < 1:
                raise GInvalidNode(
                    "Must specify group_key for Group By '{0}'".format(
                        self.name
                    )
                )
            # find the "root" groupby and get all of its scatters
            inputgrp = self
            while (inputgrp is not None) and inputgrp.inputs[
                0
            ].group.is_groupby:
                inputgrp = inputgrp.inputs[0].group
            # inputgrp now is the "root" groupby that follows Scatter immediately
            # move it to Scatter
            inputgrp = inputgrp.inputs[0].group
            # go thru all the scatters
            while (inputgrp is not None) and inputgrp.is_scatter:
                if inputgrp.id in grpks:
                    ret_dop *= inputgrp.dop
                    layer_index.append(c)
                    layers.append(inputgrp)
                inputgrp = inputgrp.group
                c += 1
        else:
            if grpks is None or len(grpks) < 1:
                # no explicit keys: use the group enclosing the input
                ret_dop = tlgn.group.dop
                layer_index.append(0)
                layers.append(tlgn.group)
            else:
                if len(grpks) == 1:
                    # a single key must name the input's enclosing group
                    if grpks[0] == tlgn.group.id:
                        ret_dop = tlgn.group.dop
                        layer_index.append(0)
                        layers.append(tlgn.group)
                    else:
                        raise GInvalidNode(
                            "Wrong single group_key for {0}".format(self.name)
                        )
                else:
                    inputgrp = tlgn.group
                    # find the "groupby column list" from all layers of scatter loops
                    while (inputgrp is not None) and inputgrp.is_scatter:
                        if inputgrp.id in grpks:
                            ret_dop *= inputgrp.dop
                            layer_index.append(c)
                            layers.append(inputgrp)
                        inputgrp = inputgrp.group
                        c += 1

        return ret_dop, layer_index, layers
656

657
    @property
    def dop(self):
        """
        Degree of Parallelism:  integer
        default:    1

        Computed on first access and cached in ``self._dop``.
        For group nodes:
        - Scatter: first of num_of_copies / num_of_splits / Number of copies
          present in jd, otherwise 4
        - Gather: DoP of its first input if that is a GroupBy, else the
          dop_diff to that input; divided by gather_width and rounded up
        - GroupBy: first element of group_by_scatter_layers
        - Loop: first of num_of_iter / Number of Iterations / Number of loops
          present in jd (NOTE(review): stays None if none is present, so the
          value is recomputed on every access and None is returned)
        - Service: 1
        For non-group nodes: ``num_of_procs`` for MPI nodes, otherwise 1.

        Raises:
            GInvalidLink: for a Gather group without inputs.
            GInvalidNode: for an unrecognised group category.
        """
        if self._dop is None:
            if self.is_group:
                if self.is_scatter:
                    for kw in [
                        "num_of_copies",
                        "num_of_splits",
                        "Number of copies",
                    ]:
                        if kw in self.jd:
                            self._dop = int(self.jd[kw])
                            break
                    if self._dop is None:
                        self._dop = 4  # dummy impl. TODO: Why is this here?
                elif self.is_gather:
                    try:
                        tlgn = self.inputs[0]
                    except IndexError:
                        raise GInvalidLink(
                            "Gather '{0}' does not have input!".format(self.id)
                        )
                    if tlgn.is_groupby:
                        tt = tlgn.dop
                    else:
                        tt = self.dop_diff(tlgn)
                    # one gather instance per gather_width upstream instances
                    self._dop = int(math.ceil(tt / float(self.gather_width)))
                elif self.is_groupby:
                    self._dop = self.group_by_scatter_layers[0]
                elif self.is_loop:
                    for key in [
                        "num_of_iter",
                        "Number of Iterations",
                        "Number of loops",
                    ]:
                        if key in self.jd:
                            self._dop = int(self.jd.get(key, 1))
                            break
                elif self.is_service:
                    self._dop = 1  # TODO: number of compute nodes
                else:
                    raise GInvalidNode(
                        "Unrecognised (Group) Logical Graph Node: '{0}'".format(
                            self._jd["category"]
                        )
                    )
            elif self.is_mpi:
                self._dop = int(self.jd["num_of_procs"])
            else:
                self._dop = 1
        return self._dop
712

713
    def dop_diff(self, that_lgn):
        """
        TODO: This does not belong in the LGNode class

        dop difference between inner node/group and outer group
        e.g for each outer group, how many instances of inner nodes/groups

        Returns 1 when both nodes are at the same h_level. Otherwise it starts
        at the deeper node and walks outwards: it multiplies in the DoP of each
        enclosing group until it reaches a node whose group id equals the
        shallower node's group id, and it also multiplies in the DoP of any MPI
        node met on the way.
        """
        # if (self.is_group() or that_lgn.is_group()):
        #     raise GraphException("Cannot compute dop diff between groups.")
        # don't check h_related for efficiency since it should have been checked
        # if (self.h_related(that_lgn)):
        il = self.h_level
        al = that_lgn.h_level
        if il == al:
            return 1
        elif il > al:
            # iln: the deeper (inner) node, oln: the shallower (outer) node
            oln = that_lgn
            iln = self
        else:
            iln = that_lgn
            oln = self
        re_dop = 1
        cg = iln
        # keep climbing while cg's group differs from oln's and cg has a group
        init_cond = cg.gid != oln.gid and cg.has_group()
        while init_cond or cg.is_mpi:
            if cg.is_mpi:
                re_dop *= cg.dop
            # else:
            if init_cond:
                re_dop *= cg.group.dop
            cg = cg.group
            if cg is None:
                break
            init_cond = cg.gid != oln.gid and cg.has_group()
        return re_dop
        # else:
        #     pass
        # raise GInvalidLink("{0} and {1} are not hierarchically related".format(self.id, that_lgn.id))
751

752
    def h_related(self, that_lgn):
2✔
753
        """
754
        TODO: This does not belong in the LGNode class
755
        """
756
        that_gh = that_lgn.group_hierarchy
2✔
757
        this_gh = self.group_hierarchy
2✔
758
        if len(that_gh) + len(this_gh) <= 1:
2✔
759
            # at least one is at the root level
760
            return True
×
761

762
        return that_gh.find(this_gh) > -1 or this_gh.find(that_gh) > -1
2✔
763

764
    def make_oid(self, iid="0"):
2✔
765
        """
766
        return:
767
            ssid_id_iid (string), where
768
            ssid:   session id
769
            id:     logical graph node key
770
            iid:    instance id (for the physical graph node)
771
        """
772
        # TODO: This is rather ugly, but a quick and dirty fix. The iid is the rank data we need
773
        rank = [int(x) for x in iid.split("/")]
2✔
774
        return "{0}_{1}_{2}".format(self._ssid, self.id, iid), rank
2✔
775

776
    def _update_key_value_attributes(self, kwargs):
2✔
777
        """
778
        get all the arguments from new fields dictionary in a backwards compatible way
779
        """
780
        if "fields" in self.jd:
2✔
781
            self.jd.update({"nodeAttributes": {}})
2✔
782
            kwargs.update({"nodeAttributes": {}})
2✔
783
            for je in self.jd["fields"]:
2✔
784
                # The field to be used is not the text, but the name field
785
                self.jd[je["name"]] = je["value"]
2✔
786
                kwargs[je["name"]] = je["value"]
2✔
787
                self.jd["nodeAttributes"].update({je["name"]: je})
2✔
788
                kwargs["nodeAttributes"].update({je["name"]: je})
2✔
789
        kwargs[
2✔
790
            "applicationArgs"
791
        ] = {}  # make sure the dict always exists downstream
792
        if "applicationArgs" in self.jd:  # and fill it if provided
2✔
793
            for je in self.jd["applicationArgs"]:
×
794
                j = {je["name"]: {k: je[k] for k in je if k not in ["name"]}}
×
795
                self.jd.update(j)
×
796
                kwargs["applicationArgs"].update(j)
×
797
        if "nodeAttributes" not in kwargs:
2✔
798
            kwargs.update({"nodeAttributes": {}})
×
799
        for k, na in kwargs["nodeAttributes"].items():
2✔
800
            if (
2✔
801
                "parameterType" in na
802
                and na["parameterType"] == "ApplicationArgument"
803
            ):
804
                kwargs["applicationArgs"].update({k: na})
2✔
805
        # NOTE: drop Argxx keywords
806

807
    def _getPortName(
2✔
808
        self, ports: str = "outputPorts", index: int = 0, portId=None
809
    ):
810
        """
811
        Return name of port if it exists
812
        """
813
        port_selector = {
2✔
814
            "inputPorts": ["InputPort", "InputOutput"],
815
            "outputPorts": ["OutputPort", "InputOutput"],
816
        }
817
        ports_dict = {}
2✔
818
        name = None
2✔
819
        # if portId is None and index >= 0:
820
        if index >= 0:
2✔
821
            if ports in port_selector:
2✔
822
                for field in self.jd["fields"]:
2✔
823
                    if "usage" not in field:  # fixes manual graphs
2✔
824
                        continue
2✔
825
                    if field["usage"] in port_selector[ports]:
2✔
826
                        if portId is None:
2✔
827
                            name = field["name"]
2✔
828
                        elif field["id"] == portId:
2✔
829
                            name = field["name"]
2✔
830
                        # can't be sure that name is unique
831
                        if name not in ports_dict:
2✔
832
                            ports_dict[name] = [field["id"]]
2✔
833
                        else:
834
                            ports_dict[name].append(field["id"])
2✔
835
        else:
836
            # TODO: This is not really correct, but maybe not needed at all?
837
            for port in port_selector[ports]:
×
NEW
838
                name = [
×
839
                    p["name"]
840
                    for p in self.jd[port]
841
                    if port in self.jd and p["Id"] == portId
842
                ]
NEW
843
                name = name[0] if len(name) > 0 else None
×
NEW
844
                if name is not None:
×
UNCOV
845
                    break
×
846
        return name if index >= 0 else ports_dict
2✔
847

848
    def _create_groupby_drops(self, drop_spec):
        """
        Build the drop spec for a GroupBy construct.

        The GroupBy node becomes a ``dlg.apps.simple.SleepApp`` application
        (weight 1, sleep time 1). A generated in-memory data drop
        ``<oid>-grp-data`` is attached as its ``"grpdata"`` output.

        :param drop_spec: the dropdict being built for this node; it is
            updated in place.
        :returns: ``drop_spec``.
        :raises GInvalidNode: if the first input of this node is not a data
            node.
        """
        drop_spec.update(
            {
                "dropclass": "dlg.apps.simple.SleepApp",
                "categoryType": "Application",
            }
        )
        source = self.inputs[0]
        if not source.is_data:
            raise GInvalidNode(
                "GroupBy should be connected to a DataDrop, not '%s'"
                % source.category
            )
        grouped_weight = source.weight * self.groupby_width

        # Additional, generated data drop receiving the grouped data.
        grp_data = dropdict(
            {
                "oid": "{0}-grp-data".format(drop_spec["oid"]),
                "categoryType": CategoryType.DATA,
                "dropclass": "dlg.data.drops.memory.InMemoryDROP",
                "name": "grpdata",
                "weight": grouped_weight,
                "rank": drop_spec["rank"],
                "reprodata": self.jd.get("reprodata", {}),
            }
        )
        drop_spec.update(
            {
                "grp-data_drop": grp_data,
                # The barrier literally takes no time for its own computation.
                "weight": 1,
                "sleep_time": 1,
            }
        )
        drop_spec.addOutput(grp_data, name="grpdata")
        grp_data.addProducer(drop_spec, name="grpdata")
        return drop_spec
2✔
885

886
    def _create_gather_drops(self, drop_spec):
        """
        Build the drop spec for a Gather construct.

        The Gather node becomes a ``dlg.apps.simple.SleepApp`` application
        (weight 1, sleep time 1). A generated in-memory data drop
        ``<oid>-gather-data`` is attached as its ``"gthrdata"`` output.

        The generated drop's weight is the upstream weight times the gather
        width. If the Gather directly follows a GroupBy, it is instead the
        ``data_volume`` of the GroupBy's input times the GroupBy width times
        the gather width.

        :param drop_spec: the dropdict being built for this node; it is
            updated in place.
        :returns: ``drop_spec``.
        """
        drop_spec.update(
            {
                "dropclass": "dlg.apps.simple.SleepApp",
                "categoryType": "Application",
            }
        )
        upstream = self.inputs[0]
        if upstream.is_groupby:
            groupby_input = upstream.inputs[0]
            gathered_weight = (
                int(groupby_input.jd["data_volume"])
                * upstream.groupby_width
                * self.gather_width
            )
        else:  # upstream is a data node
            gathered_weight = upstream.weight * self.gather_width

        # Additional, generated data drop receiving the gathered data.
        gather_data = dropdict(
            {
                "oid": "{0}-gather-data".format(drop_spec["oid"]),
                "categoryType": CategoryType.DATA,
                "dropclass": "dlg.data.drops.memory.InMemoryDROP",
                "name": "gthrdt",
                "weight": gathered_weight,
                "rank": drop_spec["rank"],
                "reprodata": self.jd.get("reprodata", {}),
            }
        )
        drop_spec.update(
            {
                "gather-data_drop": gather_data,
                "weight": 1,
                "sleep_time": 1,
            }
        )
        drop_spec.addOutput(gather_data, name="gthrdata")
        gather_data.addProducer(drop_spec, name="gthrdata")
        return drop_spec
2✔
924

925
    def _create_listener_drops(self, drop_spec):
        """
        Turn this node into an in-memory data drop fed by a generated
        "listener" application drop.

        The listener is a ``dlg.apps.simple.SleepApp`` with oid ``<oid>-s``,
        flagged with ``autostart``. This node's drop is registered as its
        output, using the node's output port name.

        :param drop_spec: the dropdict being built for this node; it is
            updated in place.
        :returns: ``drop_spec``, now carrying the generated listener under the
            ``"listener_drop"`` key.
        """
        # The node itself becomes an in-memory data drop.
        drop_spec.update(
            {
                "oid": drop_spec["oid"],
                "categoryType": CategoryType.DATA,
                "dropclass": "dlg.data.drops.memory.InMemoryDROP",
            }
        )

        # Additional, generated listener drop.
        dropSpec_socket = dropdict(
            {
                "oid": "{0}-s".format(drop_spec["oid"]),
                "categoryType": CategoryType.APPLICATION,
                "category": "PythonApp",
                "dropclass": "dlg.apps.simple.SleepApp",
                "name": "lstnr",
                # Task weight. This key used to be misspelled "weigth", so the
                # weight was silently never set on the listener.
                "weight": 5,
                "sleep_time": 1,
                "reprodata": self.jd.get("reprodata", {}),
            }
        )
        dropSpec_socket["autostart"] = 1
        drop_spec.update({"listener_drop": dropSpec_socket})
        dropSpec_socket.addOutput(
            drop_spec, name=self._getPortName(ports="outputPorts")
        )
        return drop_spec
×
955

956
    def _create_app_drop(self, drop_spec):
        """
        Fill in the application-specific parts of ``drop_spec``.

        The application class is taken from ``self.jd["appclass"]`` if
        present, else from ``self.dropclass``. If neither is set, it falls
        back to ``dlg.apps.simple.SleepApp`` ("sleep and copy"). Nodes whose
        dropclass is ``SleepApp`` but whose category is ``BashShellApp`` or
        ``Docker`` are mapped to the matching concrete application class.

        Side effects: ``self.dropclass`` and ``self.jd["dropclass"]`` are set
        to the resolved class. For Docker nodes the drop is renamed to
        ``self.jd["command"]``.

        :param drop_spec: the dropdict being built for this node; it is
            updated in place.
        :returns: ``drop_spec`` with ``weight``, ``dropclass`` and
            ``num_cpus`` set. ``sleep_time`` is added for SleepApp, and
            ``mkn`` is added when present in the node.
        :raises GraphException: if the node weight (execution time) is
            negative.
        """
        kwargs = {}
        if "appclass" in self.jd:
            app_class = self.jd["appclass"]
        elif self.dropclass is None or self.dropclass == "":
            # Default generic component becomes "sleep and copy".
            logger.debug("No dropclass found in: %s", self)
            app_class = "dlg.apps.simple.SleepApp"
        else:
            app_class = self.dropclass
        # Nodes still carrying the generic SleepApp class are mapped to a
        # concrete application class based on their category.
        if self.dropclass == "dlg.apps.simple.SleepApp":
            if self.category == "BashShellApp":
                app_class = "dlg.apps.bash_shell_app.BashShellApp"
            elif self.category == "Docker":
                app_class = "dlg.apps.dockerapp.DockerApp"
                drop_spec["name"] = self.jd["command"]
            else:
                logger.debug("Might be a problem with this node: %s", self.jd)

        self.jd["dropclass"] = app_class
        self.dropclass = app_class
        logger.debug(
            "Creating app drop using class: %s, %s",
            app_class,
            drop_spec["name"],
        )
        if self.dropclass is None or self.dropclass == "":
            logger.warning("Something wrong with this node: %s", self.jd)

        if self.weight is not None:
            execTime = self.weight
            # Zero is accepted; only negative execution times are rejected.
            if execTime < 0:
                raise GraphException(
                    "Execution_time must be greater than or equal to 0"
                    " for Node '%s'" % self.name
                )
        else:
            # No weight given: use a random execution time.
            execTime = random.randint(3, 8)
        kwargs["weight"] = execTime
        if app_class == "dlg.apps.simple.SleepApp":
            kwargs["sleep_time"] = execTime

        kwargs["dropclass"] = app_class
        kwargs["num_cpus"] = int(self.jd.get("num_cpus", 1))
        if "mkn" in self.jd:
            kwargs["mkn"] = self.jd["mkn"]
        drop_spec.update(kwargs)
        return drop_spec
2✔
1005

1006
    def _create_data_drop(self, drop_spec):
        """
        Fill in the data-specific parts of ``drop_spec``.

        For backwards compatibility, ``self.jd["dataclass"]`` overrides
        ``self.dropclass`` when present. If the node has no ``dropclass``
        attribute, or it is still the generic ``dlg.apps.simple.SleepApp``,
        the class is derived from the node category (File, Memory,
        SharedMemory, S3 or NGAS). Start-listener nodes also get a generated
        listener application drop.

        :param drop_spec: the dropdict being built for this node; it is
            updated in place.
        :returns: ``drop_spec`` with ``dropclass`` and ``weight`` set.
        :raises TypeError: if the class must be derived from the category and
            the category is not one of the known ones.
        """
        # Category -> data drop class, for graphs lacking a proper dropclass.
        legacy_classes = {
            "File": "dlg.data.drops.file.FileDROP",
            "Memory": "dlg.data.drops.memory.InMemoryDROP",
            "SharedMemory": "dlg.data.drops.memory.SharedMemoryDROP",
            "S3": "dlg.data.drops.s3_drop.S3DROP",
            "NGAS": "dlg.data.drops.ngas.NgasDROP",
        }
        if "dataclass" in self.jd:
            self.dropclass = self.jd["dataclass"]
        if (
            not hasattr(self, "dropclass")
            or self.dropclass == "dlg.apps.simple.SleepApp"
        ):
            legacy_class = legacy_classes.get(self.category)
            if legacy_class is None:
                # Interpolate the message explicitly: passing self.jd as a
                # second exception argument never formatted the "%s".
                raise TypeError("Unknown dropclass for drop: %s" % (self.jd,))
            self.dropclass = legacy_class
        logger.debug("Creating data drop using class: %s", self.dropclass)
        kwargs = {"dropclass": self.dropclass, "weight": self.weight}
        if self.is_start_listener:
            drop_spec = self._create_listener_drops(drop_spec)
        drop_spec.update(kwargs)
        return drop_spec
2✔
1035

1036
    def make_single_drop(self, iid="0", **kwargs):
        """
        Make a single drop spec from this LG node (one-to-one mapping).

        Dummy implementation as of 09/12/15.

        :param iid: instance id of the drop; passed to ``make_oid`` to derive
            the oid and rank, and stored under ``"iid"``.
        :param kwargs: extra attributes. They are merged into the default spec
            first, then passed to ``_update_key_value_attributes``, completed
            with ``iid``/``lg_key``/``name`` (and ``reprodata``), and merged
            in again at the end.
        :returns: the populated ``dropdict``, or ``{}`` for loop nodes.
        """
        if self.is_loop:
            return {}

        oid, rank = self.make_oid(iid)
        # Default spec shared by every kind of node.
        spec = dropdict(
            {
                "oid": oid,
                "name": self.name,
                "categoryType": self.categoryType,
                "category": self.category,
                "dropclass": self.dropclass,
                "storage": self.category,
                "rank": rank,
                "reprodata": self.jd.get("reprodata", {}),
            }
        )
        spec.update(kwargs)

        if self.is_data:
            spec = self._create_data_drop(spec)
        elif self.is_app:
            spec = self._create_app_drop(spec)
        elif self.category == Categories.GROUP_BY:
            spec = self._create_groupby_drops(spec)
        elif self.category == Categories.GATHER:
            spec = self._create_gather_drops(spec)
        elif self.is_service or self.is_branch:
            # Services and branches are deployed as applications.
            kwargs["categoryType"] = "Application"
            self.jd["categoryType"] = "Application"
            spec = self._create_app_drop(spec)

        self._update_key_value_attributes(kwargs)
        kwargs.update({"iid": iid, "lg_key": self.id})
        if self.is_branch:
            kwargs["categoryType"] = "Application"
        kwargs["name"] = self.name
        # Child nodes inherit reproducibility data from their parents.
        if self._reprodata is not None:
            kwargs["reprodata"] = self._reprodata.copy()
        spec.update(kwargs)
        return spec
2✔
1084

1085
    @staticmethod
2✔
1086
    def str_to_bool(value, default_value=False):
2✔
UNCOV
1087
        res = True if value in ["1", "true", "True", "yes"] else default_value
×
UNCOV
1088
        return res
×
1089

1090
    @staticmethod
2✔
1091
    def _mkn_substitution(mkn, value):
2✔
1092
        if "%m" in value:
×
1093
            value = value.replace("%m", str(mkn[0]))
×
1094
        if "%k" in value:
×
1095
            value = value.replace("%k", str(mkn[1]))
×
1096
        if "%n" in value:
×
1097
            value = value.replace("%n", str(mkn[2]))
×
1098

1099
        return value
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc