ICRAR / daliuge, build 4911681207 (pending completion)
GitHub Pull Request #231 (Liu 355): merge 9186e10d1 into e48989cce

180 of 229 new or added lines in 17 files covered (78.6%)
26 existing lines in 5 files now uncovered
15345 of 19059 relevant lines covered (80.51%), 1.65 hits per line

Source file: /daliuge-translator/dlg/dropmake/pgtp.py (77.87% covered)

#
#    ICRAR - International Centre for Radio Astronomy Research
#    (c) UWA - The University of Western Australia, 2015
#    Copyright by UWA (in the framework of the ICRAR)
#    All rights reserved
#
#    This library is free software; you can redistribute it and/or
#    modify it under the terms of the GNU Lesser General Public
#    License as published by the Free Software Foundation; either
#    version 2.1 of the License, or (at your option) any later version.
#
#    This library is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#    Lesser General Public License for more details.
#
#    You should have received a copy of the GNU Lesser General Public
#    License along with this library; if not, write to the Free Software
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
#    MA 02111-1307  USA
#

from collections import defaultdict
import json
import logging

import networkx as nx

from dlg.dropmake.pgt import PGT, GPGTException
from dlg.dropmake.scheduler import (
    MySarkarScheduler,
    DAGUtil,
    MinNumPartsScheduler,
    PSOScheduler,
)
from dlg.common import CategoryType

logger = logging.getLogger(__name__)


class MetisPGTP(PGT):
    """
    DROP and GOJS representations of a Physical Graph Template with
    Partitions (PGTP), based on METIS
    http://glaros.dtc.umn.edu/gkhome/metis/metis/overview
    """

    def __init__(
        self,
        drop_list,
        num_partitions=1,
        min_goal=0,
        par_label="Partition",
        ptype=0,
        ufactor=10,
        merge_parts=False,
    ):
        """
        num_partitions:  number of partitions supplied by users (int)
        TODO - integrate from within PYTHON module (using C API) soon!
        """
        super(MetisPGTP, self).__init__(drop_list, build_dag=False)
        self._metis_path = (
            "gpmetis"  # assuming it is installed on the system path
        )
        if num_partitions <= 0:
            # self._num_parts = self.get_opt_num_parts()
            raise GPGTException(
                "Invalid num_partitions {0}".format(num_partitions)
            )
        else:
            self._num_parts = num_partitions
        # minimisation goal: 1 - total communication volume, otherwise edge cut
        if 1 == min_goal:
            self._obj_type = "vol"
        else:
            self._obj_type = "cut"

        # partitioning scheme: 0 - multilevel k-way, otherwise recursive bisection
        if 0 == ptype:
            self._ptype = "kway"
        else:
            self._ptype = "rb"

        self._par_label = par_label
        self._u_factor = ufactor
        self._metis_logs = []
        self._G = self.to_partition_input()
        self._metis = DAGUtil.import_metis()
        self._group_workloads = dict()  # k - gid, v - a tuple of (tw, sz)
        self._merge_parts = merge_parts
        self._metis_out = None  # initial internal partition result

    def to_partition_input(self, outf=None):
        """
        Convert to the METIS format for mapping and decomposition.
        NOTE - since METIS only supports undirected graphs, we have to add
        both upstream and downstream nodes to fit its input format
        """
        key_dict = dict()  # key - oid, value - GOJS key
        droplist = self._drop_list

        G = nx.Graph()
        G.graph["edge_weight_attr"] = "weight"
        G.graph["node_weight_attr"] = "weight"
        G.graph["node_size_attr"] = "size"

        for i, drop in enumerate(droplist):
            try:
                oid = drop["oid"]
            except KeyError:
                logger.debug("Drop does not have oid: %s", drop)
                droplist.pop(i)
                continue  # do not register a drop that has no oid
            key_dict[oid] = i + 1  # METIS index starts from 1

        logger.info("Metis partition input progress - dropdict is built")

        if self._drop_list_len > 1e7:
            import resource

            logger.info(
                "self._drop_list, max RSS: %.2f GB",
                resource.getrusage(resource.RUSAGE_SELF)[2] / 1024.0**2,
            )

        for i, drop in enumerate(droplist):
            oid = drop["oid"]
            myk = i + 1
            tt = drop["categoryType"]
            if tt in [CategoryType.DATA, "data"]:
                dst = "consumers"  # outbound keyword
                ust = "producers"
                tw = 1  # nominal task weight of one for a Data DROP
                sz = drop.get("weight", 1)  # size
            elif tt in [CategoryType.APPLICATION, "app"]:
                dst = "outputs"
                ust = "inputs"
                tw = drop.get("weight", 1)
                sz = 1
            G.add_node(myk, weight=tw, size=sz, oid=oid)
            adj_drops = []  # adjacent drops (all neighbours)
            if dst in drop:
                adj_drops += drop[dst]
            if ust in drop:
                adj_drops += drop[ust]

            for inp in adj_drops:
                key = list(inp.keys())[0] if isinstance(inp, dict) else inp
                if tt in [CategoryType.DATA, "data"]:
                    lw = drop["weight"]
                elif tt in [CategoryType.APPLICATION, "app"]:
                    # get the weight of the previous drop
                    lw = droplist[key_dict[key] - 1].get("weight", 1)
                if lw <= 0:
                    lw = 1
                G.add_edge(myk, key_dict[key], weight=lw)
        # for e in G.edges(data=True):
        #     if (e[2]['weight'] == 0):
        #         e[2]['weight'] = 1
        if self._drop_list_len > 1e7:
            import resource

            logger.info(
                "Max RSS after creating the Graph: %.2f GB",
                resource.getrusage(resource.RUSAGE_SELF)[2] / 1024.0**2,
            )
        return G

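    # ---------------------------------------------------------------------
    # Illustrative sketch, not part of the original file: the shape of the
    # undirected, weighted graph that to_partition_input() builds, and which
    # the `metis` wrapper (what DAGUtil.import_metis() is assumed to return)
    # consumes. All oids, weights and sizes below are invented.
    @staticmethod
    def _example_partition_input():
        g = nx.Graph()
        # METIS reads these graph attributes to locate the weights
        g.graph["edge_weight_attr"] = "weight"
        g.graph["node_weight_attr"] = "weight"
        g.graph["node_size_attr"] = "size"
        # an app drop (key 1) feeding a data drop (key 2); keys start at 1,
        # apps carry the task weight, data drops carry the size
        g.add_node(1, weight=5, size=1, oid="app-1")
        g.add_node(2, weight=1, size=100, oid="data-1")
        # the undirected edge weight models the volume of data moved
        g.add_edge(1, 2, weight=100)
        return g
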
    def _set_metis_log(self, logtext):
        self._metis_logs = logtext.split("\n")

    def get_partition_info(self):
        """
        Return a string describing the partition parameters for log entries
        """
        return "METIS_LB{0}".format(101 - self._u_factor)

    def _extra_result(self, ret):
        ret["num_parts"] = self._num_parts

    def _parse_metis_output(self, metis_out, jsobj):
        """
        1. parse the METIS result, and add group nodes into the GOJS json
        2. also update edge weights for self._dag
        """
        # key_dict = dict() #k - gojs key, v - gojs group id
        groups = set()
        ogm = self._oid_gid_map
        group_weight = self._group_workloads  # k - gid, v - a tuple of (tw, sz)
        G = self._G
        # start_k = len(self._drop_list) + 1
        start_k = self._drop_list_len + 1
        for gnode, gid in zip(G.nodes(data=True), metis_out):
            groups.add(gid)
            gnode = gnode[1]
            ogm[gnode["oid"]] = gid
            gnode["gid"] = gid

        # housekeeping after partitioning
        self._num_parts_done = len(groups)
        if self.dag is not None:
            for e in self.dag.edges(data=True):
                gid0 = metis_out[e[0] - 1]
                gid1 = metis_out[e[1] - 1]
                if gid0 == gid1:
                    e[2]["weight"] = 0

        # the following is for potential partition merging into islands
        if self._merge_parts:
            for gid in groups:
                group_weight[gid] = [0, 0]
            for gnode in G.nodes(data=True):
                tt = group_weight[gnode[1]["gid"]]
                try:
                    tt[0] += int(gnode[1]["weight"])
                    tt[1] += int(gnode[1]["size"])
                except ValueError:
                    tt[0] = 1
                    tt[1] = 1
        # the following is for visualisation using GOJS
        if jsobj is not None:
            node_list = jsobj["nodeDataArray"]
            for node in node_list:
                nid = int(node["key"])
                gid = metis_out[nid - 1]
                node["group"] = gid + start_k

            inner_parts = []
            for gid in groups:
                gn = dict()
                gn["key"] = start_k + gid
                gn["isGroup"] = True
                gn["name"] = "{1}_{0}".format(gid + 1, self._par_label)
                node_list.append(gn)
                inner_parts.append(gn)

            self._inner_parts = inner_parts
            self._node_list = node_list

    def to_gojs_json(self, string_rep=True, outdict=None, visual=False):
        """
        Partition the PGT into a real "PGT with Partitions", thus PGTP, using
        METIS built-in functions

        See METIS usage:
            http://metis.readthedocs.io/en/latest/index.html
        """
        if self._num_parts == 1:
            edgecuts = 0
            metis_parts = [0] * len(self._G.nodes())
        else:
            # prepare METIS parameters
            recursive_param = self._ptype != "kway"
            if recursive_param and self._obj_type == "vol":
                raise GPGTException(
                    "Recursive partitioning does not support "
                    "total volume minimisation."
                )

            if self._drop_list_len > 1e7:
                import resource

                logger.info(
                    "RSS before METIS partitioning: %.2f GB",
                    resource.getrusage(resource.RUSAGE_SELF)[2] / 1024.0**2,
                )

            # Call the METIS C library
            (edgecuts, metis_parts) = self._metis.part_graph(
                self._G,
                nparts=self._num_parts,
                recursive=recursive_param,
                objtype=self._obj_type,
                ufactor=self._u_factor,
            )

        # Output some partitioning result metadata
        if outdict is not None:
            outdict["edgecuts"] = edgecuts
        # self._set_metis_log(" - Data movement: {0}".format(edgecuts))
        self._data_movement = edgecuts

        if visual:
            if self.dag is None:
                self._dag = DAGUtil.build_dag_from_drops(self._drop_list)
            jsobj = super(MetisPGTP, self).to_gojs_json(
                string_rep=False, visual=visual
            )
        else:
            jsobj = None
        self._parse_metis_output(metis_parts, jsobj)
        self._metis_out = metis_parts
        self._gojs_json_obj = jsobj  # could be None if not visual
        if string_rep and jsobj is not None:
            return json.dumps(jsobj, indent=2)
        else:
            return jsobj

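    # ---------------------------------------------------------------------
    # Illustrative sketch, not part of the original file: the part_graph()
    # call made above, on a toy graph. It assumes `metis_mod` is the Python
    # METIS wrapper returned by DAGUtil.import_metis(). Note that
    # recursive=True (the "rb" ptype) cannot be combined with objtype="vol",
    # which is why to_gojs_json() raises GPGTException for that combination.
    @staticmethod
    def _example_part_graph(metis_mod):
        g = nx.complete_graph(8)  # toy input; the real one comes from
        g.graph["edge_weight_attr"] = "weight"  # to_partition_input()
        for u, v in g.edges():
            g[u][v]["weight"] = 1
        (edgecuts, parts) = metis_mod.part_graph(
            g,
            nparts=2,  # number of partitions requested
            recursive=False,  # False - multilevel k-way; True - bisection
            objtype="cut",  # minimise edge cut ("vol" - comm. volume)
            ufactor=10,  # allowed load imbalance of (1000 + 10)/1000
        )
        return edgecuts, parts  # parts[i] is the partition id of node i
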
    def merge_partitions(
        self, new_num_parts, form_island=False, island_type=0, visual=False
    ):
        """
        This is called during resource mapping - deploying a partitioned PGT
        onto a list of nodes

        form_island:    If True, the merging will form `new_num_parts` logical
                        islands on top of existing partitions (i.e. nodes). This
                        is also known as "reference-based merging"

                        If False, the merging will physically merge current
                        partitions into `new_num_parts` new partitions (i.e. nodes),
                        so no node-island 'hierarchies' are created

        island_type:    integer, 0 - data island, 1 - compute island

        """
        # 0. parse the output and get all the partitions
        if not self._can_merge(new_num_parts):
            return

        GG = self._G
        part_edges = defaultdict(int)  # k: from_gid + to_gid, v: sum_of_weight
        for e in GG.edges(data=True):
            from_gid = GG.nodes[e[0]]["gid"]
            to_gid = GG.nodes[e[1]]["gid"]
            k = "{0}**{1}".format(from_gid, to_gid)
            part_edges[k] += e[2]["weight"]

        # 1. build the bi-directional graph again
        # with each partition being a node
        G = nx.Graph()
        G.graph["edge_weight_attr"] = "weight"
        G.graph["node_weight_attr"] = "weight"
        G.graph["node_size_attr"] = "size"
        for gid, v in self._group_workloads.items():
            # for compute islands, we need to count the # of nodes instead of
            # the actual workload
            twv = 1 if (island_type == 1) else v[0]
            G.add_node(gid, weight=twv, size=v[1])
        for glinks, v in part_edges.items():
            gl = glinks.split("**")
            G.add_edge(int(gl[0]), int(gl[1]), weight=v)

        if new_num_parts == 1:
            (edgecuts, metis_parts) = (0, [0] * len(G.nodes()))
        else:
            (edgecuts, metis_parts) = self._metis.part_graph(
                G, nparts=new_num_parts, ufactor=1
            )
        tmp_map = self._gid_island_id_map
        islands = set()
        for gid, island_id in zip(G.nodes(), metis_parts):
            tmp_map[gid] = island_id
            islands.add(island_id)
        if not form_island:
            ogm = self._oid_gid_map
            gnodes = GG.nodes(data=True)
            for gnode in gnodes:
                gnode = gnode[1]
                oid = gnode["oid"]
                old_gid = gnode["gid"]
                new_gid = tmp_map[old_gid]
                ogm[oid] = new_gid
                gnode["gid"] = new_gid
            self._num_parts_done = new_num_parts
        else:
            if (
                island_type == 1
                and (self.dag is not None)
                and (self._metis_out is not None)
            ):
                # update intra-comp_island edge weights, given that they see a
                # different bandwidth compared to inter-comp_island edges
                metis_out = self._metis_out
                for e in self.dag.edges(data=True):
                    gid0 = metis_out[e[0] - 1]
                    gid1 = metis_out[e[1] - 1]
                    if tmp_map[gid0] == tmp_map[gid1]:
                        e[2]["weight"] /= self._bw_ratio
                self._data_movement = None  # force a refresh of data_movement
            # add GOJS groups for visualisation
            self._partition_merged = new_num_parts
            if visual:
                island_label = "%s_Island" % (
                    self._island_labels[island_type % len(self._island_labels)]
                )
                start_k = self._drop_list_len + 1
                # start_i = len(node_list) + 1
                start_i = start_k + self._num_parts_done
                node_list = self._node_list
                for island_id in islands:
                    # print('island_id = ', island_id)
                    gn = dict()
                    gn["key"] = island_id + start_i
                    gn["isGroup"] = True
                    gn["name"] = "{1}_{0}".format(island_id + 1, island_label)
                    node_list.append(gn)
                inner_parts = self._inner_parts
                for ip in inner_parts:
                    ip["group"] = tmp_map[ip["key"] - start_k] + start_i


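# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original file: the quotient-graph
# trick used by MetisPGTP.merge_partitions() above. Each existing partition
# becomes a single node (weight = summed workload, size = summed data), the
# weights of edges crossing each pair of partitions are summed, and METIS is
# run again to group partitions into islands. All numbers are invented, and
# `metis_mod` is assumed to be the wrapper from DAGUtil.import_metis().
def _example_merge_partitions(metis_mod):
    quotient = nx.Graph()
    quotient.graph["edge_weight_attr"] = "weight"
    quotient.graph["node_weight_attr"] = "weight"
    quotient.graph["node_size_attr"] = "size"
    # k - gid, v - (total task weight, total data size), as in _group_workloads
    group_workloads = {0: (4, 10), 1: (5, 12), 2: (3, 8), 3: (6, 20)}
    for gid, (tw, sz) in group_workloads.items():
        quotient.add_node(gid, weight=tw, size=sz)
    # summed weight of the edges crossing each pair of partitions
    for u, v, w in [(0, 1, 7), (1, 2, 3), (2, 3, 9), (0, 3, 2)]:
        quotient.add_edge(u, v, weight=w)
    _, island_of = metis_mod.part_graph(quotient, nparts=2, ufactor=1)
    # gid -> island_id, the same mapping kept in self._gid_island_id_map
    return dict(zip(quotient.nodes(), island_of))

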
class MySarkarPGTP(PGT):
    """
    Use the MySarkarScheduler to produce the PGTP
    """

    def __init__(
        self,
        drop_list,
        num_partitions=0,
        par_label="Partition",
        max_dop=8,
        merge_parts=False,
    ):
        """
        num_partitions: 0 - only do the initial logical partition
                        >1 - does logical partitioning, partition merging and
                        physical mapping
                This parameter is simply ignored.
                To control the number of partitions, please call
                def merge_partitions(self, new_num_parts, form_island=False)
        """
        super(MySarkarPGTP, self).__init__(drop_list, build_dag=False)
        self._dag = DAGUtil.build_dag_from_drops(
            self._drop_list, fake_super_root=False
        )
        self._num_parts = num_partitions
        self._max_dop = max_dop  # max dop per partition
        self._par_label = par_label
        # self._lpl = None # longest path
        self._ptime = None  # partition time
        self._merge_parts = merge_parts
        # self._edge_cuts = None
        self._partitions = None

        self.init_scheduler()

    def init_scheduler(self):
        self._scheduler = MySarkarScheduler(
            self._drop_list, self._max_dop, dag=self.dag
        )

    def get_partition_info(self):
        """
        Return a string describing the partition parameters for log entries
        """
        return "Edge Zero"

    def to_partition_input(self, outf):
        pass

    def _extra_result(self, ret):
        ret["num_parts"] = self._num_parts_done

    def merge_partitions(
        self, new_num_parts, form_island=False, island_type=0, visual=False
    ):
        """
        This is called during resource mapping - deploying a partitioned PGT
        onto a list of nodes

        form_island:    If True, the merging will form `new_num_parts` logical
                        islands on top of existing partitions (i.e. nodes)

                        If False, the merging will physically merge current
                        partitions into `new_num_parts` new partitions (i.e. nodes),
                        so no node-island 'hierarchies' are created

        island_type:    integer, 0 - data island, 1 - compute island
        """
        if not self._can_merge(new_num_parts):
            return

        # G = self._scheduler._dag
        G = self.dag
        inner_parts = self._inner_parts
        parts = self._partitions
        groups = self._groups
        key_dict = self._grp_key_dict
        in_out_part_map = dict()
        outer_groups = set()
        if new_num_parts > 1:
            self._scheduler.merge_partitions(
                new_num_parts, bal_cond=island_type
            )
        else:
            # all parts share the same outer group (island) when # of islands == 1
            ppid = self._drop_list_len + len(groups) + 1
            for part in parts:
                part.parent_id = ppid

        start_k = self._drop_list_len + 1
        start_i = start_k + self._num_parts_done

        for part in parts:
            # parent_id starts from
            # len(self._drop_list) + len(self._parts) + 1, which is the same as
            # start_i
            island_id = (
                part.parent_id - start_i
            )  # make sure island_id starts from 0
            outer_groups.add(island_id)
            in_out_part_map[part.partition_id - start_k] = island_id

        self._gid_island_id_map = in_out_part_map
        if not form_island:
            # update to new gid
            self_oid_gid_map = self._oid_gid_map
            for gnode_g in G.nodes(data=True):
                gnode = gnode_g[1]
                oid = gnode["drop_spec"]["oid"]
                ggid = gnode["gid"] - start_k
                new_ggid = in_out_part_map[ggid]
                self_oid_gid_map[oid] = new_ggid
            # for node in node_list:
            #     ggid = node.get('group', None)
            #     if (ggid is not None):
            #         new_ggid = in_out_part_map[ggid - start_k]
            #         self._oid_gid_map[node['oid']] = new_ggid
            self._num_parts_done = new_num_parts
        else:
            self._partition_merged = new_num_parts
            if island_type == 1:
                for e in self.dag.edges(data=True):
                    # update edge weights within the same compute island
                    if in_out_part_map.get(
                        key_dict[e[0]] - start_k, -0.1
                    ) == in_out_part_map.get(key_dict[e[1]] - start_k, -0.2):
                        # print("e[2]['weight'] =", e[2]['weight'])
                        e[2]["weight"] /= self._bw_ratio
            if visual:
                island_label = "%s_Island" % (
                    self._island_labels[island_type % len(self._island_labels)]
                )
                node_list = self._node_list
                for island_id in outer_groups:
                    gn = dict()
                    gn["key"] = island_id + start_i
                    gn["isGroup"] = True
                    gn["name"] = "{1}_{0}".format(island_id + 1, island_label)
                    node_list.append(gn)

                for ip in inner_parts:
                    ip["group"] = (
                        in_out_part_map[ip["key"] - start_k] + start_i
                    )

    def to_gojs_json(self, string_rep=True, outdict=None, visual=False):
        """
        Partition the PGT into a real "PGT with Partitions", thus PGTP
        """
        # import inspect
        # print("gojs_json called within MyKarkarPGTP from {0}".format(inspect.stack()[1][3]))

        if self._num_parts_done == 0 and self._partitions is None:
            (
                self._num_parts_done,
                _,
                self._ptime,
                self._partitions,
            ) = self._scheduler.partition_dag()
            # print("%s: _num_parts_done = %d" % (self.__class__.__name__, self._num_parts_done))
            # print("len(self._partitions) = %d" % (len(self._partitions)))
            # for part in self._partitions:
            #     print('Partition: {0}, Actual DoP = {1}, Required DoP = {2}'.\
            #                 format(part._gid, part._max_dop, part._ask_max_dop))
        G = self.dag
        # logger.debug("The same DAG? ", (G == self.dag))
        start_k = self._drop_list_len + 1  # starting gojs group_id
        self_oid_gid_map = self._oid_gid_map
        groups = set()
        key_dict = dict()  # k - gojs key, v - gojs group_id
        root_gid = None
        for gnode_g in G.nodes(data=True):
            gnode = gnode_g[1]
            oid = gnode["drop_spec"]["oid"]
            if "-92" == oid:
                root_gid = gnode["gid"]
                # print("hit fake super root, gid = {0}".format(root_gid))
                continue  # fake super root, so skip it
            gid = gnode["gid"]
            key_dict[gnode_g[0]] = gid
            self_oid_gid_map[oid] = gid - start_k
            groups.add(gid)
        if root_gid not in groups:
            # the super root has its own partition, which holds no other Drops,
            # so ditch this extra partition!
            new_partitions = []
            for part in self._partitions:
                if part._gid != root_gid:
                    new_partitions.append(part)
            self._partitions = new_partitions
            self._num_parts_done = len(new_partitions)
        self._groups = groups
        self._grp_key_dict = key_dict
        # print("group length = %d" % len(groups))
        # print('groups', groups)

        if visual:
            jsobj = super(MySarkarPGTP, self).to_gojs_json(
                string_rep=False, visual=visual
            )

            node_list = jsobj["nodeDataArray"]
            for node in node_list:
                gid = G.nodes[node["key"]]["gid"]  # gojs group_id
                if gid is None:
                    raise GPGTException(
                        "Node {0} does not have a Partition".format(
                            node["key"]
                        )
                    )
                node["group"] = gid
                # key_dict[node['key']] = gid
                # self._oid_gid_map[node['oid']] = gid - start_k # real gid starts from 0
            inner_parts = []
            for gid in groups:
                gn = dict()
                gn["key"] = gid
                # logger.debug("group key = {0}".format(gid))
                gn["isGroup"] = True
                # gojs group_id label starts from 1
                # so "gid - leng" instead of "gid - start_k"
                gn["name"] = "{1}_{0}".format(
                    (gid - start_k + 1), self._par_label
                )
                node_list.append(gn)
                inner_parts.append(gn)

            self._node_list = node_list
            self._inner_parts = inner_parts
            self._gojs_json_obj = jsobj
            if string_rep and jsobj is not None:
                return json.dumps(jsobj, indent=2)
            else:
                return jsobj
        else:
            self._gojs_json_obj = None
            return None


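# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original file: a typical call sequence
# for the scheduler-based PGTP above, using only methods defined in this
# module. `drop_list` stands for a hypothetical list of drop-specification
# dicts (each carrying "oid", "categoryType", weights and link keywords).
def _example_mysarkar_usage(drop_list):
    pgtp = MySarkarPGTP(drop_list, max_dop=8, merge_parts=True)
    # runs the Sarkar-style partitioning and (with visual=True) builds the
    # GOJS node list later used by merge_partitions(..., visual=True)
    pgtp.to_gojs_json(string_rep=False, visual=True)
    # group the resulting partitions onto 4 data islands
    pgtp.merge_partitions(4, form_island=True, island_type=0)
    return pgtp

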
class MinNumPartsPGTP(MySarkarPGTP):
    def __init__(
        self,
        drop_list,
        deadline,
        num_partitions=0,
        par_label="Partition",
        max_dop=8,
        merge_parts=False,
        optimistic_factor=0.5,
    ):
        """
        num_partitions: 0 - only do the initial logical partition
                        >1 - does logical partitioning, partition merging and
                        physical mapping
        """
        self._deadline = deadline
        self._opf = optimistic_factor
        super(MinNumPartsPGTP, self).__init__(
            drop_list, num_partitions, par_label, max_dop, merge_parts
        )
        # force it to re-calculate the extra drops due to extra links
        # during linearisation
        self._extra_drops = None

    def get_partition_info(self):
        return "Lookahead"

    def init_scheduler(self):
        self._scheduler = MinNumPartsScheduler(
            self._drop_list,
            self._deadline,
            max_dop=self._max_dop,
            dag=self.dag,
            optimistic_factor=self._opf,
        )


class PSOPGTP(MySarkarPGTP):
    def __init__(
        self,
        drop_list,
        par_label="Partition",
        max_dop=8,
        deadline=None,
        topk=30,
        swarm_size=40,
        merge_parts=False,
    ):
        """
        PSO (Particle Swarm Optimisation)-based PGTP
        """
        self._deadline = deadline
        self._topk = topk
        self._swarm_size = swarm_size
        super(PSOPGTP, self).__init__(
            drop_list, 0, par_label, max_dop, merge_parts
        )
        self._extra_drops = None

    def get_partition_info(self):
        return "Particle Swarm"

    def init_scheduler(self):
        self._scheduler = PSOScheduler(
            self._drop_list,
            max_dop=self._max_dop,
            deadline=self._deadline,
            dag=self.dag,
            topk=self._topk,
            swarm_size=self._swarm_size,
        )
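

# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original file: choosing among the
# partitioners defined in this module. MetisPGTP needs an explicit partition
# count, while the scheduler-based classes derive one themselves. `drop_list`
# is again a hypothetical drop-specification list.
def _example_choose_partitioner(drop_list, num_nodes=None, deadline=None):
    if num_nodes is not None:
        # fixed target: balanced METIS k-way partitioning
        return MetisPGTP(drop_list, num_partitions=num_nodes)
    if deadline is not None:
        # minimise the number of partitions subject to a completion deadline
        return MinNumPartsPGTP(drop_list, deadline)
    # minimise data movement with Sarkar's edge-zeroing algorithm
    return MySarkarPGTP(drop_list)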