ICRAR / daliuge / build 10363213063

13 Aug 2024 03:39AM UTC. Coverage: 79.63% (-0.09%) from 79.722%

Pull Request #271: Liu 377 (github / web-flow)
Merge branch 'master' into liu-377

70 of 122 new or added lines in 13 files covered (57.38%)
12 existing lines in 6 files now uncovered
15375 of 19308 relevant lines covered (79.63%)
1.64 hits per line

Source file: /daliuge-translator/dlg/dropmake/pgtp.py (77.35% covered)

#
#    ICRAR - International Centre for Radio Astronomy Research
#    (c) UWA - The University of Western Australia, 2015
#    Copyright by UWA (in the framework of the ICRAR)
#    All rights reserved
#
#    This library is free software; you can redistribute it and/or
#    modify it under the terms of the GNU Lesser General Public
#    License as published by the Free Software Foundation; either
#    version 2.1 of the License, or (at your option) any later version.
#
#    This library is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#    Lesser General Public License for more details.
#
#    You should have received a copy of the GNU Lesser General Public
#    License along with this library; if not, write to the Free Software
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
#    MA 02111-1307  USA
#

from collections import defaultdict
import json
import logging

import networkx as nx

from dlg.dropmake.pgt import PGT, GPGTException
from dlg.dropmake.scheduler import (
    MySarkarScheduler,
    DAGUtil,
    MinNumPartsScheduler,
    PSOScheduler,
)
from dlg.common import CategoryType

logger = logging.getLogger(__name__)


class MetisPGTP(PGT):
    """
    DROP and GOJS representations of a Physical Graph Template with
    Partitions (PGTP), based on METIS
    http://glaros.dtc.umn.edu/gkhome/metis/metis/overview
    """

    def __init__(
        self,
        drop_list,
        num_partitions=1,
        min_goal=0,
        par_label="Partition",
        ptype=0,
        ufactor=10,
        merge_parts=False,
    ):
        """
        num_partitions:  number of partitions supplied by users (int)
        TODO - integrate from within PYTHON module (using C API) soon!
        """
        super(MetisPGTP, self).__init__(drop_list, build_dag=False)
        self._metis_path = "gpmetis"  # assuming it is installed on the system path
        if num_partitions <= 0:
            # self._num_parts = self.get_opt_num_parts()
            raise GPGTException("Invalid num_partitions {0}".format(num_partitions))
        else:
            self._num_parts = num_partitions
        if 1 == min_goal:
            self._obj_type = "vol"
        else:
            self._obj_type = "cut"

        if 0 == ptype:
            self._ptype = "kway"
        else:
            self._ptype = "rb"

        self._par_label = par_label
        self._u_factor = ufactor
        self._metis_logs = []
        self._G = self.to_partition_input()
        self._metis = DAGUtil.import_metis()
        self._group_workloads = dict()  # k - gid, v - a tuple of (tw, sz)
        self._merge_parts = merge_parts
        self._metis_out = None  # initial internal partition result
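
    # A constructor sketch (an illustrative assumption, not part of the
    # original file), showing how the arguments map onto METIS options:
    #
    #     MetisPGTP(drop_list, num_partitions=4, min_goal=1, ptype=1, ufactor=30)
    #
    # selects objtype="vol" (min_goal=1), the recursive-bisection scheme "rb"
    # (ptype=1), and a METIS load-imbalance factor (ufactor) of 30; in stock
    # METIS, a ufactor of x permits roughly a 1 + x/1000 load imbalance.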

    def to_partition_input(self, outf=None):
        """
        Convert to METIS format for mapping and decomposition
        NOTE - since METIS only supports undirected graphs, we have to produce
        both upstream and downstream neighbours to fit its input format
        """
        key_dict = dict()  # key - oid, value - GOJS key
        droplist = self._drop_list

        G = nx.Graph()
        G.graph["edge_weight_attr"] = "weight"
        G.graph["node_weight_attr"] = "weight"
        G.graph["node_size_attr"] = "size"

        for i, drop in enumerate(droplist):
            try:
                oid = drop["oid"]
            except KeyError:
                logger.debug("Drop does not have oid: %s", drop)
                droplist.pop(i)
                continue
            key_dict[oid] = i + 1  # METIS index starts from 1

        logger.info("Metis partition input progress - dropdict is built")

        if self._drop_list_len > 1e7:
            import resource

            logger.info(
                "self._drop_list, max RSS: %.2f GB",
                resource.getrusage(resource.RUSAGE_SELF)[2] / 1024.0**2,
            )
        tw = 1
        sz = 1
        dst = "outputs"
        ust = "inputs"
        for i, drop in enumerate(droplist):
            oid = drop["oid"]
            myk = i + 1
            tt = drop["categoryType"]
            if tt in [CategoryType.DATA, "data"]:
                dst = "consumers"  # outbound keyword
                ust = "producers"
                tw = 1  # the task weight is minimal (1) for a Data DROP
                sz = drop.get("weight", 1)  # size
            elif tt in [CategoryType.APPLICATION, "app"]:
                dst = "outputs"
                ust = "inputs"
                tw = drop.get("weight", 1)
                sz = 1
            G.add_node(myk, weight=tw, size=sz, oid=oid)
            adj_drops = []  # adjacent drops (all neighbours)
            if dst in drop:
                adj_drops += drop[dst]
            if ust in drop:
                adj_drops += drop[ust]

            for inp in adj_drops:
                key = list(inp.keys())[0] if isinstance(inp, dict) else inp
                if tt in [CategoryType.DATA, "data"]:
                    lw = drop["weight"]
                elif tt in [CategoryType.APPLICATION, "app"]:
                    # get the weight of the previous drop
                    lw = droplist[key_dict[key] - 1].get("weight", 1)
                if lw <= 0:
                    lw = 1
                G.add_edge(myk, key_dict[key], weight=lw)
        # for e in G.edges(data=True):
        #     if (e[2]['weight'] == 0):
        #         e[2]['weight'] = 1
        if self._drop_list_len > 1e7:
            import resource

            logger.info(
                "Max RSS after creating the Graph: %.2f GB",
                resource.getrusage(resource.RUSAGE_SELF)[2] / 1024.0**2,
            )
        return G
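
    # An illustrative sketch of the structure produced above (an assumption,
    # not part of the original file): for a two-drop graph where app drop 'A'
    # (weight 5) writes data drop 'B' (weight 10), to_partition_input()
    # yields an undirected graph roughly like
    #
    #     G.nodes(data=True)
    #     # [(1, {'weight': 5, 'size': 1, 'oid': 'A'}),
    #     #  (2, {'weight': 1, 'size': 10, 'oid': 'B'})]
    #     G.edges(data=True)
    #     # [(1, 2, {'weight': 10})]
    #
    # i.e. node weights carry task cost, node sizes carry data volume, and
    # the edge weight carries the data drop's weight.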

    def _set_metis_log(self, logtext):
        self._metis_logs = logtext.split("\n")

    def get_partition_info(self):
        """
        Partition parameter and log entry
        Returns a string
        """
        return "METIS_LB{0}".format(101 - self._u_factor)

    def _extra_result(self, ret):
        ret["num_parts"] = self._num_parts

    def _parse_metis_output(self, metis_out, jsobj):
        """
        1. parse the METIS result, and add group nodes into the GOJS JSON
        2. also update edge weights for self._dag
        """
        # key_dict = dict() #k - gojs key, v - gojs group id
        groups = set()
        ogm = self._oid_gid_map
        group_weight = self._group_workloads  # k - gid, v - a tuple of (tw, sz)
        G = self._G
        # start_k = len(self._drop_list) + 1
        start_k = self._drop_list_len + 1
        for gnode, gid in zip(G.nodes(data=True), metis_out):
            groups.add(gid)
            gnode = gnode[1]
            ogm[gnode["oid"]] = gid
            gnode["gid"] = gid

        # housekeeping after partitioning
        self._num_parts_done = len(groups)
        if self.dag is not None:
            for e in self.dag.edges(data=True):
                gid0 = metis_out[e[0] - 1]
                gid1 = metis_out[e[1] - 1]
                if gid0 == gid1:
                    e[2]["weight"] = 0

        # the following is for potential partition merging into islands
        if self._merge_parts:
            for gid in groups:
                group_weight[gid] = [0, 0]
            for gnode in G.nodes(data=True):
                tt = group_weight[gnode[1]["gid"]]
                try:
                    tt[0] += int(gnode[1]["weight"])
                    tt[1] += int(gnode[1]["size"])
                except ValueError:
                    tt[0] = 1
                    tt[1] = 1
        # the following is for visualisation using GOJS
        if jsobj is not None:
            node_list = jsobj["nodeDataArray"]
            for node in node_list:
                nid = int(node["key"])
                gid = metis_out[nid - 1]
                node["group"] = gid + start_k

            inner_parts = []
            for gid in groups:
                gn = dict()
                gn["key"] = start_k + gid
                gn["isGroup"] = True
                gn["name"] = "{1}_{0}".format(gid + 1, self._par_label)
                node_list.append(gn)
                inner_parts.append(gn)

            self._inner_parts = inner_parts
            self._node_list = node_list
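
    # An illustrative sketch (an assumption, not part of the original file):
    # for three drops (so start_k = 4) and metis_out = [0, 0, 1],
    # _parse_metis_output() tags the third drop with gid 1 and, in visual
    # mode, appends a group node
    #
    #     {"key": 5, "isGroup": True, "name": "Partition_2"}
    #
    # to jsobj["nodeDataArray"], while the member node points at it via
    # node["group"] = 1 + 4 = 5.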

    def to_gojs_json(self, string_rep=True, outdict=None, visual=False):
        """
        Partition the PGT into a real "PGT with Partitions", thus a PGTP,
        using the METIS built-in functions

        See METIS usage:
            http://metis.readthedocs.io/en/latest/index.html
        """
        if self._num_parts == 1:
            edgecuts = 0
            metis_parts = [0] * len(self._G.nodes())
        else:
            # prepare METIS parameters
            recursive_param = self._ptype != "kway"
            if recursive_param and self._obj_type == "vol":
                raise GPGTException(
                    "Recursive partitioning does not support "
                    "total volume minimisation."
                )

            if self._drop_list_len > 1e7:
                import resource

                logger.info(
                    "RSS before METIS partitioning: %.2f GB",
                    resource.getrusage(resource.RUSAGE_SELF)[2] / 1024.0**2,
                )

            # Call the METIS C library
            (edgecuts, metis_parts) = self._metis.part_graph(
                self._G,
                nparts=self._num_parts,
                recursive=recursive_param,
                objtype=self._obj_type,
                ufactor=self._u_factor,
            )

        # Output some partitioning result metadata
        if outdict is not None:
            outdict["edgecuts"] = edgecuts
        # self._set_metis_log(" - Data movement: {0}".format(edgecuts))
        self._data_movement = edgecuts

        if visual:
            if self.dag is None:
                self._dag = DAGUtil.build_dag_from_drops(self._drop_list)
            jsobj = super(MetisPGTP, self).to_gojs_json(string_rep=False, visual=visual)
        else:
            jsobj = None
        self._parse_metis_output(metis_parts, jsobj)
        self._metis_out = metis_parts
        self._gojs_json_obj = jsobj  # could be None if not visual
        if string_rep and jsobj is not None:
            return json.dumps(jsobj, indent=2)
        else:
            return jsobj
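
    # A minimal sketch of the part_graph() call above (an assumption based
    # on the METIS Python wrapper loaded via DAGUtil.import_metis(), not
    # part of the original file):
    #
    #     (edgecuts, parts) = metis.part_graph(G, nparts=4, ufactor=10)
    #     # edgecuts: total weight of the edges crossing partition boundaries
    #     # parts:    one partition id (0..nparts-1) per node of G, in
    #     #           G.nodes() order - exactly what _parse_metis_output()
    #     #           consumes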

    def merge_partitions(
        self, new_num_parts, form_island=False, island_type=0, visual=False
    ):
        """
        This is called during resource mapping - deploying a partitioned PGT
        onto a list of nodes

        form_island:    If True, the merging will form `new_num_parts` logical
                        islands on top of the existing partitions (i.e. nodes).
                        This is also known as "reference-based merging"

                        If False, the merging will physically merge the current
                        partitions into `new_num_parts` new partitions (i.e.
                        nodes). Thus, no node-island 'hierarchies' are created

        island_type:    integer, 0 - data island, 1 - compute island
        """
        # 0. parse the output and get all the partitions
        if not self._can_merge(new_num_parts):
            return

        GG = self._G
        part_edges = defaultdict(int)  # k: "from_gid**to_gid", v: sum_of_weight
        for e in GG.edges(data=True):
            from_gid = GG.nodes[e[0]]["gid"]
            to_gid = GG.nodes[e[1]]["gid"]
            k = "{0}**{1}".format(from_gid, to_gid)
            part_edges[k] += e[2]["weight"]

        # 1. build the bi-directional graph again,
        # with each partition being a node
        G = nx.Graph()
        G.graph["edge_weight_attr"] = "weight"
        G.graph["node_weight_attr"] = "weight"
        G.graph["node_size_attr"] = "size"
        for gid, v in self._group_workloads.items():
            # for compute islands, we need to count the number of nodes
            # instead of the actual workload
            twv = 1 if (island_type == 1) else v[0]
            G.add_node(gid, weight=twv, size=v[1])
        for glinks, v in part_edges.items():
            gl = glinks.split("**")
            G.add_edge(int(gl[0]), int(gl[1]), weight=v)

        if new_num_parts == 1:
            (edgecuts, metis_parts) = (0, [0] * len(G.nodes()))
        else:
            (edgecuts, metis_parts) = self._metis.part_graph(
                G, nparts=new_num_parts, ufactor=1
            )
        tmp_map = self._gid_island_id_map
        islands = set()
        for gid, island_id in zip(G.nodes(), metis_parts):
            tmp_map[gid] = island_id
            islands.add(island_id)
        if not form_island:
            ogm = self._oid_gid_map
            gnodes = GG.nodes(data=True)
            for gnode in gnodes:
                gnode = gnode[1]
                oid = gnode["oid"]
                old_gid = gnode["gid"]
                new_gid = tmp_map[old_gid]
                ogm[oid] = new_gid
                gnode["gid"] = new_gid
            self._num_parts_done = new_num_parts
        else:
            if (
                island_type == 1
                and (self.dag is not None)
                and (self._metis_out is not None)
            ):
                # update the intra-comp_island edge weights, given they have a
                # different bandwidth compared to inter-comp_island edges
                metis_out = self._metis_out
                for e in self.dag.edges(data=True):
                    gid0 = metis_out[e[0] - 1]
                    gid1 = metis_out[e[1] - 1]
                    if tmp_map[gid0] == tmp_map[gid1]:
                        e[2]["weight"] /= self._bw_ratio
                self._data_movement = None  # force a refresh of data_movement
            # add GOJS groups for visualisation
            self._partition_merged = new_num_parts
            if visual:
                island_label = "%s_Island" % (
                    self._island_labels[island_type % len(self._island_labels)]
                )
                start_k = self._drop_list_len + 1
                # start_i = len(node_list) + 1
                start_i = start_k + self._num_parts_done
                node_list = self._node_list
                for island_id in islands:
                    # print('island_id = ', island_id)
                    gn = dict()
                    gn["key"] = island_id + start_i
                    gn["isGroup"] = True
                    gn["name"] = "{1}_{0}".format(island_id + 1, island_label)
                    node_list.append(gn)
                inner_parts = self._inner_parts
                for ip in inner_parts:
                    ip["group"] = tmp_map[ip["key"] - start_k] + start_i
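
# An illustrative end-to-end sketch for MetisPGTP (an assumption, not part
# of the original file); `drop_list` stands for a translator-produced list
# of drop specs (dicts with "oid", "categoryType", "weight", etc.):
#
#     pgtp = MetisPGTP(drop_list, num_partitions=8, merge_parts=True)
#     pgtp.to_gojs_json(string_rep=False, visual=False)
#     pgtp.merge_partitions(2, form_island=True, island_type=0)
#
# This first creates 8 node-level partitions, then groups them into 2 data
# islands on top of the existing partitions.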


class MySarkarPGTP(PGT):
    """
    Use the MySarkarScheduler to produce the PGTP
    """

    def __init__(
        self,
        drop_list,
        num_partitions=0,
        par_label="Partition",
        max_dop=8,
        merge_parts=False,
    ):
        """
        num_partitions: 0 - only do the initial logical partition
                        >1 - do the logical partition, partition merging and
                        physical mapping

                This parameter is simply ignored.
                To control the number of partitions, please call
                def merge_partitions(self, new_num_parts, form_island=False)
        """
        super(MySarkarPGTP, self).__init__(drop_list, build_dag=False)
        self._dag = DAGUtil.build_dag_from_drops(self._drop_list, fake_super_root=False)
        self._num_parts = num_partitions
        self._max_dop = max_dop  # max dop per partition
        self._par_label = par_label
        # self._lpl = None # longest path
        self._ptime = None  # partition time
        self._merge_parts = merge_parts
        # self._edge_cuts = None
        self._partitions = None

        self.init_scheduler()

    def init_scheduler(self):
        self._scheduler = MySarkarScheduler(
            self._drop_list, self._max_dop, dag=self.dag
        )

    def get_partition_info(self):
        """
        Partition parameter and log entry
        Returns a string
        """
        return "Edge Zero"

    def to_partition_input(self, outf):
        pass

    def _extra_result(self, ret):
        ret["num_parts"] = self._num_parts_done

    def merge_partitions(
        self, new_num_parts, form_island=False, island_type=0, visual=False
    ):
        """
        This is called during resource mapping - deploying a partitioned PGT
        onto a list of nodes

        form_island:    If True, the merging will form `new_num_parts` logical
                        islands on top of the existing partitions (i.e. nodes)

                        If False, the merging will physically merge the current
                        partitions into `new_num_parts` new partitions (i.e.
                        nodes). Thus, no node-island 'hierarchies' are created

        island_type:    integer, 0 - data island, 1 - compute island
        """
        if not self._can_merge(new_num_parts):
            return

        # G = self._scheduler._dag
        G = self.dag
        inner_parts = self._inner_parts
        parts = self._partitions
        groups = self._groups
        key_dict = self._grp_key_dict
        in_out_part_map = dict()
        outer_groups = set()
        if new_num_parts > 1:
            self._scheduler.merge_partitions(new_num_parts, bal_cond=island_type)
        else:
            # all parts share the same outer group (island) when the number
            # of islands == 1
            ppid = self._drop_list_len + len(groups) + 1
            for part in parts:
                part.parent_id = ppid

        start_k = self._drop_list_len + 1
        start_i = start_k + self._num_parts_done

        for part in parts:
            # parent_id starts from
            # len(self._drop_list) + len(self._parts) + 1, which is the same
            # as start_i
            island_id = part.parent_id - start_i  # make sure island_id starts from 0
            outer_groups.add(island_id)
            in_out_part_map[part.partition_id - start_k] = island_id

        self._gid_island_id_map = in_out_part_map
        if not form_island:
            # update to the new gid
            self_oid_gid_map = self._oid_gid_map
            for gnode_g in G.nodes(data=True):
                gnode = gnode_g[1]
                oid = gnode["drop_spec"]["oid"]
                ggid = gnode["gid"] - start_k
                new_ggid = in_out_part_map[ggid]
                self_oid_gid_map[oid] = new_ggid
            # for node in node_list:
            #     ggid = node.get('group', None)
            #     if (ggid is not None):
            #         new_ggid = in_out_part_map[ggid - start_k]
            #         self._oid_gid_map[node['oid']] = new_ggid
            self._num_parts_done = new_num_parts
        else:
            self._partition_merged = new_num_parts
            if island_type == 1:
                for e in self.dag.edges(data=True):
                    # update edge weights within the same compute island
                    if in_out_part_map.get(
                        key_dict[e[0]] - start_k, -0.1
                    ) == in_out_part_map.get(key_dict[e[1]] - start_k, -0.2):
                        # print("e[2]['weight'] =", e[2]['weight'])
                        e[2]["weight"] /= self._bw_ratio
            if visual:
                island_label = "%s_Island" % (
                    self._island_labels[island_type % len(self._island_labels)]
                )
                node_list = self._node_list
                for island_id in outer_groups:
                    gn = dict()
                    gn["key"] = island_id + start_i
                    gn["isGroup"] = True
                    gn["name"] = "{1}_{0}".format(island_id + 1, island_label)
                    node_list.append(gn)

                for ip in inner_parts:
                    ip["group"] = in_out_part_map[ip["key"] - start_k] + start_i

    def to_gojs_json(self, string_rep=True, outdict=None, visual=False):
        """
        Partition the PGT into a real "PGT with Partitions", thus a PGTP
        """
        # import inspect
        # print("gojs_json called within MyKarkarPGTP from {0}".format(inspect.stack()[1][3]))

        if self._num_parts_done == 0 and self._partitions is None:
            (
                self._num_parts_done,
                _,
                self._ptime,
                self._partitions,
            ) = self._scheduler.partition_dag()
            # print("%s: _num_parts_done = %d" % (self.__class__.__name__, self._num_parts_done))
            # print("len(self._partitions) = %d" % (len(self._partitions)))
            # for part in self._partitions:
            #     print('Partition: {0}, Actual DoP = {1}, Required DoP = {2}'.\
            #                 format(part._gid, part._max_dop, part._ask_max_dop))
        G = self.dag
        # logger.debug("The same DAG? ", (G == self.dag))
        start_k = self._drop_list_len + 1  # starting gojs group_id
        self_oid_gid_map = self._oid_gid_map
        groups = set()
        key_dict = dict()  # k - gojs key, v - gojs group_id
        root_gid = None
        for gnode_g in G.nodes(data=True):
            gnode = gnode_g[1]
            oid = gnode["drop_spec"]["oid"]
            if "-92" == oid:
                root_gid = gnode["gid"]
                # print("hit fake super root, gid = {0}".format(root_gid))
                continue  # super_fake_root, so skip it
            gid = gnode["gid"]
            key_dict[gnode_g[0]] = gid
            self_oid_gid_map[oid] = gid - start_k
            groups.add(gid)
        if root_gid not in groups:
            # the super root has its own partition, which has no other Drops,
            # so ditch this extra partition!
            new_partitions = []
            for part in self._partitions:
                if part._gid != root_gid:
                    new_partitions.append(part)
            self._partitions = new_partitions
            self._num_parts_done = len(new_partitions)
        self._groups = groups
        self._grp_key_dict = key_dict
        # print("group length = %d" % len(groups))
        # print('groups', groups)

        if visual:
            jsobj = super(MySarkarPGTP, self).to_gojs_json(
                string_rep=False, visual=visual
            )

            node_list = jsobj["nodeDataArray"]
            for node in node_list:
                gid = G.nodes[node["key"]]["gid"]  # gojs group_id
                if gid is None:
                    raise GPGTException(
                        "Node {0} does not have a Partition".format(node["key"])
                    )
                node["group"] = gid
                # key_dict[node['key']] = gid
                # self._oid_gid_map[node['oid']] = gid - start_k # real gid starts from 0
            inner_parts = []
            for gid in groups:
                gn = dict()
                gn["key"] = gid
                # logger.debug("group key = {0}".format(gid))
                gn["isGroup"] = True
                # gojs group_id labels start from 1,
                # hence "gid - start_k + 1" instead of "gid - start_k"
                gn["name"] = "{1}_{0}".format((gid - start_k + 1), self._par_label)
                node_list.append(gn)
                inner_parts.append(gn)

            self._node_list = node_list
            self._inner_parts = inner_parts
            self._gojs_json_obj = jsobj
            if string_rep and jsobj is not None:
                return json.dumps(jsobj, indent=2)
            else:
                return jsobj
        else:
            self._gojs_json_obj = None
            return None
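
# An illustrative sketch for MySarkarPGTP (an assumption, not part of the
# original file):
#
#     pgtp = MySarkarPGTP(drop_list, max_dop=8)
#     pgtp.to_gojs_json(string_rep=False, visual=False)  # runs the scheduler
#     pgtp.merge_partitions(4, form_island=False)        # map onto 4 nodes
#
# Unlike MetisPGTP, the number of partitions here is decided by the
# scheduler itself; merge_partitions() is the way to force a target
# partition count.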


class MinNumPartsPGTP(MySarkarPGTP):
    def __init__(
        self,
        drop_list,
        deadline,
        num_partitions=0,
        par_label="Partition",
        max_dop=8,
        merge_parts=False,
        optimistic_factor=0.5,
    ):
        """
        num_partitions: 0 - only do the initial logical partition
                        >1 - do the logical partition, partition merging and
                        physical mapping
        """
        self._deadline = deadline
        self._opf = optimistic_factor
        super(MinNumPartsPGTP, self).__init__(
            drop_list, num_partitions, par_label, max_dop, merge_parts
        )
        # force it to re-calculate the extra drops due to extra links
        # during linearisation
        self._extra_drops = None

    def get_partition_info(self):
        return "Lookahead"

    def init_scheduler(self):
        self._scheduler = MinNumPartsScheduler(
            self._drop_list,
            self._deadline,
            max_dop=self._max_dop,
            dag=self.dag,
            optimistic_factor=self._opf,
        )
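
# An illustrative sketch (an assumption, not part of the original file):
# the deadline drives the lookahead scheduler, e.g.
#
#     pgtp = MinNumPartsPGTP(drop_list, deadline=500, max_dop=8)
#     pgtp.to_gojs_json(string_rep=False, visual=False)
#
# asks MinNumPartsScheduler for as few partitions as possible while the
# estimated completion time still meets the given deadline.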


class PSOPGTP(MySarkarPGTP):
    def __init__(
        self,
        drop_list,
        par_label="Partition",
        max_dop=8,
        deadline=None,
        topk=30,
        swarm_size=40,
        merge_parts=False,
    ):
        """
        PSO-based PGTP
        """
        self._deadline = deadline
        self._topk = topk
        self._swarm_size = swarm_size
        super(PSOPGTP, self).__init__(drop_list, 0, par_label, max_dop, merge_parts)
        self._extra_drops = None

    def get_partition_info(self):
        return "Particle Swarm"

    def init_scheduler(self):
        self._scheduler = PSOScheduler(
            self._drop_list,
            max_dop=self._max_dop,
            deadline=self._deadline,
            dag=self.dag,
            topk=self._topk,
            swarm_size=self._swarm_size,
        )
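
# An illustrative sketch (an assumption, not part of the original file):
#
#     pgtp = PSOPGTP(drop_list, deadline=1000, topk=30, swarm_size=40)
#     pgtp.to_gojs_json(string_rep=False, visual=False)
#
# where deadline, topk and swarm_size are passed straight through to
# PSOScheduler, which uses particle swarm optimisation to search for a
# partitioning.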