• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ICRAR / daliuge / 4911681207

pending completion
4911681207

Pull #231

github

GitHub
Merge 9186e10d1 into e48989cce
Pull Request #231: Liu 355

180 of 229 new or added lines in 17 files covered. (78.6%)

26 existing lines in 5 files now uncovered.

15345 of 19059 relevant lines covered (80.51%)

1.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.6
/daliuge-engine/dlg/dask_emulation.py
1
#
2
#    ICRAR - International Centre for Radio Astronomy Research
3
#    (c) UWA - The University of Western Australia, 2017
4
#    Copyright by UWA (in the framework of the ICRAR)
5
#    All rights reserved
6
#
7
#    This library is free software; you can redistribute it and/or
8
#    modify it under the terms of the GNU Lesser General Public
9
#    License as published by the Free Software Foundation; either
10
#    version 2.1 of the License, or (at your option) any later version.
11
#
12
#    This library is distributed in the hope that it will be useful,
13
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
14
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15
#    Lesser General Public License for more details.
16
#
17
#    You should have received a copy of the GNU Lesser General Public
18
#    License along with this library; if not, write to the Free Software
19
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
20
#    MA 02111-1307  USA
21
#
22
"""Utilities to emulate the `dask.delayed` function"""
2✔
23

24
import base64
2✔
25
import contextlib
2✔
26
import logging
2✔
27
import pickle
2✔
28
import socket
2✔
29
import struct
2✔
30
import time
2✔
31

32
from . import utils, droputils
2✔
33
from .apps import pyfunc
2✔
34
from .common import dropdict
2✔
35
from .ddap_protocol import DROPStates
2✔
36
from .apps.app_base import BarrierAppDROP
2✔
37
from .exceptions import InvalidDropException
2✔
38

39
# Module-level logger used by every class and function below.
logger = logging.getLogger(__name__)
2✔
40

41

42
class ResultTransmitter(BarrierAppDROP):
    """Collects data from all inputs and transmits it to whomever connects to
    the given host/port.

    Each input's content is unpickled. Inputs in the ERROR state are reported
    as the string ``"Error"``, and inputs whose content cannot be unpickled
    because it ends prematurely (``EOFError``, e.g. empty content) are
    reported as ``None``. With a single input its value is sent as-is;
    otherwise the list of values is sent. The wire format is a 4-byte
    big-endian signed length followed by the pickled payload, and exactly one
    client connection is served.
    """

    def initialize(self, **kwargs):
        """Read the ``host`` (default "localhost") and mandatory ``port`` args.

        :raises InvalidDropException: if no ``port`` parameter was given.
        """
        # NOTE(review): input_error_threshold=100 presumably lets this app run
        # even when inputs failed, so run() can report them as "Error".
        BarrierAppDROP.initialize(self, input_error_threshold=100, **kwargs)
        self.host = self._popArg(kwargs, "host", "localhost")
        self.port = self._popArg(kwargs, "port", None)
        if self.port is None:
            raise InvalidDropException(self, "Missing port parameter")

    def run(self):
        """Gather all input values and send them to the first client."""

        def read_result(x):
            if x.status == DROPStates.ERROR:
                return "Error"
            try:
                return pickle.loads(droputils.allDropContents(x))
            except EOFError:
                # Content ended before a complete pickle (e.g. empty drop).
                return None

        results = [read_result(x) for x in self.inputs]
        if len(self.inputs) == 1:
            results = results[0]
        payload = pickle.dumps(results)

        # Close the listening socket deterministically once the single client
        # has been accepted, instead of leaving it to the garbage collector.
        with socket.socket() as server:
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
            server.bind((self.host, self.port))
            server.listen(1)
            client, _ = server.accept()

        # Closing the writer first flushes the buffered payload; the socket is
        # closed afterwards by contextlib.closing.
        with contextlib.closing(client), client.makefile("wb") as writer:
            writer.write(struct.pack(">i", len(payload)))
            writer.write(payload)
78

79

80
def _get_client(**kwargs):
    """Return the client used to submit graphs to a Drop Manager.

    An explicit ``client`` keyword is returned unchanged (even when it is
    None). Otherwise a ``NodeManagerClient`` is built from the optional
    ``host`` (default "localhost"), ``port`` (default
    ``constants.NODE_DEFAULT_REST_PORT``) and ``timeout`` (default None)
    keywords.
    """
    if "client" not in kwargs:
        from .manager.client import NodeManagerClient
        from .manager import constants

        rest_host = kwargs.get("host", "localhost")
        rest_port = kwargs.get("port", constants.NODE_DEFAULT_REST_PORT)
        return NodeManagerClient(rest_host, rest_port, kwargs.get("timeout", None))
    return kwargs["client"]
2✔
91

92

93
def _is_list_of_delayeds(x):
    """Tell whether ``x`` is a non-empty list/tuple starting with a _DataDrop.

    Only the first element is inspected.
    """
    if not isinstance(x, (list, tuple)) or not x:
        return False
    return isinstance(x[0], _DataDrop)
99

100

101
def compute(value, **kwargs):
    """Returns the result of the (possibly) delayed computation by sending
    the graph to a Drop Manager and waiting for the result to arrive back.

    A ``ResultTransmitter`` application, configured with port 10000, is added
    as a consumer of every leaf drop of ``value``'s graph. The graph is then
    appended to and deployed on a new session, and the pickled result is read
    back over a socket connection to ``localhost`` on that port.

    :param value: a delayed object, or a non-empty list/tuple whose first
        element is a delayed data drop (it is wrapped in a _DelayedDrops).
    :param kwargs: either ``client`` (a ready-made manager client) or
        ``host``/``port``/``timeout`` used to create one. ``timeout`` is also
        used when connecting to and reading from the transmitter.
    :returns: the unpickled result sent by the transmitter.
    :raises EOFError: if the connection is closed before the complete result
        has been received.
    """
    # Support calling compute with a list of DelayedDrops
    if _is_list_of_delayeds(value):
        value = _DelayedDrops(*value)

    graph = value.get_graph()
    # Port the ResultTransmitter listens on and that we connect to below.
    port = 10000

    # Add one final application that will wait for all results
    # and transmit them back to us
    transmitter_oid = "-1"
    transmitter = dropdict(
        {
            "categoryType": "Application",
            "dropclass": "dlg.dask_emulation.ResultTransmitter",
            "oid": transmitter_oid,
            "uid": transmitter_oid,
            "port": port,
            "name": "result transmitter",
        }
    )
    for leaf_oid in droputils.get_leaves(graph.values()):
        graph[leaf_oid].addConsumer(transmitter)
    graph[transmitter_oid] = transmitter

    graph = list(graph.values())

    # Submit and wait
    session_id = "session-%f" % time.time()
    client = _get_client(**kwargs)
    client.create_session(session_id)
    client.append_graph(session_id, graph)
    client.deploy_session(
        session_id, completed_uids=droputils.get_roots(graph)
    )

    timeout = kwargs.get("timeout", None)
    s = utils.connect_to("localhost", port, timeout)
    s.settimeout(timeout)
    # Close the buffered reader as well as the socket: in CPython,
    # socket.close() defers releasing the descriptor while a makefile()
    # object is still open.
    with contextlib.closing(s), s.makefile("rb") as reader:
        header = reader.read(4)
        if len(header) != 4:
            raise EOFError(
                "Connection closed before the result size was received"
            )
        nbytes = struct.unpack(">i", header)[0]
        payload = reader.read(nbytes)
        if len(payload) != nbytes:
            raise EOFError(
                "Connection closed after %d of %d result bytes"
                % (len(payload), nbytes)
            )
        # NOTE: unpickling can execute arbitrary code; this trusts whatever is
        # listening on localhost:<port> to be our ResultTransmitter.
        ret = pickle.loads(payload)
        logger.info("Received %r from graph computation", ret)
        return ret
2✔
150

151

152
class _DelayedDrop(object):
    """Base class for the nodes of an emulated ``dask.delayed`` graph.

    A node records its upstream dependencies -- ``inputs`` (a list) and an
    optional ``producer`` -- and lazily builds the ``dropdict`` describing it
    in a DALiuGE physical graph. Subclasses must implement ``make_dropdict()``.
    """

    # Class-wide counter handing out sequential oids; reset by get_graph().
    _drop_count = 0

    def __init__(self, producer=None):
        self._dropdict = None
        self.producer = producer
        self.inputs = []

    @property
    def next_drop_oid(self):
        """Return the next oid number, advancing the class-wide counter.

        NOTE: this property has a side effect; every access consumes a number.
        """
        i = _DelayedDrop._drop_count
        _DelayedDrop._drop_count += 1
        return i

    @property
    def dropdict(self):
        """This node's dropdict, created via make_dropdict() on first access."""
        if self._dropdict is None:
            self._dropdict = self.make_dropdict()
        return self._dropdict

    def reset(self):
        """Forget the cached dropdict so the next access rebuilds it."""
        self._dropdict = None

    @property
    def oid(self):
        """The oid stored in this node's dropdict (set by _append_to_graph)."""
        return self.dropdict["oid"]

    def compute(self, **kwargs):
        """Equivalent to the module-level ``compute(self, **kwargs)``."""
        return compute(self, **kwargs)

    def get_graph(self):
        """Build the physical graph reachable from this node.

        :returns: a dict mapping oid strings (numbered from "0" on each call)
            to the dropdicts of every reachable node. The nodes' cached
            dropdicts are cleared afterwards, so a later call builds new ones.
        """
        _DelayedDrop._drop_count = 0
        graph = {}
        visited = set()
        self._to_physical_graph(visited, graph)
        for d in visited:
            d.reset()
        return graph

    def _append_to_graph(self, visited, graph):
        """Add this node to ``graph`` under a fresh oid, once per traversal."""
        if self in visited:
            return
        oid = str(self.next_drop_oid)
        dd = self.dropdict
        dd["oid"] = oid
        visited.add(self)
        graph[oid] = dd
        logger.debug("Appended %r/%s to the Physical Graph", self, oid)

    def _to_physical_graph(self, visited, graph):
        """Recursively add this node and its dependencies, linking each one.

        :returns: the node downstream drops should link to (``self`` here;
            subclasses may return a different node).
        """
        self._append_to_graph(visited, graph)

        dependencies = list(self.inputs)
        # Compare with None rather than relying on truthiness: a producer may
        # be a _DelayedDrops, which defines __len__ and is falsy when empty.
        if self.producer is not None:
            dependencies.append(self.producer)
        for d in dependencies:
            if isinstance(d, list):
                # NOTE(review): a tuple has no _to_physical_graph(), so an
                # unvisited list dependency fails below; inputs produced by
                # _AppDrop._to_delayed_arg are never plain lists.
                d = tuple(d)
            if d in visited:
                # Already traversed: only link it, don't walk it again.
                self._add_upstream(d)
                continue

            d = d._to_physical_graph(visited, graph)
            self._add_upstream(d)

        return self

    def _add_upstream(self, upstream):
        """Link the given drop as either a producer or input of this drop.

        Data drops record ``upstream`` as a producer; every other node
        records it as an input.
        """
        self_dd = self.dropdict
        up_dd = upstream.dropdict
        if isinstance(self, _DataDrop):
            self_dd.addProducer(up_dd)
            logger.debug(
                "Set %r/%s as producer of %r/%s",
                upstream,
                upstream.oid,
                self,
                self.oid,
            )
        else:
            self_dd.addInput(up_dd)
            logger.debug(
                "Set %r/%s as input of %r/%s",
                upstream,
                upstream.oid,
                self,
                self.oid,
            )
241

242

243
class _Listifier(BarrierAppDROP):
    """Returns a list with all objects as contents.

    The content of every input is unpickled, the values are gathered in
    input order, and the pickled list is written to the first output.
    """

    def run(self):
        write_output = self.outputs[0].write
        gathered = [
            pickle.loads(droputils.allDropContents(source))
            for source in self.inputs
        ]
        write_output(pickle.dumps(gathered))
255

256

257
class _DelayedDrops(_DelayedDrop):
    """One or more _DelayedDrops treated as a single item.

    In the physical graph this becomes a ``_Listifier`` application whose
    inputs are the wrapped drops, plus one output data drop produced by it.
    """

    def __init__(self, *drops):
        super(_DelayedDrops, self).__init__()
        self.drops = drops
        self.inputs.extend(drops)
        logger.debug("Created %r", self)

    def _to_physical_graph(self, visited, graph):
        """Add the output drop, the listifier app and all wrapped drops.

        :returns: the output data drop, which downstream nodes link to.
        """
        # The output data drop is appended before the listifier app itself.
        output = _DataDrop(producer=self)
        output._append_to_graph(visited, graph)

        self._append_to_graph(visited, graph)
        output._add_upstream(self)

        for d in self.drops:
            # Same rule as _DelayedDrop._to_physical_graph: traverse each
            # dependency only once, and link to the node the traversal
            # returns (e.g. the output data drop of a nested _DelayedDrops,
            # not its listifier application).
            if d not in visited:
                d = d._to_physical_graph(visited, graph)
            self._add_upstream(d)

        return output

    def __iter__(self):
        return iter(self.drops)

    def __len__(self):
        return len(self.drops)

    def __getitem__(self, i):
        return self.drops[i]

    def make_dropdict(self):
        """Describe the listifier application that gathers all inputs."""
        return dropdict(
            {
                "categoryType": "Application",
                "dropclass": "dlg.dask_emulation._Listifier",
                "name": "listifier",
            }
        )

    def __repr__(self):
        return "<_DelayedDrops n=%d>" % (len(self.drops),)
×
300

301

302
class _AppDrop(_DelayedDrop):
    """Defines a PyFuncApp drop for a given function `f`.

    Calling an instance records the call's arguments as inputs (wrapping
    plain values into data drops) and returns the delayed output(s).
    """

    def __init__(self, f, nout):
        _DelayedDrop.__init__(self)
        self.f = f
        self.fname = getattr(f, "__name__", None)
        self.fcode, self.fdefaults = pyfunc.serialize_func(f)
        # One entry per recorded input: the keyword name, or None if positional.
        self.original_kwarg_names = []
        self.nout = nout
        logger.debug("Created %r", self)

    def make_dropdict(self):
        """Describe this function call as a PyFuncApp dropdict."""
        # Reversed copy so _add_upstream can pop() names in input order.
        self.kwarg_names = self.original_kwarg_names[::-1]
        app_dd = dropdict(
            {
                "categoryType": "Application",
                "dropclass": "dlg.apps.pyfunc.PyFuncApp",
                "func_arg_mapping": {},
            }
        )
        if self.fname is not None:
            short_name = self.fname.rsplit(".", 1)[-1]
            app_dd["func_name"] = self.fname
            app_dd["name"] = short_name
        if self.fcode is not None:
            encoded_code = base64.b64encode(self.fcode)
            app_dd["func_code"] = utils.b2s(encoded_code)
        if self.fdefaults:
            app_dd["func_defaults"] = self.fdefaults
        return app_dd

    def _add_upstream(self, dep):
        """Link ``dep`` as an input; map it to its keyword name if it has one."""
        # The base implementation touches self.dropdict, which (re)creates
        # self.kwarg_names via make_dropdict() when needed.
        _DelayedDrop._add_upstream(self, dep)
        if not self.kwarg_names:
            return
        name = self.kwarg_names.pop()
        if name is None:
            return
        logger.debug(
            "Adding %s/%s to function mapping for %s",
            name,
            dep.oid,
            self.fname,
        )
        self.dropdict["func_arg_mapping"][name] = dep.oid

    def _to_delayed_arg(self, arg):
        """Wrap a call argument into a delayed node (delayed ones pass as-is)."""
        logger.info("Turning into delayed arg for %r: %r", self, arg)
        if isinstance(arg, _DelayedDrop):
            return arg
        # Lists/tuples of _DataDrop objects become a _DelayedDrops; any other
        # value becomes a plain data drop.
        return (
            _DelayedDrops(*arg)
            if _is_list_of_delayeds(arg)
            else _DataDrop(pydata=arg)
        )

    def __call__(self, *args, **kwargs):
        """Record the arguments as inputs and return the delayed result.

        NOTE(review): inputs accumulate on this instance across repeated calls.
        """
        logger.debug(
            "Delayed function %s called with %d args and %d kwargs",
            self.fname,
            len(args),
            len(kwargs),
        )
        named_args = [(None, value) for value in args] + list(kwargs.items())
        for name, value in named_args:
            self.inputs.append(self._to_delayed_arg(value))
            self.original_kwarg_names.append(name)

        if self.nout is not None:
            return _DataDropSequence(nout=self.nout, producer=self)
        return _DataDrop(producer=self)

    def __repr__(self):
        return "<_DelayedApp fname=%s, nout=%s>" % (self.fname, self.nout)
×
384

385

386
# Sentinel meaning "no pydata was given" for _DataDrop, so that None remains a
# valid literal value to pass as pydata.
_no_data = object()
2✔
387

388

389
class _DataDrop(_DelayedDrop):
    """Defines an in-memory drop.

    A data drop is either written by an upstream application (``producer``)
    or carries literal Python data (``pydata``) embedded in the graph;
    exactly one of the two must be given.
    """

    def __init__(self, producer=None, pydata=_no_data):
        _DelayedDrop.__init__(self, producer)

        # pydata may legitimately be None, hence the _no_data sentinel.
        if (producer is None) == (pydata is _no_data):
            raise ValueError("exactly one of producer or pydata must be given")
        self.pydata = pydata
        logger.debug("Created %r", self)

    def make_dropdict(self):
        """Describe this node as an InMemoryDROP, embedding pydata if any."""
        my_dropdict = dropdict(
            {
                "categoryType": "Data",
                "dropclass": "dlg.data.drops.memory.InMemoryDROP",
            }
        )
        # Compare with None rather than relying on truthiness: a producer may
        # be a _DelayedDrops, which defines __len__ and is falsy when empty.
        if self.producer is None:
            my_dropdict["pydata"] = pyfunc.serialize_data(self.pydata)
        return my_dropdict

    def __repr__(self):
        if self.producer is None:
            return "<_DataDrop, pydata=%r>" % (self.pydata,)
        return "<_DataDrop, producer=%r>" % (self.producer,)
416

417

418
class _DataDropSequence(_DataDrop):
    """One or more _DataDrops that can be subscribed.

    Stands for the ``nout`` outputs of a delayed function call: indexing or
    iterating yields delayed values that each select one element of the
    producer's result.
    """

    def __init__(self, nout, producer):
        # Set nout first: _DataDrop.__init__ may log repr(self), which reads it.
        self.nout = nout
        # Run the full _DataDrop initializer (validation and the ``pydata``
        # attribute) instead of skipping it, as super(_DataDrop, self) did.
        super(_DataDropSequence, self).__init__(producer=producer)

    def __iter__(self):
        for i in range(self.nout):
            yield self[i]

    def __len__(self):
        return self.nout

    def __getitem__(self, i):
        """Return a delayed value selecting element ``i`` of the result."""
        return delayed(lambda x, i: x[i])(self, i)

    def __repr__(self):
        return "<_DataDropSequence nout=%d, producer=%r>" % (
            self.nout,
            self.producer,
        )
441

442

443
def delayed(x, *args, **kwargs):
    """Like dask.delayed, but quietly swallowing anything other than `nout`.

    ``nout`` comes from the ``nout`` keyword or, failing that, from the first
    extra positional argument (default None). Callables become delayed
    applications; any other value becomes a data drop holding it as pydata.

    NOTE(review): in dask, the second positional parameter of ``delayed`` is
    ``name``, not ``nout`` -- confirm callers rely on this positional form.
    """
    if "nout" in kwargs:
        nout = kwargs["nout"]
    else:
        nout = args[0] if args else None

    if not callable(x):
        return _DataDrop(pydata=x)
    return _AppDrop(x, nout=nout)
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc