• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ICRAR / daliuge / 4911681207

pending completion
4911681207

Pull #231

github

GitHub
Merge 9186e10d1 into e48989cce
Pull Request #231: Liu 355

180 of 229 new or added lines in 17 files covered. (78.6%)

26 existing lines in 5 files now uncovered.

15345 of 19059 relevant lines covered (80.51%)

1.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.99
/daliuge-translator/dlg/dropmake/pgt.py
1
#
2
#    ICRAR - International Centre for Radio Astronomy Research
3
#    (c) UWA - The University of Western Australia, 2015
4
#    Copyright by UWA (in the framework of the ICRAR)
5
#    All rights reserved
6
#
7
#    This library is free software; you can redistribute it and/or
8
#    modify it under the terms of the GNU Lesser General Public
9
#    License as published by the Free Software Foundation; either
10
#    version 2.1 of the License, or (at your option) any later version.
11
#
12
#    This library is distributed in the hope that it will be useful,
13
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
14
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15
#    Lesser General Public License for more details.
16
#
17
#    You should have received a copy of the GNU Lesser General Public
18
#    License along with this library; if not, write to the Free Software
19
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
20
#    MA 02111-1307  USA
21
#
22
"""
2✔
23
The DALiuGE resource manager uses the requested logical graphs, the available resources and
24
the profiling information and turns it into the partitioned physical graph,
25
which will then be deployed and monitored by the Physical Graph Manager
26
"""
27

28
# When this module is executed directly (rather than imported), set the
# package explicitly so that the "dlg.dropmake" relative imports below
# resolve correctly.
if __name__ == "__main__":
    __package__ = "dlg.dropmake"
30

31

32
import json
2✔
33
import logging
2✔
34
import math
2✔
35

36
from dlg.dropmake.lg import GraphException
2✔
37
from dlg.dropmake.scheduler import DAGUtil
2✔
38
from dlg.common import CategoryType, dropdict
2✔
39

40
logger = logging.getLogger(__name__)
2✔
41

42

43
class GPGTNoNeedMergeException(GraphException):
    """
    Raised when a partition merge is requested but the PGT already has no
    more partitions than the requested target, so merging is pointless.
    """
45

46

47
class GPGTException(GraphException):
    """
    Generic error raised while building, partitioning or mapping a
    physical graph template.
    """
49

50

51
class PGT(object):
    """
    A DROP representation of Physical Graph Template

    Wraps the list of drop specifications produced from a logical graph and
    (optionally) the networkx DAG built from them; PGTP sub-classes add the
    actual partitioning and merging behaviour.
    """
55

56
    def __init__(self, drop_list, build_dag=True):
2✔
57
        self._drop_list = drop_list
2✔
58
        self._drop_list_len = len(drop_list)
2✔
59
        self._extra_drops = []  # artifacts DROPs produced during L2G mapping
2✔
60
        self._dag = (
2✔
61
            DAGUtil.build_dag_from_drops(self._drop_list)
62
            if build_dag
63
            else None
64
        )
65
        self._json_str = None
2✔
66
        self._oid_gid_map = dict()
2✔
67
        self._gid_island_id_map = dict()
2✔
68
        self._num_parts_done = 0
2✔
69
        self._partition_merged = 0
2✔
70
        self._inner_parts = []  # a list of inner partitions (e.g. nodes)
2✔
71
        # for visualisation only
72
        self._bw_ratio = 10.0  # bandwidth ratio between instra-comp_island
2✔
73
        # and inter-comp_island
74
        self._merge_parts = False
2✔
75
        self._island_labels = ["Data", "Compute"]
2✔
76
        self._data_movement = None
2✔
77
        self._reprodata = {}
2✔
78

79
    def _can_merge(self, new_num_parts):
2✔
80
        if new_num_parts <= 0:
2✔
81
            raise GPGTException(
×
82
                "Invalid new_num_parts {0}".format(new_num_parts)
83
            )
84
        if not self._merge_parts:
2✔
85
            raise GPGTException(
×
86
                "This {0} PGTP is not made for merging".format(
87
                    self.__class__.__name__
88
                )
89
            )
90
        if self._num_parts_done <= new_num_parts:
2✔
91
            raise GPGTNoNeedMergeException(
2✔
92
                "No need to merge this {0} PGTP: {1} <= {2}".format(
93
                    self.__class__.__name__,
94
                    self._num_parts_done,
95
                    new_num_parts,
96
                )
97
            )
98
        if self._partition_merged == new_num_parts:
2✔
99
            return False
×
100
        else:
101
            return True
2✔
102

103
    @property
2✔
104
    def drops(self):
2✔
105
        if self._extra_drops is None:
2✔
106
            return self._drop_list
×
107
        else:
108
            return self._drop_list + self._extra_drops
2✔
109

110
    def to_partition_input(self, outf):
        """
        Convert to format for mapping and decomposition

        outf: destination (e.g. file path) for the partitioner input.

        Abstract in the base PGT: concrete PGTP sub-classes implement the
        actual conversion; calling it here always raises.
        """
        raise GPGTException("Must be implemented by PGTP sub-class only")
115

116
    def get_opt_num_parts(self):
2✔
117
        """
118
        dummy for now
119
        """
120
        leng = len(self._drop_list)
×
121
        return int(math.ceil(leng / 10.0))
×
122

123
    def get_partition_info(self):
2✔
124
        # return "No partitioning. - Completion time: {0} - "\
125
        # "Data movement: {2} - Min exec time: {1}"\
126
        # .format(self.pred_exec_time(),
127
        # self.pred_exec_time(app_drop_only=True),
128
        # 0)
129
        return "Raw_unrolling"
×
130

131
    def result(self, lazy=True):
2✔
132
        ret = {}
2✔
133
        ret["algo"] = self.get_partition_info()
2✔
134
        ret["min_exec_time"] = self.pred_exec_time(
2✔
135
            app_drop_only=True, force_answer=(not lazy)
136
        )
137
        ret["total_data_movement"] = self.data_movement
2✔
138
        ret["exec_time"] = self.pred_exec_time(force_answer=(not lazy))
2✔
139
        if self._merge_parts and self._partition_merged > 0:
2✔
140
            ret["num_islands"] = self._partition_merged
2✔
141
        self._extra_result(ret)
2✔
142
        return ret
2✔
143

144
    def _extra_result(self, ret):
        # Hook for sub-classes to add algorithm-specific entries to the
        # result dict built by result(); the base implementation adds nothing.
        pass
146

147
    @property
    def dag(self):
        """
        The underlying networkx ``nx.DiGraph`` object (or None if not built).

        Note: the weight of an edge (u, v) depends on when this is read --
        after partitioning it may have been zeroed if both u and v were
        allocated to the same DropIsland.
        """
        return self._dag
157

158
    @property
2✔
159
    def data_movement(self):
2✔
160
        """
161
        Return the TOTAL data movement
162
        """
163
        if self._data_movement is not None:
2✔
164
            return self._data_movement
2✔
165
        elif self.dag is not None:
2✔
166
            G = self.dag
2✔
167
            self._data_movement = sum(
2✔
168
                e[2].get("weight", 0) for e in G.edges(data=True)
169
            )
170
        return self._data_movement
2✔
171

172
    def pred_exec_time(
2✔
173
        self, app_drop_only=False, wk="weight", force_answer=False
174
    ):
175
        """
176
        Predict execution time using the longest path length
177
        """
178
        G = self.dag
2✔
179
        if G is None:
2✔
180
            if force_answer:
2✔
181
                self._dag = DAGUtil.build_dag_from_drops(self._drop_list)
2✔
182
                G = self.dag
2✔
183
            else:
184
                return None
×
185
        if app_drop_only:
2✔
186
            lp = DAGUtil.get_longest_path(G, show_path=True)[0]
2✔
187
            return sum(G.nodes[u].get(wk, 0) for u in lp)
2✔
188
        else:
189
            return DAGUtil.get_longest_path(G, show_path=False)[1]
2✔
190

191
    @property
2✔
192
    def json(self):
2✔
193
        """
194
        Return the JSON string representation of the PGT
195
        for visualisation
196
        """
197
        if self._json_str is None:
2✔
198
            self._json_str = self.to_gojs_json(visual=True)
2✔
199

200
        return self._json_str
2✔
201
        # return self.to_gojs_json()
202

203
    @property
    def reprodata(self):
        """Reproducibility (provenance) data attached to this PGT."""
        return self._reprodata

    @reprodata.setter
    def reprodata(self, value):
        """Replace the reproducibility data wholesale."""
        self._reprodata = value
210

211
    def merge_partitions(
2✔
212
        self, new_num_parts, form_island=False, island_type=0, visual=False
213
    ):
214
        raise Exception("Not implemented. Call sub-class")
×
215

216
    def to_pg_spec(
        self,
        node_list,
        ret_str=True,
        num_islands=1,
        tpl_nodes_len=0,
        co_host_dim=True,
    ):
        """
        Convert the PGT to a PG specification, mapping each drop onto the
        hardware resources (nodes and islands).

        node_list:
            A list of nodes (list), whose length == (num_islands + num_node_mgrs)
            if co_locate_islands = False.
            We assume that the MasterDropManager's node is NOT in the node_list

        ret_str:
            return a JSON string when True, the drop list otherwise.

        num_islands:
            - >1 Partitions are "conceptually" clustered into Islands
            - 1 Partitions MAY BE physically merged without generating islands
              depending on the length of node_list
            - num_islands can't be < 1

        tpl_nodes_len: if this is given we generate a pg_spec template
            The pg_spec template is what needs to be sent to a deferred
            deployment where the daliuge system is started up after
            submission (e.g. SLURM)

        co_host_dim: when True, island managers share nodes with node
            managers; otherwise the first num_islands nodes are reserved
            for islands.

        Raises:
            GPGTException: graph not partitioned, empty/insufficient node
                list, or invalid num_islands.
        """
        logger.debug(
            "tpl_nodes_len: %s, node_list: %s", tpl_nodes_len, node_list
        )
        if tpl_nodes_len > 0:  # generate pg_spec template
            node_list = range(tpl_nodes_len)  # create a fake list for now

        if 0 == self._num_parts_done:
            raise GPGTException("The graph has not been partitioned yet")

        if node_list is None or 0 == len(node_list):
            raise GPGTException("Node list is empty!")
        nodes_len = len(node_list)

        try:
            num_islands = int(num_islands)
        except (TypeError, ValueError) as exc:
            # Was a bare "except:": narrowed so that e.g. KeyboardInterrupt
            # is not swallowed, and the original cause is chained.
            raise GPGTException(
                "Invalid num_islands spec: {0}".format(num_islands)
            ) from exc
        if num_islands < 1:
            num_islands = 1  # need at least one island manager
        if num_islands > nodes_len:
            raise GPGTException(
                "Number of islands must be <= number of specified nodes!"
            )
        form_island = num_islands > 1
        if nodes_len < 1:  # we allow to run everything on a single node now!
            raise GPGTException("Too few nodes: {0}".format(nodes_len))

        num_parts = self._num_parts_done
        drop_list = self.drops

        # deal with the co-hosting of DIMs
        if not co_host_dim:
            if form_island and num_parts > nodes_len:
                raise GPGTException(
                    "Insufficient number of nodes: {0}".format(nodes_len)
                )
            # reserve the first num_islands nodes for island managers
            is_list = node_list[0:num_islands]
            nm_list = node_list[num_islands:]
        else:
            if form_island and num_islands + num_parts > nodes_len:
                raise GPGTException(
                    "Insufficient number of nodes: {0}".format(nodes_len)
                )
            is_list = node_list
            nm_list = node_list
        nm_len = len(nm_list)
        logger.info(
            "Drops count: %d, partitions count: %d, nodes count: %d, island count: %d",
            len(drop_list),
            num_parts,
            nodes_len,
            len(is_list),
        )

        if form_island:
            self.merge_partitions(num_islands, form_island=True)
            # from Eq.1 we know that num_parts <= nm_len
            # so no need to update its value
        elif nm_len < num_parts:
            self.merge_partitions(nm_len, form_island=False)
            num_parts = nm_len

        lm = self._oid_gid_map
        lm2 = self._gid_island_id_map
        # when #partitions < #nodes the lm values are spread around range(#nodes)
        # which leads to index out of range errors (TODO: find how _oid_gid_map is
        # constructed). Remapping the gid values fixes this, however then the
        # test_metis_pgtp_gen_pg_island test fails. Needs more investigation
        # but is a corner case.

        if tpl_nodes_len:
            # placeholder node names for the deferred-deployment template
            nm_list = [
                "#%s" % x for x in range(nm_len)
            ]  # so that nm_list[i] == '#i'
            is_list = [
                "#%s" % x for x in range(len(is_list))
            ]  # so that is_list[i] == '#i'

        for drop in drop_list:
            oid = drop["oid"]
            # For now, simply round robin, but need to consider
            # nodes cross COMPUTE islands which has
            # TODO consider distance between a pair of nodes
            gid = lm[oid]
            drop["node"] = nm_list[gid]
            isid = lm2[gid] % num_islands if form_island else 0
            drop["island"] = is_list[isid]

        if ret_str:
            return json.dumps(drop_list, indent=2)
        else:
            return drop_list
339

340
    def to_gojs_json(self, string_rep=True, outdict=None, visual=False):
        """
        Convert PGT (without any partitions) to JSON for visualisation in GOJS

        Sub-class PGTPs will override this function, and replace this with
        actual partitioning, and the visulisation becomes an option

        On the first call (when ``self._extra_drops is None``) this also
        inserts "translator" drops between any two directly-connected drops
        of the same category (data->data or app->app), rewiring both the
        drop specs and the DAG in place. Subsequent calls only emit links.

        string_rep: return a JSON string when True, the dict otherwise.
        """
        G = self.dag
        ret = dict()
        ret["class"] = "go.GraphLinksModel"
        nodes = []
        links = []
        key_dict = dict()  # key - oid, value - GOJS key

        # Emit one GOJS node per original drop; keys are 1-based positive ints.
        for i, drop in enumerate(self._drop_list):
            oid = drop["oid"]
            node = dict()
            node["key"] = i + 1
            key_dict[oid] = i + 1
            node["oid"] = oid
            tt = drop["categoryType"]
            if CategoryType.DATA == tt:
                node["category"] = "Data"
            elif CategoryType.APPLICATION == tt:
                node["category"] = "Application"
            node["name"] = drop["name"]
            nodes.append(node)

        if self._extra_drops is None:
            # First pass: scan each edge; where two same-type drops are
            # adjacent, synthesise an intermediate drop of the other type.
            extra_drops = []
            remove_edges = []
            add_edges = []  # a list of tuples
            add_nodes = []
            for drop in self._drop_list:
                oid = drop["oid"]
                myk = key_dict[oid]
                for i, oup in enumerate(G.successors(myk)):
                    link = dict()
                    link["from"] = myk
                    # drop_type convention: 0 == data drop, 1 == app drop
                    from_dt = (
                        0
                        if drop["categoryType"] in [CategoryType.DATA, "data"]
                        else 1
                    )
                    to_dt = G.nodes[oup]["drop_type"]
                    if from_dt == to_dt:
                        to_drop = G.nodes[oup]["drop_spec"]
                        if from_dt == 0:
                            # data->data: add an extra app DROP in between
                            extra_oid = "{0}_TransApp_{1}".format(oid, i)
                            dropSpec = dropdict(
                                {
                                    "oid": extra_oid,
                                    "categoryType": CategoryType.APPLICATION,
                                    "dropclass": "dlg.drop.BarrierAppDROP",
                                    "name": "go_app",
                                    "weight": 1,
                                }
                            )
                            # create links
                            drop.addConsumer(dropSpec)
                            dropSpec.addInput(drop)
                            dropSpec.addOutput(to_drop)
                            to_drop.addProducer(dropSpec)
                            mydt = 1
                        else:
                            # app->app: add an extra data DROP in between
                            extra_oid = "{0}_TransData_{1}".format(oid, i)
                            dropSpec = dropdict(
                                {
                                    "oid": extra_oid,
                                    "categoryType": CategoryType.DATA,
                                    "dropclass": "dlg.data.drops.memory.InMemoryDROP",
                                    "name": "go_data",
                                    "weight": 1,
                                }
                            )
                            drop.addOutput(dropSpec)
                            dropSpec.addProducer(drop)
                            dropSpec.addConsumer(to_drop)
                            to_drop.addInput(dropSpec)
                            mydt = 0
                        extra_drops.append(dropSpec)
                        # extra drops get negative GOJS keys: -1, -2, ...
                        lid = len(extra_drops) * -1
                        link["to"] = lid
                        endlink = dict()
                        endlink["from"] = lid
                        endlink["to"] = oup
                        links.append(endlink)
                        # global graph updates
                        # the new drop must have the same gid as the to_drop
                        add_nodes.append(
                            (
                                lid,
                                1,
                                mydt,
                                dropSpec,
                                G.nodes[oup].get("gid", None),
                            )
                        )
                        # replace the direct edge with two edges through lid
                        remove_edges.append((myk, oup))
                        add_edges.append((myk, lid))
                        add_edges.append((lid, oup))
                    else:
                        link["to"] = oup
                    links.append(link)
            # Apply the deferred graph mutations after iteration (mutating G
            # while iterating successors would be unsafe).
            for gn in add_nodes:
                G.add_node(
                    gn[0],
                    weight=gn[1],
                    drop_type=gn[2],
                    drop_spec=gn[3],
                    gid=gn[4],
                )
            G.remove_edges_from(remove_edges)
            G.add_edges_from(add_edges)
            self._extra_drops = extra_drops
        else:
            # Extra drops already inserted: only emit the plain links.
            for drop in self._drop_list:
                oid = drop["oid"]
                myk = key_dict[oid]
                for oup in G.successors(myk):
                    link = dict()
                    link["from"] = myk
                    link["to"] = oup
                    links.append(link)

        # going through the extra_drops (negative keys mirror lid above)
        for i, drop in enumerate(self._extra_drops):
            oid = drop["oid"]
            node = dict()
            node["key"] = (i + 1) * -1
            node["oid"] = oid
            tt = drop["categoryType"]
            if tt == CategoryType.DATA:
                node["category"] = "Data"
            elif tt == CategoryType.APPLICATION:
                node["category"] = "PythonApp"  # might not be correct
            node["name"] = drop["name"]
            nodes.append(node)

        ret["nodeDataArray"] = nodes
        ret["linkDataArray"] = links
        self._gojs_json_obj = ret
        if string_rep:
            return json.dumps(ret, indent=2)
        else:
            return ret
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc