• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ICRAR / daliuge / 4908056526

pending completion
4908056526

push

github

Andreas Wicenec
Fixed small issues with existing graphs

20 of 25 new or added lines in 2 files covered. (80.0%)

88 existing lines in 5 files now uncovered.

15342 of 19053 relevant lines covered (80.52%)

1.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.61
/daliuge-translator/dlg/dropmake/scheduler.py
1
#    ICRAR - International Centre for Radio Astronomy Research
2
#    (c) UWA - The University of Western Australia, 2015
3
#    Copyright by UWA (in the framework of the ICRAR)
4
#    All rights reserved
5
#
6
#    This library is free software; you can redistribute it and/or
7
#    modify it under the terms of the GNU Lesser General Public
8
#    License as published by the Free Software Foundation; either
9
#    version 2.1 of the License, or (at your option) any later version.
10
#
11
#    This library is distributed in the hope that it will be useful,
12
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
13
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
#    Lesser General Public License for more details.
15
#
16
#    You should have received a copy of the GNU Lesser General Public
17
#    License along with this library; if not, write to the Free Software
18
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
19
#    MA 02111-1307  USA
20
#
21

22
import copy
2✔
23
import logging
2✔
24
import os
2✔
25
import platform
2✔
26
import time
2✔
27
from collections import defaultdict
2✔
28

29
import networkx as nx
2✔
30
import numpy as np
2✔
31
import pkg_resources
2✔
32
from pyswarm import pso
2✔
33

34
from .utils.antichains import get_max_weighted_antichain
2✔
35
from ..common import dropdict, get_roots, CategoryType
2✔
36

37
logger = logging.getLogger(__name__)

# Module-wide debug switch: when non-zero, enables expensive consistency
# checks (slow antichain recomputation) and verbose logging below.
DEBUG = 0
40

41

42
class SchedulerException(Exception):
    """Raised when scheduling or partitioning cannot proceed."""
44

45

46
class Schedule(object):
    """
    The scheduling solution with schedule-related properties
    """

    def __init__(self, dag, max_dop):
        """
        dag:      a networkx DiGraph; its nodes are labelled in place with
                  schedule information by DAGUtil.label_schedule
        max_dop:  maximum degree of parallelism; either an int or a dict
                  carrying a "num_cpus" key (defaults to 1 when absent)
        """
        self._dag = dag
        # normalise max_dop to a plain integer lane count
        self._max_dop = (
            max_dop if type(max_dop) == int else max_dop.get("num_cpus", 1)
        )
        DAGUtil.label_schedule(self._dag)
        # (path, length) of the longest (critical) path through the DAG
        self._lpl = DAGUtil.get_longest_path(
            self._dag, default_weight=0, show_path=True
        )
        self._wkl = None  # lazily computed by `workload`
        self._sma = None  # lazily computed by `schedule_matrix`

    @property
    def makespan(self):
        """Total schedule time, i.e. the length of the longest path."""
        return self._lpl[1]

    @property
    def longest_path(self):
        """The sequence of nodes along the longest (critical) path."""
        return self._lpl[0]

    @property
    def schedule_matrix(self):
        """
        Return: a self._lpl x self._max_dop matrix
                (X - time, Y - resource unit / parallel lane)

        Each cell holds the id of the node occupying that lane at that
        time slot (0 means idle).

        Raises SchedulerException when nodes lack "stt"/"edt" schedule
        labels or when no idle lane can be found (DoP overflow).
        """
        if self._sma is None:
            G = self._dag
            # guarantee at least one time slot even for a zero-length schedule
            N = max(self.makespan, 1)
            if DEBUG:
                lpl_str = []
                lpl_c = 0
                for lpn in self.longest_path:
                    ww = G.nodes[lpn].get("num_cpus", 0)
                    lpl_str.append("{0}({1})".format(lpn, ww))
                    lpl_c += ww
                logger.debug("lpl: %s", " -> ".join(lpl_str))
                logger.debug("lplt = %d", int(lpl_c))

            M = self._max_dop
            # print("N (makespan) is ", N, "M is ", M)
            ma = np.zeros((M, N), dtype=int)
            # pr[i] = time at which lane i becomes idle again
            pr = np.zeros((M), dtype=int)
            last_pid = -1
            prev_n = None

            topo_sort = nx.topological_sort(G)
            for n in topo_sort:
                node = G.nodes[n]
                try:
                    # "stt"/"edt" are the start/end time labels written by
                    # DAGUtil.label_schedule above
                    stt = node["stt"]
                    edt = node["edt"]
                except KeyError as ke:
                    raise SchedulerException(
                        "No schedule labels found: {0}".format(str(ke))
                    )
                if edt == stt:
                    # zero-duration node occupies no time slot
                    continue
                if prev_n in G.predecessors(n):
                    # keep a direct successor on the same lane as its
                    # immediately preceding (in topo order) predecessor
                    curr_pid = last_pid
                else:
                    # find the first lane that is already idle at stt
                    found = None
                    for i in range(M):
                        if pr[i] <= stt:
                            found = i
                            break
                    if found is None:
                        raise SchedulerException(
                            "Cannot find a idle PID, max_dop provided: {0}, actual max_dop: {1}\n Graph: {2}".format(
                                M, "DAGUtil.get_max_dop(G)", G.nodes(data=True)
                            )
                        )
                        # DAGUtil.get_max_dop(G), G.nodes(data=True)))
                    curr_pid = found
                # occupy the lane for the node's whole duration
                ma[curr_pid, stt:edt] = n
                pr[curr_pid] = edt
                last_pid = curr_pid
                prev_n = n
            self._sma = ma
            # print(ma)
        return self._sma

    @property
    def workload(self):
        """
        Return: (integer)
            the mean # of resource units per time unit consumed by the graph/partition
        """
        if self._wkl is None:
            ma = self.schedule_matrix
            c = []
            # count busy lanes per time slot
            for i in range(ma.shape[1]):
                c.append(np.count_nonzero(ma[:, i]))
            self._wkl = int(
                np.mean(np.array(c))
            )  # since METIS only accepts integer
        return self._wkl

    @property
    def efficiency(self):
        """
        resource usage percentage (integer)
        """
        return int(float(self.workload) / self._max_dop * 100)
157
class Partition(object):
    """
    Logical partition, multiple (1 ~ N) of these can be placed onto a single
    physical resource unit

    Logical partition can be nested, and it somewhat resembles the `dlg.manager.drop_manager`
    """

    def __init__(self, gid, max_dop):
        """
        gid:        cluster/partition id (string)
        max_dop:    maximum allowed degree of parallelism in this partition (int)
        """
        self._gid = gid
        self._dag = nx.DiGraph()
        self._ask_max_dop = max_dop  # the requested (upper-bound) DoP
        self._max_antichains = None  # a list of max (width) antichains
        self._lpl = None
        self._schedule = None
        self._max_dop = None  # the actual DoP observed so far
        self._parent_id = None
        self._child_parts = None
        self._tmp_merge_dag = None  # staging area for a probed merge
        self._tmp_new_ac = None  # staging area for probe_max_dop results

    @property
    def parent_id(self):
        """Id of the merged (parent) partition this one was assigned to."""
        return self._parent_id

    @parent_id.setter
    def parent_id(self, value):
        self._parent_id = value

    @property
    def partition_id(self):
        """This partition's own id (gid)."""
        return self._gid

    @property
    def schedule(self):
        """
        Get the schedule associated with this partition
        """
        if self._schedule is None:
            self._schedule = Schedule(self._dag, self._max_dop)
        return self._schedule

    def recompute_schedule(self):
        """Invalidate the cached schedule and rebuild it."""
        self._schedule = None
        return self.schedule

    def can_merge(self, that):
        """
        Whether partition `that` can be merged into this one without
        exceeding the requested DoP (cheap additive estimate).
        """
        if self._max_dop + that._max_dop <= self._ask_max_dop:
            return True
        else:
            return False

        # TODO re-implement this performance hog!
        # self._tmp_merge_dag = nx.compose(self._dag, that._dag)
        # return DAGUtil.get_max_dop(self._tmp_merge_dag) <= self._ask_max_dop

    def merge(self, that):
        """Merge partition `that` into this one (graph union)."""
        if self._tmp_merge_dag is not None:
            # reuse the DAG already composed during a prior probe
            self._dag = self._tmp_merge_dag
            self._tmp_merge_dag = None
        else:
            self._dag = nx.compose(self._dag, that._dag)

        # self._max_dop

        # TODO add this performance hog!
        # self._max_antichains = None

    def can_add(self, u, v, gu, gv):
        """
        Check if nodes u and/or v can join this partition
        A node may be rejected due to reasons such as: DoP overflow or
        completion time deadline overdue, etc.

        Returns a 3-tuple: (accepted, u_was_new, v_was_new).
        Note: the nodes are added tentatively and removed again before
        returning, so the partition is left unchanged.
        """
        uw = gu["weight"]
        vw = gv["weight"]
        if len(self._dag.nodes()) == 0:
            # empty partition always accepts the first edge
            return (True, False, False)

        unew = u not in self._dag.nodes
        vnew = v not in self._dag.nodes

        if DEBUG:
            # cross-check the incremental antichain cache against a full
            # recomputation (expensive; debug only)
            slow_max = DAGUtil.get_max_antichains(self._dag)
            fast_max = self._max_antichains
            info = "Before: {0} - slow max: {1}, fast max: {2}, u: {3}, v: {4}, unew:{5}, vnew:{6}".format(
                self._dag.edges(), slow_max, fast_max, u, v, unew, vnew
            )
            logger.debug(info)
            if len(slow_max) != len(fast_max):
                raise SchedulerException("ERROR - {0}".format(info))

        self._dag.add_node(u, weight=uw)
        self._dag.add_node(v, weight=vw)
        self._dag.add_edge(u, v)

        if unew and vnew:
            mydop = DAGUtil.get_max_dop(self._dag)
        else:
            mydop = self.probe_max_dop(u, v, unew, vnew)
            # TODO - put the following code in a unit test!
            if DEBUG:
                mydop_slow = DAGUtil.get_max_dop(self._dag)  #
                if mydop_slow != mydop:
                    err_msg = (
                        "u = {0}, v = {1}, unew = {2}, vnew = {3}".format(
                            u, v, unew, vnew
                        )
                    )
                    raise SchedulerException(
                        "{2}: mydop = {0}, mydop_slow = {1}".format(
                            mydop, mydop_slow, err_msg
                        )
                    )
        # NOTE(review): _ask_max_dop is accessed as a dict here but is
        # documented as an int in __init__ — presumably callers pass a
        # dict in this code path; confirm against callers.
        ret = False if mydop > self._ask_max_dop.get("num_cpus", 1) else True
        # undo the tentative additions so this stays a pure probe
        if unew:
            self.remove(u)
        if vnew:
            self.remove(v)
        return (ret, unew, vnew)

    def add(self, u, v, gu, gv, sequential=False, global_dag=None):
        """
        Add nodes u and/or v to the partition
        if sequential is True, break antichains to sequential chains
        """
        # if (self.partition_id == 180):
        #     logger.debug("u = ", u, ", v = ", v, ", partition = ", self.partition_id)
        uw = gu["weight"]
        vw = gv["weight"]
        unew = u not in self._dag.nodes
        vnew = v not in self._dag.nodes
        self._dag.add_node(u, weight=uw, num_cpus=gu["num_cpus"])
        self._dag.add_node(v, weight=vw, num_cpus=gv["num_cpus"])
        self._dag.add_edge(u, v)

        if unew and vnew:  # we know this is fast
            self._max_antichains = DAGUtil.get_max_antichains(self._dag)
            self._max_dop = 1
        else:
            if sequential and (global_dag is not None):
                # break potential antichain to sequential chain
                if unew:
                    v_ups = nx.ancestors(self._dag, v)
                    for vup in v_ups:
                        if u == vup:
                            continue
                        if len(list(self._dag.predecessors(vup))) == 0:
                            # link u to "root" parent of v to break antichain
                            self._dag.add_edge(u, vup)
                            # change the original global graph
                            global_dag.add_edge(u, vup, weight=0)
                            # roll back if the new edge introduced a cycle
                            if not nx.is_directed_acyclic_graph(global_dag):
                                global_dag.remove_edge(u, vup)
                else:
                    u_downs = nx.descendants(self._dag, u)
                    for udo in u_downs:
                        if udo == v:
                            continue
                        if len(list(self._dag.successors(udo))) == 0:
                            # link "leaf" children of u to v to break antichain
                            self._dag.add_edge(udo, v)
                            # change the original global graph
                            global_dag.add_edge(udo, v, weight=0)
                            # roll back if the new edge introduced a cycle
                            if not nx.is_directed_acyclic_graph(global_dag):
                                global_dag.remove_edge(udo, v)

            self._max_dop = self.probe_max_dop(u, v, unew, vnew, update=True)
            # self._max_dop = DAGUtil.get_max_dop(self._dag)# this is too slow!

    def remove(self, n):
        """
        Remove node n from the partition
        """
        self._dag.remove_node(n)

    def add_node(self, u, weight):
        """
        Add a single node u to the partition
        """
        self._dag.add_node(u, weight=weight)
        self._max_dop = 1

    def probe_max_dop(self, u, v, unew, vnew, update=False):
        """
        An incremental antichain (which appears significantly more efficient than the networkx antichains)
        But only works for DoP, not for weighted width

        Returns the (probed) maximum degree of parallelism; when `update`
        is True the cached antichain list is committed as well.
        """
        if self._max_antichains is None:
            # no cache yet: compute antichains from scratch
            new_ac = DAGUtil.get_max_antichains(self._dag)
            if update:
                self._max_antichains = new_ac
            if len(new_ac) == 0:
                if update:
                    self._max_antichains = None
                return 0
            else:
                return len(new_ac[0])
        else:
            # a previous probe already staged the result; commit it
            if update and self._tmp_new_ac is not None:
                self._max_antichains, md = self._tmp_new_ac
                self._tmp_new_ac = None
                return md

            # `ups` = nodes comparable to the newly added node; any
            # antichain containing one of them cannot be extended
            if unew:
                ups = nx.descendants(self._dag, u)
                new_node = u
            elif vnew:
                ups = nx.ancestors(self._dag, v)
                new_node = v
            else:
                raise SchedulerException("u v are both new/old")
            new_ac = []
            md = 1
            for (
                ma
            ) in (
                self._max_antichains
            ):  # missing elements in the current max_antichains!
                # incremental updates
                found = False
                for n in ma:
                    if n in ups:
                        found = True
                        break
                if not found:
                    # antichain stays valid with the new node appended
                    mma = list(ma)
                    mma.append(new_node)
                    new_ac.append(mma)
                    if len(mma) > md:
                        md = len(mma)
                elif len(ma) > md:
                    md = len(ma)
                new_ac.append(ma)  # carry over, then prune it
            if len(new_ac) > 0:
                # stage the result; committed on the next update=True call
                self._tmp_new_ac = (new_ac, md)
                if update:
                    self._max_antichains = new_ac
                return md
            else:
                raise SchedulerException("No antichains")

    @property
    def cardinality(self):
        """Number of nodes currently in this partition."""
        return len(self._dag.nodes())

408
class KFamilyPartition(Partition):
    """
    A special case (K = 1) of the Maximum Weighted K-families based on
    the Theorem 3.1 in
    http://fmdb.cs.ucla.edu/Treports/930014.pdf
    """

    def __init__(self, gid, max_dop, global_dag=None):
        """
        gid:        cluster/partition id (string)
        max_dop:    dict with key:   resource_attributes (string)
                              value: resource_capacity (integer)
                    A plain integer is also accepted for backward
                    compatibility and treated as {"num_cpus": max_dop}.
        global_dag: the complete workflow DAG; node attributes are read
                    from it in `add_node`
        """
        # isinstance (rather than an exact type comparison) also accepts
        # int/dict subclasses — a backward-compatible generalisation.
        if isinstance(max_dop, int):
            # backward compatible
            max_dop = {"num_cpus": max_dop}
        elif isinstance(max_dop, dict):
            pass
        else:
            raise SchedulerException(
                "Invalid max_dop type: %r" % type(max_dop)
            )

        super(KFamilyPartition, self).__init__(gid, max_dop)
        self._bpg = nx.DiGraph()  # NOTE(review): not used in this class's methods
        self._global_dag = global_dag
        self._check_global_dag = global_dag is not None
        # the set of resource attributes to track (live view of the keys)
        self._w_attr = max_dop.keys()
        self._tc = defaultdict(set)  # NOTE(review): not used in this class's methods
        # per-attribute DoP staged by the latest can_merge/add_node probe
        self._tmp_max_dop = None

    def add_node(self, u):
        """
        Add a single node u to the partition and refresh the partition's
        per-attribute maximum degree of parallelism via the maximum
        weighted antichain.
        """
        kwargs = dict()
        if self._tmp_max_dop is None:
            self._tmp_max_dop = dict()
        self_global_dag = self._global_dag
        # copy the tracked resource attributes from the global DAG node
        for _w_attr in self._w_attr:
            u_aw = self_global_dag.nodes[u].get(_w_attr, 1)
            kwargs[_w_attr] = u_aw
        kwargs["weight"] = self_global_dag.nodes[u].get("weight", 5)
        self._dag.add_node(u, **kwargs)
        # recompute the weighted-antichain width for each attribute
        for k in self._w_attr:
            self._tmp_max_dop[k] = get_max_weighted_antichain(
                self._dag, w_attr=k
            )[0]
        self._max_dop = self._tmp_max_dop

    def can_merge(self, that, u, v):
        """
        Probe whether partition `that` can be merged into this one
        (optionally adding the edge u -> v) without exceeding the
        requested DoP for any tracked resource attribute.

        Returns True and stages the would-be DoP in `_tmp_max_dop` when
        admissible; returns False (leaving state untouched) otherwise.
        """
        dag = nx.compose(self._dag, that._dag)
        if u is not None:
            dag.add_edge(u, v)
        tmp_max_dop = copy.deepcopy(self._tmp_max_dop)

        for _w_attr in self._w_attr:
            ask_max_dop = (
                self._ask_max_dop[_w_attr]
                if self._ask_max_dop[_w_attr] is not None
                else 1
            )
            mydop = get_max_weighted_antichain(dag, w_attr=_w_attr)[0]
            curr_max = max(ask_max_dop, that._max_dop[_w_attr])

            if mydop <= curr_max:
                # if you don't increase DoP, we accept that immediately
                tmp_max_dop[_w_attr] = curr_max
            elif mydop > ask_max_dop:
                # DoP overflow for this attribute: reject the merge
                return False
            else:
                tmp_max_dop[_w_attr] = mydop

        self._tmp_max_dop = tmp_max_dop  # only change it when returning True
        return True

    def merge(self, that, u, v):
        """
        Merge partition `that` into this one (optionally adding the edge
        u -> v). `can_merge` must have been called (and returned True)
        beforehand, since the staged DoP from that probe is committed here.
        """
        self._dag = nx.compose(self._dag, that._dag)
        if u is not None:
            self._dag.add_edge(u, v)
        if self._tmp_max_dop is not None:
            self._max_dop = self._tmp_max_dop
            # print("Gid %d just merged with DoP %d" % (self._gid, self._tmp_max_dop))
        else:
            # we could recalculate it again, but we are lazy!
            raise SchedulerException("can_merge was not probed before add()")

495
class Scheduler(object):
    """
    Static Scheduling consists of three steps:
    1. partition the DAG into an optimal number (M) of partitions
    goal - minimising execution time while maintaining intra-partition DoP
    2. merge partitions into a given number (N) of partitions (if M > N)
    goal - minimise logical communication cost while maintaining load balancing
    3. map each merged partition to a resource unit
    goal - minimise physical communication cost amongst resource units
    """

    def __init__(self, drop_list, max_dop=8, dag=None):
        """
        turn drop_list into DAG, and check its validity

        drop_list:  list of drops the DAG is built from (when dag is None)
        max_dop:    maximum degree of parallelism per partition
        dag:        an already-built DAG, skips DAG construction when given
        """
        self._drop_list = drop_list
        if dag is None:
            self._dag = DAGUtil.build_dag_from_drops(self._drop_list)
        else:
            self._dag = dag
        self._max_dop = max_dop
        self._parts = None  # partitions
        self._part_dict = dict()  # {gid : part}
        self._part_edges = []  # edges amongst all partitions

    def partition_dag(self):
        """Abstract; concrete schedulers implement the partitioning."""
        raise SchedulerException("Not implemented. Try subclass instead")

    def merge_partitions(self, num_partitions, bal_cond=1):
        """
        Merge M partitions into N partitions where N < M
            implemented using METIS for now

        bal_cond:  load balance condition (integer):
                    0 - workload,
                    1 - CPU count (faster to evaluate than workload)

        Returns the METIS edge-cut count of the merged partitioning; each
        child partition's `parent_id` is set as a side effect.
        """
        # 1. build the bi-directional graph (each partition is a node)
        metis = DAGUtil.import_metis()
        G = nx.Graph()
        # parent ids start beyond all existing drop and partition ids
        st_gid = len(self._drop_list) + len(self._parts) + 1
        if bal_cond == 0:
            G.graph["node_weight_attr"] = ["wkl", "eff"]
            for part in self._parts:
                sc = part.schedule
                G.add_node(
                    part.partition_id, wkl=sc.workload, eff=sc.efficiency
                )
        else:
            G.graph["node_weight_attr"] = "cc"
            for part in self._parts:
                # sc = part.schedule
                pdop = part._max_dop
                # TODO add memory as one of the LB condition too
                # normalise: _max_dop may be an int or a resource dict
                cc_eval = (
                    pdop if type(pdop) == int else pdop.get("num_cpus", 1)
                )
                G.add_node(part.partition_id, cc=cc_eval)

        # accumulate inter-partition edge weights
        for e in self._part_edges:
            u = e[0]
            v = e[1]
            ugid = self._dag.nodes[u].get("gid", None)
            vgid = self._dag.nodes[v].get("gid", None)
            G.add_edge(ugid, vgid)  # repeating is fine
            ew = self._dag.adj[u][v]["weight"]
            try:
                G[ugid][vgid]["weight"] += ew
            except KeyError:
                G[ugid][vgid]["weight"] = ew
        # DAGUtil.metis_part(G, 15)
        # since METIS does not allow zero edge weight, reset them to one
        for e in G.edges(data=True):
            if e[2]["weight"] == 0:
                e[2]["weight"] = 1
        # logger.debug(G.nodes(data=True))
        (edgecuts, metis_parts) = metis.part_graph(
            G, nparts=num_partitions, ufactor=1
        )

        # record which merged (parent) partition each child belongs to
        for node, pt in zip(G.nodes(), metis_parts):  # note min(pt) == 0
            parent_id = pt + st_gid
            child_part = self._part_dict[node]
            child_part.parent_id = parent_id
            # logger.debug("Part {0} --> Cluster {1}".format(child_part.partition_id, parent_id))
            # parent_part = Partition(parent_id, None)
            # self._parts.append(parent_part)
        # logger.debug("Edgecuts of merged partitions: ", edgecuts)
        return edgecuts

    def map_partitions(self):
        """
        map logical partitions to physical resources
        """
        pass
591

592
class MySarkarScheduler(Scheduler):
    """
    Based on "V. Sarkar, Partitioning and Scheduling Parallel Programs for Execution on
    Multiprocessors. Cambridge, MA: MIT Press, 1989."

    Main change
    We do not order independent tasks within the same cluster. This could blow the cluster, therefore
    we allow for a cost constraint on the number of concurrent tasks (e.g. # of cores) within each cluster

    Why
    1. we only need to topologically sort the DAG once since we do not add new edges in the cluster
    2. closer to requirements
    3. adjustable for local schedulers

    Similar ideas:
    http://stackoverflow.com/questions/3974731
    """

    def __init__(self, drop_list, max_dop=8, dag=None, dump_progress=False):
        super(MySarkarScheduler, self).__init__(
            drop_list, max_dop=max_dop, dag=dag
        )
        self._sspace = [3] * len(self._dag.edges())  # all edges are not zeroed
        self._dump_progress = dump_progress

    def override_cannot_add(self):
        """
        Whether this scheduler will override the False result from `Partition.can_add()`
        """
        return False

    def is_time_critical(self, u, uw, unew, v, vw, vnew, curr_lpl, ow, rem_el):
        """
        :return: True

        MySarkarScheduler always returns True (i.e. every edge is treated
        as time critical); subclasses may look ahead and decide otherwise.
        """
        logger.debug("MySarkar time criticality called")
        return True

    def _merge_two_parts(self, ugid, vgid, u, v, gu, gv, g_dict, parts, G):
        """
        Merge two parts associated with u and v respectively

        Return: None if these two parts cannot be merged
                due to reasons such as DoP overflow
                A ``Part`` instance

        Side effects: the merged part keeps the smaller gid; all gids above
        the removed gid are shifted down by one in `g_dict`, `parts` and the
        node "gid" attributes of `G`.
        """
        # get the new part should we go ahead
        # the new part should be one  of partu or partv
        # print("\nMerging ugid %d and vgid %d, u %d and v %d" % (ugid, vgid, u, v))
        l_gid = min(ugid, vgid)
        r_gid = max(ugid, vgid)
        part_new = g_dict[l_gid]
        part_removed = g_dict[r_gid]

        if not part_new.can_merge(part_removed, u, v):
            return None

        part_new.merge(part_removed, u, v)

        # Get hold of all gnodes that belong to "part_removed"
        # and re-assign them to the new partitions
        for n in part_removed._dag.nodes():
            G.nodes[n]["gid"] = l_gid

        index = None
        for i, part in enumerate(parts):
            p_gid = part._gid
            if p_gid > r_gid:
                # compact gids so they remain contiguous after removal
                g_dict[p_gid - 1] = part
                part._gid -= 1
                for n in part._dag.nodes():
                    G.nodes[n]["gid"] = part._gid
            elif p_gid == r_gid:
                # index = len(parts) - i - 1
                index = i
                del g_dict[p_gid]

        if index is None:
            raise SchedulerException("Failed to find r_gid")
        # drop the removed part in place (parts is mutated for the caller)
        parts[:] = parts[0:index] + parts[index + 1 :]

        return part_new

    def reduce_partitions(self, parts, g_dict, G):
        """
        further reduce the number of partitions by merging partitions whose max_dop
        is less than capacity

        step 1 - sort partition list based on their
                 _max_dop of num_cpus as default
        step 2 - enumerate each partition p to see merging
                 between p and its neighbour is feasible
        """
        done_reduction = False
        num_reductions = 0
        # TODO consider other w_attrs other than CPUs!
        parts.sort(key=lambda x: x._max_dop["num_cpus"])
        while not done_reduction:
            for i, partA in enumerate(parts):
                if i < len(parts) - 1:
                    partB = parts[i + 1]
                    new_part = self._merge_two_parts(
                        partA._gid,
                        partB._gid,
                        None,
                        None,
                        None,
                        None,
                        g_dict,
                        parts,
                        G,
                    )
                    if new_part is not None:
                        num_reductions += 1
                        break  # force re-sorting
                else:
                    # reached the end without a merge: nothing left to reduce
                    done_reduction = True
                    logger.info(
                        "Performed reductions %d times", num_reductions
                    )
                    break

    def partition_dag(self):
        """
        Return a tuple of
            1. the # of partitions formed (int)
            2. the parallel time (longest path, int)
            3. partition time (seconds, float)
            4. the list of partitions formed
        """
        G = self._dag
        # partition ids start beyond all drop ids
        st_gid = len(self._drop_list) + 1
        init_c = st_gid
        # process edges in descending weight order (Sarkar edge zeroing)
        el = sorted(G.edges(data=True), key=lambda ed: ed[2]["weight"] * -1)
        stt = time.time()
        topo_sorted = nx.topological_sort(G)
        g_dict = self._part_dict  # dict() #{gid : Partition}
        curr_lpl = None
        parts = []
        plots_data = []
        dump_progress = self._dump_progress

        # start with one single-node partition per DAG node
        for n in G.nodes(data=True):
            n[1]["gid"] = st_gid
            part = KFamilyPartition(st_gid, self._max_dop, global_dag=G)
            part.add_node(n[0])
            g_dict[st_gid] = part
            parts.append(part)  # will it get rejected?
            st_gid += 1

        for i, e in enumerate(el):
            u = e[0]
            gu = G.nodes[u]
            v = e[1]
            gv = G.nodes[v]
            ow = G.adj[u][v]["weight"]
            G.adj[u][v]["weight"] = 0  # edge zeroing
            ugid = gu.get("gid", None)
            vgid = gv.get("gid", None)
            if ugid != vgid:  # merge existing parts
                part = self._merge_two_parts(
                    ugid, vgid, u, v, gu, gv, g_dict, parts, G
                )
                if part is not None:
                    st_gid -= 1
                    self._sspace[i] = 1
                else:
                    # merge rejected: restore the edge weight
                    G.adj[u][v]["weight"] = ow
                    self._part_edges.append(e)
            if dump_progress:
                bb = np.median([pp._tmp_max_dop for pp in parts])
                curr_lpl = DAGUtil.get_longest_path(
                    G, show_path=False, topo_sort=topo_sorted
                )[1]
                plots_data.append("%d,%d,%d" % (curr_lpl, len(parts), bb))
        self.reduce_partitions(parts, g_dict, G)
        edt = time.time() - stt
        self._parts = parts
        if dump_progress:
            with open("/tmp/%.3f_lpl_parts.csv" % time.time(), "w") as of:
                of.writelines(os.linesep.join(plots_data))
        if curr_lpl is None:
            curr_lpl = DAGUtil.get_longest_path(
                G, show_path=False, topo_sort=topo_sorted
            )[1]
        return ((st_gid - init_c), curr_lpl, edt, parts)
780

781
class MinNumPartsScheduler(MySarkarScheduler):
    """
    A special type of partition that aims to schedule the DAG on time but at minimum cost.
    In this particular case, the cost is the number of partitions that will be generated.
    The assumption is # of partitions (with certain DoP) more or less represents resource footprint.
    """

    def __init__(
        self, drop_list, deadline, max_dop=8, dag=None, optimistic_factor=0.5
    ):
        """
        :param drop_list: list of drop dictionaries to be scheduled
        :param deadline: target makespan (longest path length) to be met
        :param max_dop: maximum degree of parallelism per partition
        :param dag: optional pre-built networkx DiGraph
        :param optimistic_factor: probability threshold above which an edge
            is considered time-critical (see ``is_time_critical``)
        """
        super(MinNumPartsScheduler, self).__init__(
            drop_list, max_dop=max_dop, dag=dag
        )
        self._deadline = deadline
        self._optimistic_factor = optimistic_factor

    def override_cannot_add(self):
        # Always allow the Sarkar algorithm to reconsider a rejected merge:
        # this scheduler trades makespan for a smaller number of partitions.
        return True

    def is_time_critical(self, u, uw, unew, v, vw, vnew, curr_lpl, ow, rem_el):
        """
        This is called ONLY IF either can_add on partition has returned "False"
        or the new critical path is longer than the old one at each iteration

        Parameters:
            u - node u, v - node v, uw - weight of node u, vw - weight of node v
            curr_lpl - current longest path length, ow - current edge weight
            rem_el - remaining edges to be zeroed
            ow - original edge length
        Returns:
            Boolean

        It looks ahead to compute the probability of time being critical
        and compares that with the _optimistic_factor
        probability = (num of edges need to be zeroed to meet the deadline) /
        (num of remaining unzeroed edges)
        """
        if unew and vnew:
            return True
        # compute time criticality probability
        ttlen = float(len(rem_el))
        if ttlen == 0:
            # no remaining edges, nothing left to zero
            return False
        c = 0
        for i, e in enumerate(rem_el):
            c = i
            # FIX: networkx >= 2.0 removed `G.edge`; use `G.adj` as the rest
            # of this module does (e.g. MySarkarScheduler / PSOScheduler).
            edge_weight = self._dag.adj[e[0]][e[1]]["weight"]
            if (curr_lpl - edge_weight) <= self._deadline:
                break
        # probability that remaining edges will be zeroed in order to meet the deadline
        prob = (c + 1) / ttlen
        return prob > self._optimistic_factor
848
class PSOScheduler(Scheduler):
    """
    Use the Particle Swarm Optimisation to guide the Sarkar algorithm
    https://en.wikipedia.org/wiki/Particle_swarm_optimization

    The idea is to let "edgezeroing" become the search variable X
    The number of dimensions of X is the number of edges in DAG
    Possible values for each dimension is a discrete set {1, 2, 3}
    where:
    * 10 - no zero (2 in base10) + 1
    * 00 - zero w/o linearisation (0 in base10) + 1
    * 01 - zero with linearisation (1 in base10) + 1

    if (deadline is present):
        the objective function sets up a partition scheme such that
            (1) DoP constraints for each partition are satisfied
                based on X[i] value, reject or linearisation
            (2) returns num_of_partitions

        constrain function:
            1. makespan < deadline
    else:
        the objective function sets up a partition scheme such that
            (1) DoP constraints for each partition are satisfied
                based on X[i] value, reject or linearisation
            (2) returns makespan
    """

    def __init__(
        self,
        drop_list,
        max_dop=8,
        dag=None,
        deadline=None,
        topk=30,
        swarm_size=40,
    ):
        """
        :param drop_list: list of drop dictionaries to be scheduled
        :param max_dop: maximum degree of parallelism per partition
        :param dag: optional pre-built networkx DiGraph
        :param deadline: if not None, PSO minimises the number of partitions
            subject to makespan <= deadline; otherwise it minimises makespan
        :param topk: number of leading dimensions of X used as the cache key
            for the search-space memoisation (capped at the number of edges)
        :param swarm_size: PSO swarm size (defaults to 40 if None)
        """
        super(PSOScheduler, self).__init__(drop_list, max_dop=max_dop, dag=dag)
        self._deadline = deadline
        # search space: key - combination of X[i] (string),
        # val - a tuple of (critical_path (int), num_parts (int))
        self._sspace_dict = dict()
        self._topk = topk
        self._swarm_size = swarm_size if swarm_size is not None else 40
        # a lightweight copy of the DAG (no embedded drop specs) that is
        # deep-copied for each objective-function evaluation
        self._lite_dag = DAGUtil.build_dag_from_drops(
            self._drop_list, embed_drop=False
        )
        self._call_counts = 0
        leng = len(self._lite_dag.edges())
        self._leng = leng
        # cap topk at the number of edges (each edge is one PSO dimension)
        self._topk = (
            leng if self._topk is None or leng < self._topk else self._topk
        )

    def partition_dag(self):
        """
        Returns a tuple of:
            1. the # of partitions formed (int)
            2. the parallel time (longest path, int)
            3. partition time (seconds, float)
            4. a list of partitions (Partition)
        """
        # trigger the PSO algorithm
        G = self._dag
        # continuous bounds chosen so that rounding yields the discrete
        # positions {1, 2, 3}
        lb = [0.99] * self._leng
        ub = [3.01] * self._leng
        stt = time.time()
        if self._deadline is None:
            # unconstrained: minimise makespan directly
            xopt, fopt = pso(
                self.objective_func, lb, ub, swarmsize=self._swarm_size
            )
        else:
            # constrained: minimise #partitions subject to makespan <= deadline
            xopt, fopt = pso(
                self.objective_func,
                lb,
                ub,
                ieqcons=[self.constrain_func],
                swarmsize=self._swarm_size,
            )

        # re-run the partitioning once more on the "real" DAG with the best
        # particle position found by PSO
        curr_lpl, num_parts, parts, g_dict = self._partition_G(G, xopt)
        # curr_lpl, num_parts, parts, g_dict = self.objective_func(xopt)
        self._part_dict = g_dict
        edt = time.time()
        # print "PSO scheduler took {0} seconds".format(edt - stt)
        st_gid = len(self._drop_list) + 1 + num_parts
        # wrap every node that did not end up in any partition into its own
        # single-node partition
        for n in G.nodes(data=True):
            if not "gid" in n[1]:
                n[1]["gid"] = st_gid
                # NOTE(review): st_gid is never incremented inside this loop,
                # so all left-over nodes appear to share the same gid and
                # g_dict entry while num_parts still grows -- verify intended
                part = Partition(st_gid, self._max_dop)
                part.add_node(n[0], n[1].get("weight", 1))
                g_dict[st_gid] = part
                parts.append(part)  # will it get rejected?
                num_parts += 1
        self._parts = parts
        # print "call counts ", self._call_counts
        return (num_parts, curr_lpl, edt - stt, parts)

    def _partition_G(self, G, x):
        """
        A helper function to partition G based on a given scheme x
        subject to constraints imposed by each partition's DoP

        Mutates G in place (edge zeroing and per-node "gid" attributes) and
        returns a tuple of (longest_path_length, num_partitions, parts, g_dict).
        """
        # print x
        st_gid = len(self._drop_list) + 1
        init_c = st_gid
        # visit edges heaviest-first, as in the Sarkar algorithm
        el = sorted(G.edges(data=True), key=lambda ed: ed[2]["weight"] * -1)
        # topo_sorted = nx.topological_sort(G)
        # g_dict = self._part_dict#dict() #{gid : Partition}
        g_dict = dict()
        parts = []
        for i, e in enumerate(el):
            # decode this edge's discrete PSO position
            pos = int(round(x[i]))
            if pos == 3:  # 10 non_zero + 1
                continue
            elif pos == 2:  # 01 zero with linearisation + 1
                linear = True
            elif pos == 1:  # 00 zero without linearisation + 1
                linear = False
            else:
                raise SchedulerException(
                    "PSO position out of bound: {0}".format(pos)
                )

            u = e[0]
            gu = G.nodes[u]
            v = e[1]
            gv = G.nodes[v]
            ow = G.adj[u][v]["weight"]
            G.adj[u][v]["weight"] = 0  # edge zeroing
            recover_edge = False

            ugid = gu.get("gid", None)
            vgid = gv.get("gid", None)
            # pick the partition that will (try to) absorb this edge
            if ugid and (not vgid):
                part = g_dict[ugid]
            elif (not ugid) and vgid:
                part = g_dict[vgid]
            elif not ugid and (not vgid):
                # neither end belongs to a partition yet: open a new one
                part = Partition(st_gid, self._max_dop)
                g_dict[st_gid] = part
                parts.append(part)  # will it get rejected?
                st_gid += 1
            else:  # elif (ugid and vgid):
                # cannot change Partition once is in!
                part = None
            # uw = gu['weight']
            # vw = gv['weight']

            if part is None:
                # both ends already partitioned: merging is not supported here
                recover_edge = True
            else:
                ca, unew, vnew = part.can_add(u, v, gu, gv)
                if ca:
                    # ignore linear flag, add it anyway
                    part.add(u, v, gu, gv)
                    gu["gid"] = part._gid
                    gv["gid"] = part._gid
                else:
                    if linear:
                        # DoP would be exceeded: add sequentially instead
                        part.add(u, v, gu, gv, sequential=True, global_dag=G)
                        gu["gid"] = part._gid
                        gv["gid"] = part._gid
                    else:
                        recover_edge = True  # outright rejection
            if recover_edge:
                # undo the zeroing: this edge stays a partition boundary
                G.adj[u][v]["weight"] = ow
                self._part_edges.append(e)
        self._call_counts += 1
        # print "called {0} times, len parts = {1}".format(self._call_counts, len(parts))
        return (
            DAGUtil.get_longest_path(G, show_path=False)[1],
            len(parts),
            parts,
            g_dict,
        )

    def constrain_func(self, x):
        """
        Deadline - critical_path >= 0
        """
        if self._deadline is None:
            raise SchedulerException(
                "Deadline is None, cannot apply constraints!"
            )

        # memoise on the rounded first topk dimensions of the particle
        sk = "".join([str(int(round(xi))) for xi in x[0 : self._topk]])
        stuff = self._sspace_dict.get(sk, None)
        if stuff is None:
            # work on a fresh copy so particles don't interfere
            G = self._lite_dag.copy()
            stuff = self._partition_G(G, x)
            self._sspace_dict[sk] = stuff[0:2]
            del G
        # stuff[0] is the critical path (longest path length)
        return self._deadline - stuff[0]

    def objective_func(self, x):
        """
        x is a list of values, each taking one of the 3 integers: 0,1,2 for an edge
        indices of x is identical to the indices in G.edges().sort(key='weight')
        """
        # first check if the solution is already available in the search space
        sk = "".join([str(int(round(xi))) for xi in x[0 : self._topk]])
        stuff = self._sspace_dict.get(
            sk, None
        )  # TODO is this atomic operation?
        if stuff is None:
            # make a deep copy to avoid mix up multiple particles,
            # each of which has multiple iterations
            G = self._lite_dag.copy()
            stuff = self._partition_G(G, x)
            self._sspace_dict[sk] = stuff[0:2]
            del G
        if self._deadline is None:
            # minimise makespan (longest path)
            return stuff[0]
        else:
            # minimise the number of partitions
            return stuff[1]
1066
class DAGUtil(object):
    """
    Helper functions dealing with DAG
    """

    @staticmethod
    def get_longest_path(
        G, weight="weight", default_weight=1, show_path=True, topo_sort=None
    ):
        """
        Ported from:
        https://github.com/networkx/networkx/blob/master/networkx/algorithms/dag.py
        Added node weight

        Returns the longest path in a DAG
        If G has edges with 'weight' attribute the edge data are used as weight values.
        :param: G Graph (NetworkX DiGraph)
        :param: weight Edge data key to use for weight (string)
        :param: default_weight The weight of edges that do not have a weight attribute (integer)
        :param: show_path If False, skip reconstructing the node list (cheaper);
                the returned path is then None
        :param: topo_sort Optional pre-computed topological order of G, to
                avoid re-sorting on repeated calls over the same graph
        :return: a tuple with two elements: `path` (list), the longest path, and
        `path_length` (float) the length of the longest path.
        """
        dist = {}  # stores {v : (length, u)}
        if topo_sort is None:
            topo_sort = nx.topological_sort(G)
        for v in topo_sort:
            # candidate distances to v via each predecessor u:
            # dist(u) + edge weight + u's node weight
            # (+ v's own node weight only when v is a sink, so it is
            #  accounted for exactly once)
            us = [
                (
                    dist[u][0]
                    + data.get(weight, default_weight)  # accumulate
                    + G.nodes[u].get(weight, 0)  # edge weight
                    + (  # u node weight
                        G.nodes[v].get(weight, 0)
                        if len(list(G.successors(v))) == 0
                        else 0
                    ),  # v node weight if no successor
                    u,
                )
                for u, data in G.pred[v].items()
            ]
            # Use the best predecessor if there is one and its distance is non-negative, otherwise terminate.
            maxu = max(us) if us else (0, v)
            dist[v] = maxu if maxu[0] >= 0 else (0, v)
        u = None
        v = max(dist, key=dist.get)
        lp = dist[v][0]
        if not show_path:
            path = None
        else:
            # walk the predecessor chain backwards from the endpoint; source
            # nodes point at themselves via the (0, v) initialisation above,
            # which terminates the loop
            path = []
            while u != v:
                path.append(v)
                u = v
                v = dist[v][1]
            path.reverse()
        return (path, lp)

    @staticmethod
    def get_max_width(G, weight="weight", default_weight=1):
        """
        Get the antichain with the maximum "weighted" width of this DAG
        weight: float (for example, it could be RAM consumption in GB)
        Return : float
        """
        max_width = 0
        for antichain in nx.antichains(G):
            t = 0
            for n in antichain:
                t += G.nodes[n].get(weight, default_weight)
            if t > max_width:
                max_width = t
        return max_width

    @staticmethod
    def get_max_dop(G):
        """
        Get the maximum degree of parallelism of this DAG
        return : int
        """
        return max(len(antichain) for antichain in nx.antichains(G))

    @staticmethod
    def get_max_antichains(G):
        """
        return a list of antichains with Top-2 lengths
        """
        return DAGUtil.prune_antichains(nx.antichains(G))

    @staticmethod
    def prune_antichains(antichains):
        """
        Prune a list of antichains to keep those with Top-2 lengths
        antichains is a Generator (not a list!)

        NOTE(review): despite the docstring, this returns ALL antichains
        sorted by length in descending order (behaviour kept as-is).
        """
        return sorted(antichains, key=len, reverse=True)

    @staticmethod
    def label_schedule(G, weight="weight", topo_sort=None):
        """
        for each node, label its start and end time

        Writes two attributes on every node of G:
            "stt" - start time (max over parents of parent end + edge weight)
            "edt" - end time (stt + the node's own weight)
        """
        if topo_sort is None:
            topo_sort = nx.topological_sort(G)
        for v in topo_sort:
            gv = G.nodes[v]
            parents = list(G.predecessors(v))
            if len(parents) == 0:
                # root nodes start at t = 0
                gv["stt"] = 0
            else:
                # get the latest end time of one of its parents
                ledt = -1
                for parent in parents:
                    pedt = G.nodes[parent]["edt"] + G.adj[parent][v].get(
                        weight, 0
                    )
                    if pedt > ledt:
                        ledt = pedt
                gv["stt"] = ledt
            gv["edt"] = gv["stt"] + gv.get(weight, 0)

    @staticmethod
    def ganttchart_matrix(G, topo_sort=None):
        """
        Return a M (# of DROPs) by N (longest path length) matrix

        Requires label_schedule() to have been run on G first, so that every
        node carries "stt"/"edt" attributes; raises SchedulerException
        otherwise.
        """
        lpl = DAGUtil.get_longest_path(G, show_path=True)
        # N = lpl[1] - (len(lpl[0]) - 1)
        N = lpl[1]
        M = G.number_of_nodes()
        # FIX: `np.int` was removed in NumPy 1.24; the builtin `int` is the
        # exact type the alias referred to, so behaviour is unchanged.
        ma = np.zeros((M, N), dtype=int)
        if topo_sort is None:
            topo_sort = nx.topological_sort(G)
        for i, n in enumerate(topo_sort):
            node = G.nodes[n]
            try:
                stt = node["stt"]
                edt = node["edt"]
            except KeyError as ke:
                raise SchedulerException(
                    "No schedule labels found: {0}".format(str(ke))
                )
            # print i, n, stt, edt
            leng = edt - stt
            if edt == stt:
                # zero-duration node: nothing to paint
                continue
            try:
                ma[i, stt:edt] = np.ones((1, leng))
            except Exception:
                # log the offending indices before re-raising for diagnosis
                logger.error(
                    "i, stt, edt, leng = %d, %d, %d, %d", i, stt, edt, leng
                )
                logger.error("N, M = %d, %d", M, N)
                raise
            # print ma[i, :]
        return ma

    @staticmethod
    def import_metis():
        """
        Import the metis module, falling back to the bundled libmetis shared
        library when the system-wide one is unavailable, and patch
        `part_graph` once so that every call logs its duration.
        """
        try:
            import metis as mt
        except Exception:
            # the plain import can fail either because the module is missing
            # or because its native DLL cannot be located; point METIS_DLL at
            # the library bundled with dlg.dropmake and retry
            pl = platform.platform()
            if pl.startswith("Darwin"):  # a clumsy way
                ext = "dylib"
            else:
                ext = "so"  # what about Microsoft??!!
            os.environ["METIS_DLL"] = pkg_resources.resource_filename(
                "dlg.dropmake", "lib/libmetis.{0}".format(ext)
            )  # @UndefinedVariable
            import metis as mt
        if not hasattr(mt, "_dlg_patched"):
            # patch exactly once per process
            mt._part_graph = mt.part_graph

            def logged_part_graph(*args, **kwargs):
                logger.info("Starting metis partitioning")
                start = time.time()
                ret = mt._part_graph(*args, **kwargs)  # @UndefinedVariable
                logger.info(
                    "Finished metis partitioning in %.3f [s]",
                    time.time() - start,
                )
                return ret

            mt.part_graph = logged_part_graph
            mt._dlg_patched = True
        return mt

    @staticmethod
    def build_dag_from_drops(
        drop_list, embed_drop=True, fake_super_root=False
    ):
        """
        return a networkx Digraph (DAG)
        :param: fake_super_root whether to create a fake super root node in the DAG
        If set to True, it enables edge zero-based scheduling agorithms to make
        more aggressive merging
        """
        # tw - task weight
        # dw - data weight / volume
        key_dict = dict()  # {oid : node_id}
        drop_dict = dict()  # {oid : drop}
        out_bound_keys = ["streamingConsumers", "consumers", "outputs"]
        for i, drop in enumerate(drop_list):
            oid = drop["oid"]
            key_dict[oid] = i + 1  # starting from 1
            drop_dict[oid] = drop
        G = nx.DiGraph()
        for i, drop in enumerate(drop_list):
            oid = drop["oid"]
            myk = i + 1
            tt = drop["categoryType"]
            if tt in [CategoryType.DATA, "data"]:
                # data drops contribute no task weight
                tw = 0
                dtp = 0
            elif tt in [
                CategoryType.APPLICATION,
                "app",
                CategoryType.SERVICE,
                "serviceapp",
            ]:
                # app/service drops: task weight from the drop spec,
                # defaulting to 1 when missing or unparsable
                try:
                    tw = int(drop["weight"])
                except (ValueError, KeyError):
                    tw = 1
                dtp = 1
            else:
                raise SchedulerException(
                    "Drop Type '{0}' not supported".format(tt)
                )
            num_cpus = drop.get("num_cpus", 1)
            if embed_drop:
                G.add_node(
                    myk,
                    weight=tw,
                    text=drop["name"],
                    drop_type=dtp,
                    drop_spec=drop,
                    num_cpus=num_cpus,
                )
            else:
                G.add_node(
                    myk,
                    weight=tw,
                    text=drop["name"],
                    drop_type=dtp,
                    num_cpus=num_cpus,
                )
            for obk in out_bound_keys:
                if obk in drop:
                    for oup in drop[obk]:
                        key = (
                            list(oup.keys())[0]
                            if isinstance(oup, dict)
                            else oup
                        )
                        # NOTE(review): SERVICE drops match neither branch
                        # below, so they produce no out-bound edges; and the
                        # DATA branch assumes drop["weight"] exists -- verify
                        if CategoryType.DATA == tt:
                            G.add_weighted_edges_from(
                                [(myk, key_dict[key], int(drop["weight"]))]
                            )
                        elif CategoryType.APPLICATION == tt:
                            G.add_weighted_edges_from(
                                [
                                    (
                                        myk,
                                        key_dict[key],
                                        int(drop_dict[key].get("weight", 5)),
                                    )
                                ]
                            )

        if fake_super_root:
            # a zero-weight NullDROP that feeds every root, so edge-zeroing
            # schedulers can merge otherwise disconnected subgraphs
            super_root = dropdict(
                {
                    "oid": "-92",
                    "categoryType": CategoryType.DATA,
                    "dropclass": "dlg.data.drops.data_base.NullDROP",
                }
            )
            super_k = len(drop_list) + 1
            G.add_node(
                super_k,
                weight=0,
                drop_type=0,
                drop_spec=super_root,
                num_cpus=0,
                text="fake_super_root",
            )

            for oup in get_roots(drop_list):
                G.add_weighted_edges_from([(super_k, key_dict[oup], 1)])

        return G

    @staticmethod
    def metis_part(G, num_partitions):
        """
        Use metis binary executable (instead of library)
        This is used only for testing when libmetis halts unexpectedly

        Writes the graph in the METIS text format to /tmp/mm; the caller is
        expected to run the metis binary on that file manually.
        """
        outf = "/tmp/mm"
        lines = []
        part_id_line_dict = dict()  # {part_id: line_num}
        line_part_id_dict = dict()
        # METIS numbers vertices from 1 in file order
        for i, n in enumerate(G.nodes()):
            part_id_line_dict[n] = i + 1
            line_part_id_dict[i + 1] = n

        for i, node in enumerate(G.nodes(data=True)):
            n = node[0]
            line = []
            # two vertex weights per node: workload and efficiency
            line.append(str(node[1]["wkl"]))
            line.append(str(node[1]["eff"]))
            for m in G.neighbors(n):
                line.append(str(part_id_line_dict[m]))
                a = G[m][n]["weight"]
                if 0 == a:
                    logger.debug("G[%d][%d]['weight'] = %f", m, n, a)
                line.append(str(G[m][n]["weight"]))
            lines.append(" ".join(line))

        # header: |V| |E| fmt=011 (vertex+edge weights) ncon=2
        header = "{0} {1} 011 2".format(len(G.nodes()), len(G.edges()))
        lines.insert(0, header)
        with open(outf, "w") as f:
            f.write("\n".join(lines))
1410
if __name__ == "__main__":
    # ad-hoc smoke test for the DAGUtil helpers on a small hand-built DAG
    G = nx.DiGraph()
    G.add_weighted_edges_from([(4, 3, 1), (3, 2, 4), (2, 1, 2), (5, 3, 1)])
    G.add_weighted_edges_from([(3, 6, 5), (6, 7, 2)])
    G.add_weighted_edges_from([(9, 12, 2)])  # testing independent nodes
    G.nodes[3]["weight"] = 65
    print(G.pred[12].items())
    # FIX: under networkx >= 2.0 (the API level used throughout this module,
    # e.g. G.nodes[u] / G.adj) predecessors() returns an iterator, so it
    # cannot be indexed with [0]; take the first element with next() instead.
    print(G.nodes[next(iter(G.predecessors(12)))])

    print("topological sort\n")
    # FIX: topological_sort() returns a generator in networkx >= 2.0;
    # materialise it so the node order is actually printed.
    print(list(nx.topological_sort(G)))

    lp = DAGUtil.get_longest_path(G)
    print("The longest path is {0} with a length of {1}".format(lp[0], lp[1]))
    mw = DAGUtil.get_max_width(G)
    dop = DAGUtil.get_max_dop(G)
    print(
        "The max (weighted) width = {0}, and the max degree of parallelism = {1}".format(
            mw, dop
        )
    )
    DAGUtil.label_schedule(G)
    print(G.nodes(data=True))
    gantt_matrix = DAGUtil.ganttchart_matrix(G)
    print(gantt_matrix)
    print(gantt_matrix.shape)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc