ICRAR / daliuge / build 4911681207 (pending completion)

GitHub Pull Request #231: Liu 355
Merge 9186e10d1 into e48989cce

180 of 229 new or added lines in 17 files covered. (78.6%)

26 existing lines in 5 files now uncovered.

15345 of 19059 relevant lines covered (80.51%)

1.65 hits per line

Source File: /daliuge-translator/dlg/dropmake/lg.py (73.41% covered)
#
#    ICRAR - International Centre for Radio Astronomy Research
#    (c) UWA - The University of Western Australia, 2015
#    Copyright by UWA (in the framework of the ICRAR)
#    All rights reserved
#
#    This library is free software; you can redistribute it and/or
#    modify it under the terms of the GNU Lesser General Public
#    License as published by the Free Software Foundation; either
#    version 2.1 of the License, or (at your option) any later version.
#
#    This library is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#    Lesser General Public License for more details.
#
#    You should have received a copy of the GNU Lesser General Public
#    License along with this library; if not, write to the Free Software
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
#    MA 02111-1307  USA
#
"""
The DALiuGE resource manager takes a requested logical graph, the available
resources and the profiling information, and turns them into a partitioned
physical graph, which will then be deployed and monitored by the Physical
Graph Manager.
"""

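# A minimal usage sketch (the file name is hypothetical; LG and
# unroll_to_tpl are defined below):
#
#   from dlg.dropmake.lg import LG
#   lg = LG("my_workflow.graph")      # parse and validate the logical graph
#   drop_specs = lg.unroll_to_tpl()   # flat list of physical drop specs
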
if __name__ == "__main__":
    __package__ = "dlg.dropmake"

import collections
import datetime
import logging
import time
from itertools import product
from dataclasses import asdict

import numpy as np
from dlg.common import CategoryType
from dlg.common import dropdict
from dlg.dropmake.dm_utils import (
    LG_APPREF,
    getNodesKeyDict,
    get_lg_ver_type,
    convert_construct,
    convert_fields,
    convert_mkn,
    LG_VER_EAGLE,
    LG_VER_EAGLE_CONVERTED,
    GraphException,
    GInvalidLink,
    GInvalidNode,
    load_lg,
)
from .definition_classes import Categories
from .lg_node import LGNode

logger = logging.getLogger(__name__)


class LG:
    """
    An object representation of a Logical Graph

    TODO: This is a lot more than just an LG class,
    it is doing all the conversion inside __init__
    """

    def __init__(self, f, ssid=None):
        """
        Parse the JSON logical graph into an LG object graph first.
        """
        self._g_var = []
        lg = load_lg(f)
        if ssid is None:
            ts = time.time()
            ssid = datetime.datetime.fromtimestamp(ts).strftime(
                "%Y-%m-%dT%H:%M:%S"
            )
        self._session_id = ssid
        self._loop_aware_set = set()

        # key - gather drop oid, value - a list of four elements:
        # the gather drop, its input drop list, its output drop list,
        # and the originating logical link
        self._gather_cache = dict()

        lgver = get_lg_ver_type(lg)
        logger.info("Loading graph: %s", lg["modelData"]["filePath"])
        logger.info("Found LG version: %s", lgver)

        if LG_VER_EAGLE == lgver:
            lg = convert_mkn(lg)
            lg = convert_fields(lg)
            lg = convert_construct(lg)
        elif LG_VER_EAGLE_CONVERTED == lgver:
            lg = convert_construct(lg)
        elif LG_APPREF == lgver:
            lg = convert_fields(lg)
            lgk = getNodesKeyDict(lg)
        # This ensures that future schema version mods are caught early
        else:
            raise GraphException(
                "Logical graph version '{0}' not supported!".format(lgver)
            )
        self._done_dict = dict()
        self._group_q = collections.defaultdict(list)
        self._output_q = collections.defaultdict(list)
        self._start_list = []
        self._lgn_list = []
        stream_output_ports = dict()  # key - port_id, value - construct key
        for jd in lg["nodeDataArray"]:
            lgn = LGNode(jd, self._group_q, self._done_dict, ssid)
            self._lgn_list.append(lgn)
            node_output_ports = jd.get("outputPorts", [])
            node_output_ports += jd.get("outputLocalPorts", [])
            # check all the outports of this node, and store "stream" outputs
            if len(node_output_ports) > 0:
                for out_port in node_output_ports:
                    if out_port.get("name", "").lower().endswith("stream"):
                        stream_output_ports[out_port["Id"]] = jd["key"]
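        # Illustrative example (values hypothetical): a port entry like
        # {"Id": "p1", "name": "raw_stream"} on a node whose "key" is 42
        # would register stream_output_ports["p1"] = 42 above.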
        # Need to go through the list again, since done_dict is recursive
        for lgn in self._lgn_list:
            if lgn.is_start and lgn.jd["category"] not in [
                Categories.COMMENT,
                Categories.DESCRIPTION,
            ]:
                if lgn.jd["category"] == Categories.VARIABLES:
                    self._g_var.append(lgn)
                else:
                    self._start_list.append(lgn)

        self._lg_links = lg["linkDataArray"]

        for lk in self._lg_links:
            src = self._done_dict[lk["from"]]
            tgt = self._done_dict[lk["to"]]
            self.validate_link(src, tgt)
            src.add_output(tgt)
            tgt.add_input(src)
            # check stream links
            from_port = lk.get("fromPort", "__None__")
            if stream_output_ports.get(from_port, None) == lk["from"]:
                lk["is_stream"] = True
                logger.debug(
                    "Found stream from %s to %s", lk["from"], lk["to"]
                )
            else:
                lk["is_stream"] = False
            if "1" == lk.get("loop_aware", "0"):
                self._loop_aware_set.add("%s-%s" % (lk["from"], lk["to"]))

        # key - lgn id, val - a list of pgns associated with this lgn
        self._drop_dict = collections.defaultdict(list)
        self._reprodata = lg.get("reprodata", {})

    def validate_link(self, src, tgt):
        # print("validate_link()", src.id, src.is_scatter(), tgt.id, tgt.is_scatter())
        if src.is_scatter or tgt.is_scatter:
            prompt = (
                "Remember to specify Input App Type for the Scatter construct!"
            )
            raise GInvalidLink(
                "Scatter construct {0} or {1} cannot be linked. {2}".format(
                    src.name, tgt.name, prompt
                )
            )

        if src.is_loop or tgt.is_loop:
            raise GInvalidLink(
                "Loop construct {0} or {1} cannot be linked".format(
                    src.name, tgt.name
                )
            )

        if src.is_gather:
            if not (
                tgt.jd["categoryType"] in ["app", "application", "Application"]
                and tgt.is_group_start
                and src.inputs[0].h_level == tgt.h_level
            ):
                raise GInvalidLink(
                    "Gather {0}'s output {1} must be a Group-Start Component inside a Group with the same H level as Gather's input".format(
                        src.id, tgt.id
                    )
                )
            # raise GInvalidLink("Gather {0} cannot be the input".format(src.id))
        if tgt.is_groupby:
            if src.is_group:
                raise GInvalidLink(
                    "GroupBy {0} input must not be a group {1}".format(
                        tgt.id, src.id
                    )
                )
            elif len(tgt.inputs) > 0:
                raise GInvalidLink(
                    "GroupBy {0} already has input {2} other than {1}".format(
                        tgt.id, src.id, tgt.inputs[0].id
                    )
                )
            elif src.gid == 0:
                raise GInvalidLink(
                    "GroupBy {0} requires at least one Scatter around input {1}".format(
                        tgt.id, src.id
                    )
                )
        elif tgt.is_gather:
            if "categoryType" not in src.jd:
                src.jd["categoryType"] = "Data"
            if (
                src.jd["categoryType"].lower() != "data"
                and not src.is_groupby
            ):
                raise GInvalidLink(
                    "Gather {0}'s input {1} should be either a GroupBy or Data. {2}".format(
                        tgt.id, src.id, src.jd
                    )
                )

        if src.is_groupby and not tgt.is_gather:
            raise GInvalidLink(
                "Output {1} from GroupBy {0} must be Gather, otherwise embed {1} inside GroupBy {0}".format(
                    src.id, tgt.id
                )
            )

        if not src.h_related(tgt):
            ll = src.group
            rl = tgt.group
            if ll.is_loop and rl.is_loop:
                valid_loop_link = True
                while True:
                    if ll is None or rl is None:
                        break
                    if ll.is_loop and rl.is_loop:
                        if ll.dop != rl.dop:
                            valid_loop_link = False
                            break
                        else:
                            ll = ll.group
                            rl = rl.group
                    else:
                        break
                if not valid_loop_link:
                    raise GInvalidLink(
                        "{0} and {1} are not loop synchronised: {2} <> {3}".format(
                            ll.id, rl.id, ll.dop, rl.dop
                        )
                    )
            else:
                raise GInvalidLink(
                    "{0} and {1} are not hierarchically related: {2}-({4}) and {3}-({5})".format(
                        src.id,
                        tgt.id,
                        src.group_hierarchy,
                        tgt.group_hierarchy,
                        src.name,
                        tgt.name,
                    )
                )

    def get_child_lp_ctx(self, lgn, lpcxt, idx):
        if lgn.is_loop:
            if lpcxt is None:
                return "{0}".format(idx)
            else:
                return "{0}/{1}".format(lpcxt, idx)
        else:
            return None

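    # For example (values illustrative): if lgn is a Loop construct,
    # get_child_lp_ctx(lgn, None, 3) returns "3" and
    # get_child_lp_ctx(lgn, "2", 3) returns "2/3"; for any non-loop
    # construct the child loop context is None.
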
    def lgn_to_pgn(self, lgn, iid="0", lpcxt=None):
        """
        Convert a logical graph node to physical graph node(s)
        without considering pg links. This is a recursive method, creating also
        all child nodes required by constructs.

        iid:    instance id (string)
        lpcxt:  loop context
        """
        if lgn.is_group:
            # group nodes are replaced with the input application of the
            # construct
            extra_links_drops = not lgn.is_scatter
            if extra_links_drops:
                non_inputs = []
                grp_starts = []
                grp_ends = []
                for child in lgn.children:
                    if len(child.inputs) == 0:
                        non_inputs.append(child)
                    if child.is_group_start:
                        grp_starts.append(child)
                    elif child.is_group_end:
                        grp_ends.append(child)
                if len(grp_starts) == 0:
                    gs_list = non_inputs
                else:
                    gs_list = grp_starts
                if lgn.is_loop:
                    if len(grp_starts) == 0 or len(grp_ends) == 0:
                        raise GInvalidNode(
                            "Loop '{0}' should have at least one Start "
                            "Component and one End Data".format(lgn.name)
                        )
                    for ge in grp_ends:
                        for gs in grp_starts:  # make an artificial circle
                            lk = dict()
                            if gs not in ge._outputs:
                                ge.add_output(gs)
                            if ge not in gs._inputs:
                                gs.add_input(ge)
                            lk["from"] = ge.id
                            lk["to"] = gs.id
                            self._lg_links.append(lk)
                            logger.debug("Loop constructed: %s", gs._inputs)
                else:
                    # add artificial logical links to the "first" children
                    for gs in gs_list:
                        lgn.add_input(gs)
                        gs.add_output(lgn)
                        lk = dict()
                        lk["from"] = lgn.id
                        lk["to"] = gs.id
                        self._lg_links.append(lk)

            multikey_grpby = False
            lgk = lgn.group_keys
            if lgk is not None and len(lgk) > 1:
                multikey_grpby = True
                # inner most scatter to outer most scatter
                scatters = lgn.group_by_scatter_layers[2]
                # inner most is also the slowest running index
                shape = [x.dop for x in scatters]

            lgn_is_loop = lgn.is_loop
            for i in range(lgn.dop):
                miid = "{0}/{1}".format(iid, i)
                if multikey_grpby:
                    # set up a more refined hierarchical context for
                    # group-bys with multiple keys:
                    # recover multi-dimensional indexes from i
                    grp_h = np.unravel_index(i, shape)
                    grp_h = [str(x) for x in grp_h]
                    miid += "${0}".format("-".join(grp_h))
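                    # e.g. (illustrative): with shape == [2, 3] and i == 4,
                    # np.unravel_index(4, [2, 3]) yields (1, 1), so miid
                    # becomes something like "0/4$1-1"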

                if extra_links_drops and not lgn.is_loop:
                    # make GroupBy and Gather drops
                    src_drop = lgn.make_single_drop(miid)
                    self._drop_dict[lgn.id].append(src_drop)
                    if lgn.is_groupby:
                        self._drop_dict["new_added"].append(
                            src_drop["grp-data_drop"]
                        )
                    elif lgn.is_gather:
                        pass
                        # self._drop_dict['new_added'].append(src_drop['gather-data_drop'])
                for child in lgn.children:
                    self.lgn_to_pgn(
                        child, miid, self.get_child_lp_ctx(lgn, lpcxt, i)
                    )
        elif lgn.is_mpi:
            for i in range(lgn.dop):
                miid = "{0}/{1}".format(iid, i)
                src_drop = lgn.make_single_drop(
                    miid, loop_ctx=lpcxt, proc_index=i
                )
                self._drop_dict[lgn.id].append(src_drop)
        elif lgn.is_service:
            # no action required, inputapp node already created and marked with "isService"
            pass
        else:
            src_drop = lgn.make_single_drop(iid, loop_ctx=lpcxt)
            self._drop_dict[lgn.id].append(src_drop)
            if lgn.is_start_listener:
                self._drop_dict["new_added"].append(src_drop["listener_drop"])

    @staticmethod
    def _split_list(l, n):
        """
        Yield successive n-sized chunks from l.
        """
        for i in range(0, len(l), n):
            yield l[i : i + n]

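    # A quick sanity check of the chunking behaviour:
    #   list(LG._split_list([1, 2, 3, 4, 5], 2)) == [[1, 2], [3, 4], [5]]
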
    def _unroll_gather_as_output(
        self, slgn, tlgn, sdrops, tdrops, chunk_size, llink
    ):
        if slgn.h_level < tlgn.h_level:
            raise GraphException(
                "Gather {0} has higher h-level than its input {1}".format(
                    tlgn.id, slgn.id
                )
            )
        # src must be data
        for i, chunk in enumerate(self._split_list(sdrops, chunk_size)):
            for sdrop in chunk:
                self._link_drops(slgn, tlgn, sdrop, tdrops[i], llink)

    def _get_chunk_size(self, s, t):
        """
        Assumption:
        s or t cannot be Scatter as Scatter does not convert into DROPs
        """
        if t.is_gather:
            ret = t.gather_width
        elif t.is_groupby:
            ret = t.groupby_width
        else:
            ret = s.dop_diff(t)
        return ret

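    # e.g. (illustrative): a Gather with gather_width == 4 yields a chunk
    # size of 4, so _unroll_gather_as_output() above would feed eight source
    # drops into two gather drops, four apiece.
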
    def _is_stream_link(self, s_type, t_type):
        return s_type in [
            Categories.COMPONENT,
            Categories.DYNLIB_APP,
            Categories.DYNLIB_PROC_APP,
            Categories.PYTHON_APP,
        ] and t_type in [
            Categories.COMPONENT,
            Categories.DYNLIB_APP,
            Categories.DYNLIB_PROC_APP,
            Categories.PYTHON_APP,
        ]

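    # e.g. _is_stream_link(Categories.PYTHON_APP, Categories.COMPONENT) is
    # True (app-to-app), whereas any link touching a data category is False.
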
    def _link_drops(self, slgn, tlgn, src_drop, tgt_drop, llink):
        """
        Link a single source drop to a single target drop, taking Gather,
        GroupBy and stream semantics into account.
        """
        sdrop = None
        if slgn.is_gather:
            # sdrop = src_drop['gather-data_drop']
            pass
        elif slgn.is_groupby:
            sdrop = src_drop["grp-data_drop"]
        else:
            sdrop = src_drop

        if tlgn.is_gather:
            gather_oid = tgt_drop["oid"]
            if gather_oid not in self._gather_cache:
                # [gather drop, input drop list, output drop list, link]
                self._gather_cache[gather_oid] = [tgt_drop, [], [], llink]
            tup = self._gather_cache[gather_oid]
            tup[1].append(sdrop)
            logger.debug(
                "Hit gather, link is from %s to %s", llink["from"], llink["to"]
            )
            return

        tdrop = tgt_drop
        s_type = slgn.jd["categoryType"]
        t_type = tlgn.jd["categoryType"]

        if self._is_stream_link(s_type, t_type):
            """
            1. create a null_drop in the middle
            2. link sdrop to null_drop
            3. link tdrop to null_drop as a streamingConsumer
            """
            dropSpec_null = dropdict(
                {
                    "oid": "{0}-{1}-stream".format(
                        sdrop["oid"],
                        tdrop["oid"].replace(self._session_id, ""),
                    ),
                    "categoryType": CategoryType.DATA,
                    "dropclass": "dlg.data.drops.data_base.NullDROP",
                    "name": "StreamNull",
                    "weight": 0,
                }
            )
            sdrop.addOutput(dropSpec_null, name="stream")
            dropSpec_null.addProducer(sdrop, name="stream")
            dropSpec_null.addStreamingConsumer(tdrop, name="stream")
            tdrop.addStreamingInput(dropSpec_null, name="stream")
            self._drop_dict["new_added"].append(dropSpec_null)
        elif s_type in ["Application", "Control"]:
            sname = slgn._getPortName("outputPorts")
            tname = tlgn._getPortName("inputPorts")
            logger.debug("Found port names: OUT: %s, IN: %s", sname, tname)
            sdrop.addOutput(tdrop, name=sname)
            tdrop.addProducer(sdrop, name=tname)
            if Categories.BASH_SHELL_APP == s_type:
                bc = src_drop["command"]
                bc.add_output_param(tlgn.id, tgt_drop["oid"])
        else:
            if slgn.is_gather:  # don't really add them
                gather_oid = src_drop["oid"]
                if gather_oid not in self._gather_cache:
                    # [gather drop, input drop list, output drop list, link]
                    self._gather_cache[gather_oid] = [src_drop, [], [], llink]
                tup = self._gather_cache[gather_oid]
                tup[2].append(tgt_drop)
            else:  # sdrop is a data drop
                # there should be only one port, get the name
                portId = llink["fromPort"] if "fromPort" in llink else None
                sname = slgn._getPortName("outputPorts", portId=portId)
                # could be multiple ports, need to identify
                portId = llink["toPort"] if "toPort" in llink else None
                tname = tlgn._getPortName("inputPorts", portId=portId)
                logger.debug("Found port names: OUT: %s, IN: %s", sname, tname)
                # logger.debug(
                #     ">>> link from %s to %s (%s) (%s)",
                #     sname,
                #     tname,
                #     llink,
                #     portId,
                # )
                if llink.get("is_stream", False):
                    logger.debug(
                        "link stream connection %s to %s",
                        sdrop["oid"],
                        tdrop["oid"],
                    )
                    sdrop.addStreamingConsumer(tdrop, name=sname)
                    tdrop.addStreamingInput(sdrop, name=sname)
                else:
                    # logger.debug(
                    #     ">>> adding consumer %s to %s",
                    #     tdrop["categoryType"],
                    #     sdrop["categoryType"],
                    # )
                    sdrop.addConsumer(tdrop, name=sname)
                    tdrop.addInput(sdrop, name=tname)
            if Categories.BASH_SHELL_APP == t_type:
                bc = tgt_drop["command"]
                bc.add_input_param(slgn.id, src_drop["oid"])

    def unroll_to_tpl(self):
        """
        Not thread-safe!

        1. just create pgn anyway
        2. sort out the links
        """
        # each pg node needs to be tagged with iid
        # based purely on its h-level
        for lgn in self._start_list:
            self.lgn_to_pgn(lgn)

        logger.debug(
            "Unroll progress - lgn_to_pgn done %d for session %s",
            len(self._start_list),
            self._session_id,
        )
        self_loop_aware_set = self._loop_aware_set
        for lk in self._lg_links:
            sid = lk["from"]  # source key
            tid = lk["to"]  # target key
            slgn = self._done_dict[sid]
            tlgn = self._done_dict[tid]
            sdrops = self._drop_dict[sid]
            tdrops = self._drop_dict[tid]
            chunk_size = self._get_chunk_size(slgn, tlgn)
            if slgn.is_group and not tlgn.is_group:
                # this link must be artificially added (within-group link)
                # since
                # 1. GroupBy's "natural" output must be a Scatter (i.e. group)
                # 2. Scatter "naturally" does not have output
                if (
                    slgn.is_gather and tlgn.gid != sid
                ):  # not the artificial link between gather and its own start child
                    # gather iteration case, tgt must be a Group-Start Component
                    # this is a way to manually sequentialise a Scatter that has a high DoP
                    for i, ga_drop in enumerate(sdrops):
                        if ga_drop["oid"] not in self._gather_cache:
                            logger.warning(
                                "Gather %s Drop not yet in cache, sequentialisation may fail!",
                                slgn.name,
                            )
                            continue
                        j = (i + 1) * slgn.gather_width
                        if j >= tlgn.group.dop and j % tlgn.group.dop == 0:
                            continue
                        while j < (
                            i + 2
                        ) * slgn.gather_width and j < tlgn.group.dop * (i + 1):
                            gather_input_list = self._gather_cache[
                                ga_drop["oid"]
                            ][1]
                            # TODO merge this code into the function
                            # def _link_drops(self, slgn, tlgn, src_drop, tgt_drop, llink)
                            tname = tlgn._getPortName("inputPorts")
                            for gddrop in gather_input_list:
                                gddrop.addConsumer(tdrops[j], name=tname)
                                tdrops[j].addInput(gddrop, name=tname)
                                j += 1

                            # if 'gather-data_drop' in ga_drop:
                            #     gddrop = ga_drop['gather-data_drop'] # this is the "true" target (not source!) drop
                            #     gddrop.addConsumer(tdrops[j])
                            #     tdrops[j].addInput(gddrop)
                            #     j += 1
                else:
                    if len(sdrops) != len(tdrops):
                        err_info = "For within-group links, # {2} Group Inputs {0} must be the same as # {3} of Component Outputs {1}".format(
                            slgn.id, tlgn.id, len(sdrops), len(tdrops)
                        )
                        raise GraphException(err_info)
                    for i, sdrop in enumerate(sdrops):
                        self._link_drops(slgn, tlgn, sdrop, tdrops[i], lk)
            elif slgn.is_group and tlgn.is_group:
                # slgn must be GroupBy and tlgn must be Gather
                self._unroll_gather_as_output(
                    slgn, tlgn, sdrops, tdrops, chunk_size, lk
                )
            elif not slgn.is_group and (not tlgn.is_group):
                if slgn.is_start_node:
                    continue
                elif (
                    (slgn.group is not None)
                    and slgn.group.is_loop
                    and slgn.gid == tlgn.gid
                    and slgn.is_group_end
                    and tlgn.is_group_start
                ):
                    # Re-link to the next iteration's start
                    lsd = len(sdrops)
                    if lsd != len(tdrops):
                        raise GraphException(
                            "# of sdrops '{0}' != # of tdrops '{1}' for Loop '{2}'".format(
                                slgn.name, tlgn.name, slgn.group.name
                            )
                        )
                    # first add the outer construct (scatter, gather, group-by) boundary
                    # oc = slgn.group.group
                    # if (oc is not None and (not oc.is_loop())):
                    #     pass
                    loop_chunk_size = slgn.group.dop
                    for i, chunk in enumerate(
                        self._split_list(sdrops, loop_chunk_size)
                    ):
                        for j, sdrop in enumerate(chunk):
                            if j < loop_chunk_size - 1:
                                self._link_drops(
                                    slgn,
                                    tlgn,
                                    sdrop,
                                    tdrops[i * loop_chunk_size + j + 1],
                                    lk,
                                )

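                    # e.g. (illustrative): for a Loop with dop == 3, end drop
                    # j in each iteration chunk is linked to start drop j + 1,
                    # chaining the three iterations sequentially: 0 -> 1 -> 2.
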
                    # for i, sdrop in enumerate(sdrops):
                    #     if (i < lsd - 1):
                    #         self._link_drops(slgn, tlgn, sdrop, tdrops[i + 1])
                elif (
                    slgn.group is not None
                    and slgn.group.is_loop
                    and tlgn.group is not None
                    and tlgn.group.is_loop
                    and (not slgn.h_related(tlgn))
                ):
                    # lock-step linking between two Loops
                    for sdrop, tdrop in product(sdrops, tdrops):
                        if sdrop["loop_ctx"] == tdrop["loop_ctx"]:
                            self._link_drops(slgn, tlgn, sdrop, tdrop, lk)
                else:
                    lpaw = ("%s-%s" % (sid, tid)) in self_loop_aware_set
                    if (
                        slgn.group is not None
                        and slgn.group.is_loop
                        and lpaw
                        and slgn.h_level > tlgn.h_level
                    ):
                        loop_iter = slgn.group.dop
                        for i, chunk in enumerate(
                            self._split_list(sdrops, chunk_size)
                        ):
                            for j, sdrop in enumerate(chunk):
                                # only link drops in the last loop iteration
                                if j % loop_iter == loop_iter - 1:
                                    self._link_drops(
                                        slgn, tlgn, sdrop, tdrops[i], lk
                                    )
                    elif (
                        tlgn.group is not None
                        and tlgn.group.is_loop
                        and lpaw
                        and slgn.h_level < tlgn.h_level
                    ):
                        loop_iter = tlgn.group.dop
                        for i, chunk in enumerate(
                            self._split_list(tdrops, chunk_size)
                        ):
                            for j, tdrop in enumerate(chunk):
                                # only link drops in the first loop iteration
                                if j % loop_iter == 0:
                                    self._link_drops(
                                        slgn, tlgn, sdrops[i], tdrop, lk
                                    )

                    elif slgn.h_level >= tlgn.h_level:
                        for i, chunk in enumerate(
                            self._split_list(sdrops, chunk_size)
                        ):
                            # distribute slgn evenly to tlgn
                            for sdrop in chunk:
                                self._link_drops(
                                    slgn, tlgn, sdrop, tdrops[i], lk
                                )
                    else:
                        for i, chunk in enumerate(
                            self._split_list(tdrops, chunk_size)
                        ):
                            # distribute tlgn evenly to slgn
                            for tdrop in chunk:
                                self._link_drops(
                                    slgn, tlgn, sdrops[i], tdrop, lk
                                )
            else:  # slgn is not group, but tlgn is group
                if tlgn.is_groupby:
                    grpby_dict = collections.defaultdict(list)
                    layer_index = tlgn.group_by_scatter_layers[1]
                    for gdd in sdrops:
                        src_ctx = gdd["iid"].split("/")
                        if tlgn.group_keys is None:
                            # the last bit of iid (current h id) is the local GroupBy key, i.e. innermost loop context id
                            gby = src_ctx[-1]
                            if (
                                slgn.h_level - 2 == tlgn.h_level
                                and tlgn.h_level > 0
                            ):  # groupby itself is nested inside a scatter
                                # group key consists of group context id + innermost loop context id
                                gctx = "/".join(src_ctx[0:-2])
                                gby = gctx + "/" + gby
                        else:
                            # find the "group by" scatter level
                            gbylist = []
                            if slgn.group.is_groupby:  # a chain of group bys
                                try:
                                    src_ctx = (
                                        gdd["iid"].split("$")[1].split("-")
                                    )
                                except IndexError:
                                    raise GraphException(
                                        "The group by hierarchy in the multi-key group by '{0}' is not specified for node '{1}'".format(
                                            slgn.group.name, slgn.name
                                        )
                                    )
                            else:
                                src_ctx.reverse()
                            for lid in layer_index:
                                gbylist.append(src_ctx[lid])
                            gby = "/".join(gbylist)
                        grpby_dict[gby].append(gdd)
                    grp_keys = grpby_dict.keys()
                    if len(grp_keys) != len(tdrops):
                        # this happens when groupby itself is nested inside a scatter
                        raise GraphException(
                            "# of Group keys {0} != # of Group Drops {1} for LGN {2}".format(
                                len(grp_keys), len(tdrops), tlgn.id
                            )
                        )
                    grp_keys = sorted(grp_keys)
                    for i, gk in enumerate(grp_keys):
                        grpby_drop = tdrops[i]
                        drop_list = grpby_dict[gk]
                        for drp in drop_list:
                            self._link_drops(slgn, tlgn, drp, grpby_drop, lk)
                            """
                            drp.addOutput(grpby_drop)
                            grpby_drop.addInput(drp)
                            """
                elif tlgn.is_gather:
                    self._unroll_gather_as_output(
                        slgn, tlgn, sdrops, tdrops, chunk_size, lk
                    )
                elif tlgn.is_service:
                    # Only the service node's inputApplication will be translated
                    # to the physical graph as a node of type SERVICE_APP instead of APP
                    # per compute instance
                    tlgn["categoryType"] = "Application"
                    tlgn["category"] = "PythonApp"
                else:
                    raise GraphException(
                        "Unsupported target group {0}".format(
                            tlgn.jd["category"]
                        )
                    )

        for _, v in self._gather_cache.items():
            input_list = v[1]
            try:
                # "peek" the first element of the output list
                output_drop = v[2][0]
            except IndexError:
                continue  # the gather hasn't got output drops, just move on
            llink = v[-1]
            for data_drop in input_list:
                # TODO merge this code into the function
                # def _link_drops(self, slgn, tlgn, src_drop, tgt_drop, llink)
                slgn = self._done_dict[llink["from"]]
                sname = slgn._getPortName("outputPorts")
                if llink.get("is_stream", False):
                    logger.debug(
                        "link stream connection %s to %s",
                        data_drop["oid"],
                        output_drop["oid"],
                    )
                    data_drop.addStreamingConsumer(output_drop, name=sname)
                    output_drop.addStreamingInput(data_drop, name=sname)
                else:
                    data_drop.addConsumer(output_drop, name=sname)
                    output_drop.addInput(data_drop, name=sname)
                # print(data_drop['nm'], data_drop['oid'], '-->', output_drop['nm'], output_drop['oid'])

        logger.info(
            "Unroll progress - %d links done for session %s",
            len(self._lg_links),
            self._session_id,
        )

        # clean up extra drops
        for lid, lgn in self._done_dict.items():
            if (lgn.is_start_node) and lid in self._drop_dict:
                del self._drop_dict[lid]
            elif lgn.is_start_listener:
                for sl_drop in self._drop_dict[lid]:
                    if "listener_drop" in sl_drop:
                        del sl_drop["listener_drop"]
            elif lgn.is_groupby:
                for sl_drop in self._drop_dict[lid]:
                    if "grp-data_drop" in sl_drop:
                        del sl_drop["grp-data_drop"]
            elif lgn.is_gather:
                # lid_sub = "{0}-gather-data".format(lid)
                del self._drop_dict[lid]
                # for sl_drop in self._drop_dict[lid]:
                #     if 'gather-data_drop' in sl_drop:
                #         del sl_drop['gather-data_drop']

        logger.info(
            "Unroll progress - extra drops done for session %s",
            self._session_id,
        )
        ret = []
        for drop_list in self._drop_dict.values():
            ret += drop_list

        return ret

    @property
    def reprodata(self):
        return self._reprodata