• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ICRAR / daliuge / 4911681207

pending completion
4911681207

Pull #231

github

GitHub
Merge 9186e10d1 into e48989cce
Pull Request #231: Liu 355

180 of 229 new or added lines in 17 files covered. (78.6%)

26 existing lines in 5 files now uncovered.

15345 of 19059 relevant lines covered (80.51%)

1.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.17
/daliuge-engine/dlg/apps/dynlib.py
1
#
2
#    ICRAR - International Centre for Radio Astronomy Research
3
#    (c) UWA - The University of Western Australia, 2017
4
#    Copyright by UWA (in the framework of the ICRAR)
5
#    All rights reserved
6
#
7
#    This library is free software; you can redistribute it and/or
8
#    modify it under the terms of the GNU Lesser General Public
9
#    License as published by the Free Software Foundation; either
10
#    version 2.1 of the License, or (at your option) any later version.
11
#
12
#    This library is distributed in the hope that it will be useful,
13
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
14
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15
#    Lesser General Public License for more details.
16
#
17
#    You should have received a copy of the GNU Lesser General Public
18
#    License along with this library; if not, write to the Free Software
19
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
20
#    MA 02111-1307  USA
21
#
22

23
import ctypes
2✔
24
import functools
2✔
25
import logging
2✔
26
import multiprocessing
2✔
27
import queue
2✔
28
import threading
2✔
29
import six
2✔
30

31
from .. import rpc, utils
2✔
32
from ..ddap_protocol import AppDROPStates
2✔
33
from ..apps.app_base import AppDROP, BarrierAppDROP
2✔
34
from ..exceptions import InvalidDropException
2✔
35

36
logger = logging.getLogger(__name__)
2✔
37

38
# ctypes prototypes for the callbacks exchanged with the shared library.
# For CFUNCTYPE the first argument is the return type; the rest are the
# argument types of the C function pointer.

# size_t (*read)(char *buf, size_t n)
_read_cb_type = ctypes.CFUNCTYPE(
    ctypes.c_size_t,
    ctypes.POINTER(ctypes.c_char),
    ctypes.c_size_t,
)

# size_t (*write)(char *buf, size_t n)
_write_cb_type = ctypes.CFUNCTYPE(
    ctypes.c_size_t,
    ctypes.POINTER(ctypes.c_char),
    ctypes.c_size_t,
)

# void (*running)(void)
_app_running_cb_type = ctypes.CFUNCTYPE(None)

# void (*done)(int status)
_app_done_cb_type = ctypes.CFUNCTYPE(None, ctypes.c_int)
49

50

51
class CDlgInput(ctypes.Structure):
    """
    ctypes structure describing one input of the application.

    Instances are built by `_to_c_input` and exposed to the library through
    the `inputs` array of `CDlgApp`.
    """

    # The order of the entries defines the layout of the structure, so it
    # must not be changed independently of the library side.
    _fields_ = [
        # Identification of the input, as UTF-8 encoded C strings
        ("uid", ctypes.c_char_p),
        ("oid", ctypes.c_char_p),
        ("name", ctypes.c_char_p),
        # Status of the input at the time the structure was created
        ("status", ctypes.c_int),
        # Callback the library invokes to read data from the input
        ("read", _read_cb_type),
    ]
59

60

61
class CDlgStreamingInput(ctypes.Structure):
    """
    ctypes structure describing one streaming input of the application.

    It only carries identification strings; there is no read callback.
    """

    # The order of the entries defines the layout of the structure, so it
    # must not be changed independently of the library side.
    _fields_ = [
        ("uid", ctypes.c_char_p),
        ("oid", ctypes.c_char_p),
        ("name", ctypes.c_char_p),
    ]
67

68

69
class CDlgOutput(ctypes.Structure):
    """
    ctypes structure describing one output of the application.

    Instances are built by `_to_c_output` and exposed to the library through
    the `outputs` array of `CDlgApp`.
    """

    # The order of the entries defines the layout of the structure, so it
    # must not be changed independently of the library side.
    _fields_ = [
        # Identification of the output, as UTF-8 encoded C strings
        ("uid", ctypes.c_char_p),
        ("oid", ctypes.c_char_p),
        ("name", ctypes.c_char_p),
        # Callback the library invokes to write data into the output
        ("write", _write_cb_type),
    ]
76

77

78
class CDlgApp(ctypes.Structure):
    """
    ctypes structure with the application information that is passed (by
    pointer) to the functions of the shared library.
    """

    # The order of the entries defines the layout of the structure, so it
    # must not be changed independently of the library side.
    _fields_ = [
        ("appname", ctypes.c_char_p),
        ("uid", ctypes.c_char_p),
        ("oid", ctypes.c_char_p),
        # Each array pointer is followed by its number of elements
        ("ranks", ctypes.POINTER(ctypes.c_int32)),
        ("n_ranks", ctypes.c_uint),
        ("inputs", ctypes.POINTER(CDlgInput)),
        ("n_inputs", ctypes.c_uint),
        ("streaming_inputs", ctypes.POINTER(CDlgStreamingInput)),
        ("n_streaming_inputs", ctypes.c_uint),
        ("outputs", ctypes.POINTER(CDlgOutput)),
        ("n_outputs", ctypes.c_uint),
        # Callbacks to report that the application is running / has finished
        ("running", _app_running_cb_type),
        ("done", _app_done_cb_type),
        # Opaque pointer; this module only ever initialises it to NULL
        ("data", ctypes.c_void_p),
    ]

    def pack_python(self):
        """
        Returns a dictionary mapping the name of each field of this structure
        to the `repr` of its current value.
        """
        return {name: repr(getattr(self, name)) for name, _ in self._fields_}
101

102

103
def _to_c_input(i):
    """
    Opens the input drop `i` and builds the C structure that describes it.

    Returns a tuple with the descriptor obtained from `i.open()` (which the
    caller needs in order to close the input later) and the `CDlgInput`.
    """

    read_from_drop = i.read
    descriptor = i.open()

    def _read(buf, n):
        # Read up to `n` bytes from the drop and copy them into the buffer
        # provided by the library, reporting how many bytes were copied
        chunk = read_from_drop(descriptor, n)
        n_read = len(chunk)
        ctypes.memmove(buf, chunk, n_read)
        return n_read

    c_input = CDlgInput(
        i.uid.encode("utf8"),
        i.oid.encode("utf8"),
        i.name.encode("utf8"),
        i.status,
        _read_cb_type(_read),
    )
    return descriptor, c_input
125

126

127
def _to_c_output(o):
    """
    Builds the C structure that describes the output drop `o`.
    """

    def _write(buf, n):
        # Hand the first `n` bytes of the library's buffer over to the drop
        return o.write(buf[:n])

    return CDlgOutput(
        o.uid.encode("utf8"),
        o.oid.encode("utf8"),
        o.name.encode("utf8"),
        _write_cb_type(_write),
    )
139

140

141
def prepare_c_inputs(c_app, inputs):
    """
    Converts all inputs to its C equivalents and sets them into `c_app`.

    Each input is opened as part of the conversion. The returned list holds
    one no-argument callable per input which closes the corresponding
    descriptor; callers are expected to invoke all of them once the inputs
    are not needed anymore (see `run`).

    If opening or converting any of the inputs fails, the inputs that had
    already been opened are closed before the error is propagated, so that no
    descriptors are leaked.
    """

    c_inputs = []
    input_closers = []
    try:
        for i in inputs:
            desc, c_input = _to_c_input(i)
            input_closers.append(functools.partial(i.close, desc))
            c_inputs.append(c_input)
        c_app.inputs = (CDlgInput * len(c_inputs))(*c_inputs)
        c_app.n_inputs = len(c_inputs)
    except Exception:
        # The caller never receives the closers in this case, so this is the
        # only place where the already opened inputs can be released
        for closer in input_closers:
            try:
                closer()
            except Exception:
                logger.exception(
                    "Error while closing input after failed preparation"
                )
        raise
    return input_closers
155

156

157
def prepare_c_outputs(c_app, outputs):
    """
    Builds the C structure of every output and stores the resulting array,
    together with its length, into `c_app`.
    """

    converted = list(map(_to_c_output, outputs))
    n_outputs = len(converted)
    c_app.outputs = (CDlgOutput * n_outputs)(*converted)
    c_app.n_outputs = n_outputs
165

166

167
def prepare_c_ranks(c_app, ranks):
    """
    Stores `ranks` into `c_app` as an array of 32-bit integers, together with
    the number of elements. `ranks` can be None, in which case only the count
    is set (to zero).
    """
    if ranks is not None:
        n_ranks = len(ranks)
        c_app.ranks = (ctypes.c_int32 * n_ranks)(*ranks)
        c_app.n_ranks = n_ranks
        return

    # No ranks were given: the number of ranks is zero and the array pointer
    # is left untouched
    c_app.n_ranks = 0
177

178

179
def run(lib, c_app, input_closers):
    """
    Invokes the `run2` function of `lib` if it has one, or its `run` function
    otherwise, with a pointer to the given `c_app`.

    `run2` is expected to return a Python object: if it is an exception
    instance it is raised as is, and any other true value is reported as a
    failure. `run` is considered to have failed if it returns a non-zero
    value.

    After completion, successful or not, every callable in `input_closers` is
    invoked to close the opened file descriptors. All closers are attempted
    even if some of them fail; the first error raised by a closer (if any) is
    raised once all of them have been invoked.
    """
    try:
        if hasattr(lib, "run2"):
            # With run2 we pass the results as a PyObject*
            run2 = lib.run2
            run2.restype = ctypes.py_object
            result = run2(ctypes.pointer(c_app))
            if isinstance(result, Exception):
                raise result
            if result:
                raise Exception(
                    "Invocation of {}:run2 returned with status {}".format(
                        lib, result
                    )
                )

        elif lib.run(ctypes.pointer(c_app)):
            raise Exception(
                "Invocation of %r:run returned with status != 0" % lib
            )
    finally:
        # A failing closer must not prevent the remaining descriptors from
        # being closed, so remember the first error and keep going
        close_error = None
        for closer in input_closers:
            try:
                closer()
            except Exception as e:
                if close_error is None:
                    close_error = e
        if close_error is not None:
            raise close_error
206

207

208
class InvalidLibrary(Exception):
    """
    Raised by `load_and_init` when a library lacks the required functions or
    fails to initialise.
    """
210

211

212
def load_and_init(libname, oid, uid, params):
    """
    Loads the shared library `libname`, checks that it offers the required
    entry points, creates the C application structure for the application
    identified by `oid` and `uid`, and lets the library initialise it with
    `params`. Returns a `(lib, c_app)` tuple.

    The library must offer one of `run`/`run2` and one of `init`/`init2`.
    `init2` takes precedence and receives `params` as a Python object;
    `init` receives them as a NULL-terminated array of key/value C strings.

    Raises `InvalidLibrary` if a required function is missing or if the
    initialisation function reports a failure. An exception instance
    returned by `init2` is raised as is.
    """

    from ctypes.util import find_library

    # Try with a simple name, or as full path
    libname = find_library(libname) or libname
    lib = ctypes.cdll.LoadLibrary(libname)
    logger.info("Loaded %s as %r", libname, lib)

    # At least one function of each group must be offered by the library
    for alternatives in (["run", "run2"], ["init", "init2"]):
        if not any(hasattr(lib, fname) for fname in alternatives):
            raise InvalidLibrary(
                "{} doesn't have one of the functions {}".format(
                    libname, alternatives
                )
            )

    # Create the initial contents of the C dlg_app_info structure
    # We pass no inputs because we don't know them (and don't need them)
    # at this point yet.
    # The running and done callbacks are also NULLs
    c_app = CDlgApp(
        appname=None,
        uid=uid.encode("utf8"),
        oid=oid.encode("utf8"),
        ranks=None,
        n_ranks=0,
        inputs=None,
        n_inputs=0,
        streaming_inputs=None,
        n_streaming_inputs=0,
        outputs=None,
        n_outputs=0,
        running=ctypes.cast(None, _app_running_cb_type),
        done=ctypes.cast(None, _app_done_cb_type),
        data=None,
    )
    c_app_p = ctypes.pointer(c_app)

    if hasattr(lib, "init2"):
        # With init2 we pass the params as a PyObject*
        logger.info("Extra parameters passed to application: %r", params)
        init2 = lib.init2
        init2.restype = ctypes.py_object
        outcome = init2(c_app_p, ctypes.py_object(params))
        if isinstance(outcome, Exception):
            raise outcome
        if outcome:
            raise InvalidLibrary(
                "{} failed during initialization (init2)".format(libname)
            )

    elif hasattr(lib, "init"):
        # Every key and value is passed down as a UTF-8 encoded string.
        # The encoded values are kept in a local variable so their contents
        # stay available while the library accesses them through pointers
        encoded_params = [
            (str(k).encode("utf8"), str(v).encode("utf8"))
            for k, v in params.items()
        ]
        logger.debug(
            "Extra parameters passed to application: %r", encoded_params
        )

        # Wrap in ctypes: an array of pointers to (key, value) pairs of
        # strings, terminated by a NULL entry
        str_ptr_type = ctypes.POINTER(ctypes.c_char_p)
        pair_type = ctypes.c_char_p * 2
        pairs = [pair_type(k, v) for k, v in encoded_params]
        pairs.append(None)
        c_params = (str_ptr_type * len(pairs))(*pairs)

        # Let the shared library initialize this app
        if lib.init(c_app_p, c_params):
            raise InvalidLibrary(
                "{} failed during initialization (init)".format(libname)
            )

    else:
        raise InvalidLibrary(
            "{} failed during initialization. No init or init2".format(libname)
        )

    return lib, c_app
307

308

309
class DynlibAppBase(object):
    """
    Common functionality for the applications that load a dynamic library
    into the current process: loading and initialising the library, and
    exposing the outputs of the application to it.
    """

    def initialize(self, **kwargs):
        """
        Loads and initialises the library given via the `lib` argument; the
        rest of the arguments are passed down to the library as parameters.

        Raises `InvalidDropException` if no library is given, or if it cannot
        be used (see `load_and_init`).
        """
        super(DynlibAppBase, self).initialize(**kwargs)

        if "lib" not in kwargs:
            raise InvalidDropException(self, "library not specified")

        try:
            self.lib, self._c_app = load_and_init(
                kwargs.pop("lib"), self.oid, self.uid, kwargs
            )
        except InvalidLibrary as e:
            raise InvalidDropException(self, e.args[0])

        # Have we properly set the outputs in the C application structure yet?
        self._c_outputs_set = False
        self._c_outputs_setting_lock = threading.Lock()

    def _ensure_c_outputs_are_set(self):
        """
        Sets the outputs of this application into the C application structure
        the first time it is called; further calls have no effect.
        """
        with self._c_outputs_setting_lock:
            if self._c_outputs_set:
                return
            prepare_c_outputs(self._c_app, self.outputs)
            # Remember that the work is done. Without this the outputs array
            # (and the callbacks it holds) would be rebuilt and replaced in
            # the C structure on every single call
            self._c_outputs_set = True
332

333

334
class DynlibStreamApp(DynlibAppBase, AppDROP):
    """
    Application that forwards the data-written and drop-completed events it
    receives to the `data_written` and `drop_completed` functions of a
    dynamic library loaded into the current process.
    """

    def initialize(self, **kwargs):
        super(DynlibStreamApp, self).initialize(**kwargs)

        # Set up callbacks for the library to signal that the application
        # is running, and that it has ended
        def _on_running():
            self.execStatus = AppDROPStates.RUNNING

        def _on_done(status):
            # `status` is the value reported by the library
            self.execStatus = status
            self._notifyAppIsFinished()

        c_app = self._c_app
        c_app.running = _app_running_cb_type(_on_running)
        c_app.done = _app_done_cb_type(_on_done)

    def dataWritten(self, uid, data):
        """Passes `data`, written by the drop `uid`, on to the library"""
        self._ensure_c_outputs_are_set()
        self.lib.data_written(
            ctypes.pointer(self._c_app), uid.encode("utf8"), data, len(data)
        )

    def dropCompleted(self, uid, drop_state):
        """Tells the library that the drop `uid` ended with `drop_state`"""
        self._ensure_c_outputs_are_set()
        self.lib.drop_completed(
            ctypes.pointer(self._c_app), uid.encode("utf8"), drop_state
        )

    def addInput(self, inputDrop, back=True):
        super(DynlibStreamApp, self).addInput(inputDrop, back)
        # Keep the count in the C structure in sync
        self._c_app.n_inputs += 1

    def addStreamingInput(self, streamingInputDrop, back=True):
        super(DynlibStreamApp, self).addStreamingInput(
            streamingInputDrop, back
        )
        # Keep the count in the C structure in sync
        self._c_app.n_streaming_inputs += 1

    def generate_recompute_data(self):
        """
        Returns a dictionary with the status of this drop plus the packed
        fields of its C application structure.
        """
        recompute_data = {"status": self.status}
        packed = self._c_app.pack_python()
        if packed is None:
            return recompute_data
        recompute_data.update(packed)
        return recompute_data
376

377

378
##
379
# @brief DynlibApp
380
# @details An application component run from a dynamic library
381
# @par EAGLE_START
382
# @param category DynlibApp
383
# @param tag template
384
# @param libpath Library Path//String/ComponentParameter/readwrite//False/False/The location of the shared object/DLL that implements this application
385
# @param dropclass dropclass/dlg.apps.dynlib.DynlibApp/String/ComponentParameter/readwrite//False/False/Drop class
386
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
387
# @param num_cpus No. of CPUs/1/Integer/ComponentParameter/readonly//False/False/Number of cores used
388
# @param group_start Group start/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the start of a group?
389
# @param input_error_threshold "Input error rate (%)"/0/Integer/ComponentParameter/readwrite//False/False/the allowed failure rate of the inputs (in percent), before this component goes to ERROR state and is not executed
390
# @param n_tries Number of tries/1/Integer/ComponentParameter/readwrite//False/False/Specifies the number of times the 'run' method will be executed before finally giving up
391
# @par EAGLE_END
392
class DynlibApp(DynlibAppBase, BarrierAppDROP):
    """Runs an application implemented by a dynamic library, which is loaded
    into the current process"""

    def initialize(self, **kwargs):
        super(DynlibApp, self).initialize(**kwargs)
        # Optional list of ranks, exposed to the library at run time
        self.ranks = self._popArg(kwargs, "rank", None)

    def run(self):
        """
        Exposes the inputs, ranks and outputs to the library and runs it.
        The inputs opened for the execution are closed by `run` once the
        library returns.
        """
        c_app = self._c_app
        input_closers = prepare_c_inputs(c_app, self.inputs)
        prepare_c_ranks(c_app, self.ranks)
        self._ensure_c_outputs_are_set()
        run(self.lib, c_app, input_closers)

    def generate_recompute_data(self):
        """
        Returns a dictionary with the status of this drop plus, if the C
        application structure is set, its packed fields.
        """
        recompute_data = {"status": self.status}
        if self._c_app is not None:
            recompute_data.update(self._c_app.pack_python())
        return recompute_data
412

413

414
class FinishSubprocess(Exception):
    """
    Raised within the sub-process to stop its execution after an error has
    been put into the queue.
    """
416

417

418
def _run_in_proc(*args):
    """
    Calls `_do_run_in_proc` with the given arguments, returning quietly if it
    stops early by raising `FinishSubprocess`.
    """
    try:
        _do_run_in_proc(*args)
    except FinishSubprocess:
        # The error that caused this has already been put into the queue
        return
423

424

425
def _do_run_in_proc(queue, libname, oid, uid, params, inputs, outputs):
    """
    Loads and runs the library `libname` in three steps, reporting the
    outcome of each of them through `queue`: `None` is put if the step
    succeeds, and the exception is put if it fails, in which case
    `FinishSubprocess` is raised and no further steps are run.

    `inputs` and `outputs` are sequences whose items hold, in their first
    four positions, the arguments used to create the corresponding
    `rpc.DropProxy` objects.
    """

    def advance_step(f, *args, **kwargs):
        # Run a single step and report its outcome through the queue
        try:
            result = f(*args, **kwargs)
            queue.put(None)
            return result
        except Exception as e:
            queue.put(e)
            raise FinishSubprocess()

    # Step 1: initialise the library and return if there is an error
    lib, c_app = advance_step(load_and_init, libname, oid, uid, params)

    client = rpc.RPCClient()
    try:
        client.start()

        # Step 2: create the proxies for the inputs and outputs
        def setup_drop_proxies(inputs, outputs):
            def to_drop_proxy(info):
                return rpc.DropProxy(
                    client, info[0], info[1], info[2], info[3]
                )

            input_proxies = [to_drop_proxy(i) for i in inputs]
            output_proxies = [to_drop_proxy(o) for o in outputs]
            return input_proxies, output_proxies

        inputs, outputs = advance_step(setup_drop_proxies, inputs, outputs)

        # Step 3: Finish initializing the C structure and run the application
        def do_run():
            closers = prepare_c_inputs(c_app, inputs)
            prepare_c_outputs(c_app, outputs)
            run(lib, c_app, closers)

        advance_step(do_run)
    finally:
        client.shutdown()
461

462

463
def get_from_subprocess(proc, q):
    """
    Gets the next element from the queue `q`, checking that the process
    `proc` is still alive while waiting for it.

    Raises `RuntimeError` if the process is not alive anymore and the queue
    has no elements left.
    """
    while proc.is_alive():
        try:
            return q.get(timeout=0.1)
        except queue.Empty:
            pass

    # The process may have put elements into the queue right before
    # finishing (or even before we were called), so a process that is not
    # alive is only an error if there is nothing left to be read
    try:
        return q.get(timeout=0.1)
    except queue.Empty:
        raise RuntimeError("Subprocess died unexpectedly")
471

472

473
##
474
# @brief DynlibProcApp
475
# @details An application component run from a dynamic library in a different process
476
# @par EAGLE_START
477
# @param category DynlibProcApp
478
# @param tag template
479
# @param libpath Library Path//String/ComponentParameter/readwrite//False/False/The location of the shared object/DLL that implements this application
480
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
481
# @param num_cpus No. of CPUs/1/Integer/ComponentParameter/readonly//False/False/Number of cores used
482
# @param group_start Group start/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the start of a group?
483
# @param input_error_threshold "Input error rate (%)"/0/Integer/ComponentParameter/readwrite//False/False/the allowed failure rate of the inputs (in percent), before this component goes to ERROR state and is not executed
484
# @param n_tries Number of tries/1/Integer/ComponentParameter/readwrite//False/False/Specifies the number of times the 'run' method will be executed before finally giving up
485
# @par EAGLE_END
486
class DynlibProcApp(BarrierAppDROP):
    """Loads a dynamic library in a different process and runs it"""

    def initialize(self, **kwargs):
        """
        Takes the library to run from the `lib` argument and the time (in
        seconds) to wait for the sub-process to finish from the optional
        `timeout` argument. The rest of the arguments are kept to be passed
        down to the library as parameters.

        Raises `InvalidDropException` if no library is given.
        """
        super(DynlibProcApp, self).initialize(**kwargs)

        if "lib" not in kwargs:
            raise InvalidDropException(self, "library not specified")
        self.libname = kwargs.pop("lib")
        self.timeout = self._popArg(kwargs, "timeout", 600)  # 10 minutes
        self.app_params = kwargs
        self.proc = None

    def run(self):
        """
        Starts a new process that loads and runs the library, and follows its
        progress step by step. An error reported by the sub-process in any of
        the steps is raised here.
        """
        if not hasattr(self, "_rpc_server"):
            raise Exception("DynlibProcApp can only run within an RPC server")

        # On the sub-process we create DropProxy objects, so we need to extract
        # from our inputs/outputs their contact point (RPC-wise) information.
        # If one of our inputs/outputs is a DropProxy we already have this
        # information; otherwise we must figure it out.
        inputs = [self._get_proxy_info(i) for i in self.inputs]
        outputs = [self._get_proxy_info(o) for o in self.outputs]

        logger.info("Starting new process to run the dynlib on")
        # Not called `queue` to avoid shadowing the module of the same name
        result_queue = multiprocessing.Queue()
        args = (
            result_queue,
            self.libname,
            self.oid,
            self.uid,
            self.app_params,
            inputs,
            outputs,
        )
        self.proc = multiprocessing.Process(target=_run_in_proc, args=args)
        self.proc.start()

        try:
            # The sub-process reports the outcome of each of these steps, in
            # this order, through the queue
            steps = (
                "loading and initialising library",
                "creating DropProxy instances",
                "running the application",
            )
            for step in steps:
                logger.info("Subprocess %s", step)
                error = get_from_subprocess(self.proc, result_queue)
                if error is not None:
                    logger.error("Error in sub-process when %s", step)
                    raise error
        finally:
            self.proc.join(self.timeout)

    def _get_proxy_info(self, x):
        """
        Returns the (host, port, session ID, UID) tuple needed to create a
        `DropProxy` for the drop `x`.
        """
        if isinstance(x, rpc.DropProxy):
            return x.hostname, x.port, x.session_id, x.uid

        # TODO: we can't use the NodeManager's host directly here, as that
        #       indicates the address the different servers *bind* to
        #       (and, for example, can be 0.0.0.0)
        rpc_server = x._rpc_server
        host, port = rpc_server._rpc_host, rpc_server._rpc_port
        host = utils.to_externally_contactable_host(host, prefer_local=True)
        return (host, port, x._dlg_session.sessionId, x.uid)

    def cancel(self):
        """
        Cancels this application, terminating the sub-process if one has
        been started. Errors while terminating it are logged, not raised.
        """
        BarrierAppDROP.cancel(self)
        proc = getattr(self, "proc", None)
        if proc is None:
            # The application has not been run yet: nothing to terminate
            return
        try:
            proc.terminate()
        except Exception:
            logger.exception("Error while terminating process %r", proc)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc