• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ICRAR / daliuge / 5037635209

pending completion
5037635209

push

github

Andreas Wicenec
Merge branch 'master' into liu-361

15343 of 19073 relevant lines covered (80.44%)

1.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.32
/daliuge-engine/dlg/apps/app_base.py
1
from collections import OrderedDict
2✔
2
from typing import List
2✔
3
import logging
2✔
4
import math
2✔
5
import threading
2✔
6

7
from dlg.data.drops.container import ContainerDROP
2✔
8
from dlg.data.drops.data_base import DataDROP
2✔
9
from dlg.ddap_protocol import (
2✔
10
    AppDROPStates,
11
    DROPLinkType,
12
    DROPStates,
13
    DROPRel,
14
)
15
from dlg.utils import object_tracking
2✔
16
from dlg.exceptions import InvalidDropException, InvalidRelationshipException
2✔
17

18
from dlg.process import DlgProcess
2✔
19
from dlg.meta import (
2✔
20
    dlg_int_param,
21
)
22

23
# Module-level logger, named after this module, used by every class below.
logger = logging.getLogger(__name__)
2✔
24

25
# Decorator produced by `object_tracking("drop")`; it is applied to selected
# AppDROP methods in this module.
# NOTE(review): presumably it records which drop is "current" while the
# decorated method runs -- confirm against dlg.utils.object_tracking.
track_current_drop = object_tracking("drop")
2✔
26

27
# ===============================================================================
28
# AppDROP classes follow
29
# ===============================================================================
30

31

32
class AppDROP(ContainerDROP):
    """
    An AppDROP is a DROP representing an application that reads data
    from one or more DataDROPs (its inputs), and writes data onto one or more
    DataDROPs (its outputs).

    AppDROPs accept two different kinds of inputs: "normal" and "streaming"
    inputs. Normal inputs are DataDROPs that must be on the COMPLETED state
    (and therefore their data must be fully written) before this application is
    run, while streaming inputs are DataDROPs that feed chunks of data into
    this application as the data gets written into them.

    This class contains two methods that need to be overridden by
    subclasses: `dropCompleted`, invoked when input DataDROPs move to
    COMPLETED, and `dataWritten`, invoked with the data coming from streaming
    inputs.

    How and when applications are executed is completely up to the app component
    developer, and is not enforced by this base class. Some applications might need
    to be run at `initialize` time, while others might start during the first
    invocation of `dataWritten`. A common scenario anyway is to start an
    application only after all its inputs have moved to COMPLETED (implying
    that none of them is a streaming input); for these cases see the
    `BarrierAppDROP`.
    """

    def initialize(self, **kwargs):
        """
        Sets up the (initially empty) input/output registries and the
        execution status of this application.
        """
        super().initialize(**kwargs)

        # Inputs and Outputs are the DROPs that get read from and written
        # to by this AppDROP, respectively. An input DROP will see
        # this AppDROP as one of its consumers, while an output DROP
        # will see this AppDROP as one of its producers.
        #
        # Input and output objects are later referenced by their *index*
        # (relative to the order in which they were added to this object)
        # Therefore we use an ordered dict to keep the insertion order.
        self._inputs = OrderedDict()
        self._outputs = OrderedDict()

        # Same as above, only that these correspond to the 'streaming' version
        # of the consumers
        self._streamingInputs = OrderedDict()

        # An AppDROP has a second, separate state machine indicating its
        # execution status.
        self._execStatus = AppDROPStates.NOT_RUN

    @track_current_drop
    def addInput(self, inputDrop, back=True):
        """
        Registers `inputDrop` as an input of this application, keyed by its
        uid. Adding a drop whose uid is already registered is a no-op.

        :param inputDrop: the drop to read from
        :param back: if True, also register this application as a consumer
            of `inputDrop` (the reverse call passes False to stop recursion)
        """
        uid = inputDrop.uid
        if uid not in self._inputs:
            self._inputs[uid] = inputDrop
            if back:
                inputDrop.addConsumer(self, False)

    @property
    def inputs(self) -> List[DataDROP]:
        """
        The list of inputs set into this AppDROP
        """
        return list(self._inputs.values())

    @track_current_drop
    def addOutput(self, outputDrop: DataDROP, back=True):
        """
        Registers `outputDrop` as an output of this application, keyed by its
        uid, and subscribes it to this application's "producerFinished"
        events. Adding a drop whose uid is already registered is a no-op.

        :param outputDrop: the drop to write to
        :param back: if True, also register this application as a producer
            of `outputDrop` (the reverse call passes False to stop recursion)
        :raises InvalidRelationshipException: if `outputDrop` is this very
            application
        """
        if outputDrop is self:
            raise InvalidRelationshipException(
                DROPRel(outputDrop, DROPLinkType.OUTPUT, self),
                "Cannot add an AppConsumer as its own output",
            )
        uid = outputDrop.uid
        if uid not in self._outputs:
            self._outputs[uid] = outputDrop

            if back:
                outputDrop.addProducer(self, False)

            # Subscribe the output DROP to events sent by this AppDROP when it
            # finishes its execution.
            self.subscribe(outputDrop, "producerFinished")

    @property
    def outputs(self) -> List[DataDROP]:
        """
        The list of outputs set into this AppDROP
        """
        return list(self._outputs.values())

    def addStreamingInput(self, streamingInputDrop, back=True):
        """
        Registers `streamingInputDrop` as a streaming input of this
        application, keyed by its uid. Adding a drop whose uid is already
        registered is a no-op.

        :param streamingInputDrop: the drop that will stream data in
        :param back: if True, also register this application as a streaming
            consumer of the drop (the reverse call passes False to stop
            recursion)
        """
        # Duplicates are detected by uid, exactly like addInput/addOutput do,
        # instead of scanning all registered drops and comparing them.
        uid = streamingInputDrop.uid
        if uid not in self._streamingInputs:
            self._streamingInputs[uid] = streamingInputDrop
            if back:
                streamingInputDrop.addStreamingConsumer(self, False)

    @property
    def streamingInputs(self) -> List[DataDROP]:
        """
        The list of streaming inputs set into this AppDROP
        """
        return list(self._streamingInputs.values())

    def _collectNamedPorts(self, param_key, drops, usages):
        """
        Shared implementation of `_generateNamedInputs` and
        `_generateNamedOutputs`.

        If `self.parameters[param_key]` is a non-empty list whose first item
        is a dict, each item is read as a `{uid: name}` entry (only its first
        key/value pair is used) and the result maps `name` to the drop
        registered under `uid`. At most as many entries as there are
        registered drops are consumed.

        Otherwise the result is built from `self.parameters["applicationArgs"]`
        (when present): every field whose "usage" is listed in `usages` is
        stored under its "name". Note that in this case the values are the
        field descriptions themselves, not drops.

        :param param_key: "inputs" or "outputs"
        :param drops: the uid -> drop mapping matching `param_key`
        :param usages: the "usage" values accepted for this kind of port
        :return: an OrderedDict with the named ports
        :raises KeyError: if a listed uid is not registered in `drops`
        """
        named_ports = OrderedDict()
        port_specs = (
            self.parameters[param_key]
            if param_key in self.parameters
            else None
        )
        # The truthiness check guards against an empty list, for which
        # looking at the first element would raise an IndexError.
        if port_specs and isinstance(port_specs[0], dict):
            for spec in port_specs[: len(drops)]:
                uid, name = next(iter(spec.items()))
                named_ports[name] = drops[uid]
        elif "applicationArgs" in self.parameters:
            for field in self.parameters["applicationArgs"].values():
                if field["usage"] in usages:
                    named_ports[field["name"]] = field
        return named_ports

    def _generateNamedInputs(self):
        """
        Generates a named mapping of input data drops. The original authors
        note that this can only be called during run().
        """
        return self._collectNamedPorts(
            "inputs", self._inputs, ("InputPort", "InputOutput")
        )

    def _generateNamedOutputs(self):
        """
        Generates a named mapping of output data drops. The original authors
        note that this can only be called during run().
        """
        return self._collectNamedPorts(
            "outputs", self._outputs, ("OutputPort", "InputOutput")
        )

    def handleEvent(self, e):
        """
        Handles the arrival of a new event. Events are delivered from those
        objects this DROP is subscribed to. Only "dropCompleted" events are
        acted upon; they are forwarded to `dropCompleted`.
        """
        if e.type == "dropCompleted":
            self.dropCompleted(e.uid, e.status)

    def dropCompleted(self, uid, drop_state):
        """
        Callback invoked when the DROP with UID `uid` (which is either a
        normal or a streaming input of this AppDROP) has moved to the
        COMPLETED or ERROR state. By default no action is performed.
        """

    def dataWritten(self, uid, data):
        """
        Callback invoked when `data` has been written into the DROP with
        UID `uid` (which is one of the streaming inputs of this AppDROP).
        By default no action is performed
        """

    @property
    def execStatus(self):
        """
        The execution status of this AppDROP
        """
        return self._execStatus

    @execStatus.setter
    def execStatus(self, execStatus):
        # Only real transitions are stored and announced; setting the same
        # value again fires no "execStatus" event.
        if self._execStatus == execStatus:
            return
        self._execStatus = execStatus
        self._fire("execStatus", execStatus=execStatus)

    def _notifyAppIsFinished(self):
        """
        Method invoked by subclasses when the execution of the application is
        over. Subclasses must make sure that the execStatus property is set
        to its correct value before invoking this method: the drop status is
        derived from it (ERROR if execStatus is ERROR, COMPLETED otherwise),
        after which a "producerFinished" event is fired and `completedrop`
        is called.
        """
        is_error = self._execStatus == AppDROPStates.ERROR
        self.status = DROPStates.ERROR if is_error else DROPStates.COMPLETED
        logger.debug(
            "Moving %r to %s",
            self.oid,
            "ERROR" if is_error else "FINISHED",
        )
        self._fire(
            "producerFinished", status=self.status, execStatus=self.execStatus
        )
        self.completedrop()

    def cancel(self):
        """Moves this application drop to its CANCELLED state"""
        super().cancel()
        self.execStatus = AppDROPStates.CANCELLED

    def skip(self):
        """
        Moves this application drop to its SKIPPED state, skipping all of
        its outputs as well. A "producerFinished" event is fired only if the
        application had not been run before being skipped.
        """
        super().skip()

        prev_execStatus = self.execStatus
        self.execStatus = AppDROPStates.SKIPPED
        for output in self._outputs.values():
            output.skip()

        # Lazy %-formatting: the repr is only computed if DEBUG is enabled
        logger.debug("Moving %r to SKIPPED", self)
        if prev_execStatus == AppDROPStates.NOT_RUN:
            self._fire(
                "producerFinished",
                status=self.status,
                execStatus=self.execStatus,
            )
252

253

254
class InputFiredAppDROP(AppDROP):
    """
    An InputFiredAppDROP accepts no streaming inputs and waits until a given
    amount of inputs (called *effective inputs*) have moved to COMPLETED to
    execute its 'run' method, which must be overridden by subclasses. This way,
    this application allows to continue the execution of the graph given a
    minimum amount of inputs being ready. The transitions of subsequent inputs
    to the COMPLETED state have no effect.

    Normally only one call to the `run` method will happen per application.
    However users can override this by specifying a different number of tries
    before finally giving up.

    The amount of effective inputs must be less or equal to the amount of inputs
    added to this application once the graph is being executed. The special
    value of -1 means that all inputs are considered as effective, in which case
    this class acts as a BarrierAppDROP, effectively blocking until all its
    inputs have moved to the COMPLETED, SKIPPED or ERROR state. Setting this
    value to anything other than -1 or the number of inputs, results in
    late arriving inputs to be ignored, even if they would successfully finish.
    This requires careful implementation of the upstream and downstream apps to
    deal with this situation. It is only really useful to control a combination
    of maximum allowed execution time and acceptable number of completed inputs.

    An input error threshold controls the behavior of the application given an
    error in one or more of its inputs (i.e., a DROP moving to the ERROR state).
    The threshold is a value within 0 and 100 that indicates the tolerance
    to erroneous effective inputs, and after which the application will not be
    run but moved to the ERROR state itself instead.
    """

    input_error_threshold = dlg_int_param(
        "Input error threshold (0 and 100)", 0
    )
    n_effective_inputs = dlg_int_param("Number of effective inputs", -1)
    n_tries = dlg_int_param("Number of tries", 1)

    def initialize(self, **kwargs):
        """
        Resets the per-state input bookkeeping and validates the three
        parameters of this class.

        :raises InvalidDropException: if `input_error_threshold` is outside
            [0, 100], if `n_effective_inputs` was not given in `kwargs`, if
            it is 0 or lower than -1, or if `n_tries` is lower than 1
        """
        super().initialize(**kwargs)
        # UIDs of the inputs seen so far, grouped by the state they reported
        self._completedInputs = []
        self._errorInputs = []
        self._skippedInputs = []

        # Error threshold must be within 0 and 100
        if self.input_error_threshold < 0 or self.input_error_threshold > 100:
            raise InvalidDropException(
                self, "%r: input_error_threshold not within [0,100]" % (self,)
            )

        # Amount of effective inputs
        if "n_effective_inputs" not in kwargs:
            raise InvalidDropException(
                self, "%r: n_effective_inputs is mandatory" % (self,)
            )

        if self.n_effective_inputs < -1 or self.n_effective_inputs == 0:
            raise InvalidDropException(
                self,
                "%r: n_effective_inputs must be > 0 or equals to -1" % (self,),
            )

        # Number of tries
        if self.n_tries < 1:
            raise InvalidDropException(
                self, "Invalid n_tries, must be a positive number"
            )

    def addStreamingInput(self, streamingInputDrop, back=True):
        """
        Always fails: this kind of application takes no streaming inputs.

        :raises InvalidRelationshipException: unconditionally
        """
        raise InvalidRelationshipException(
            DROPRel(streamingInputDrop, DROPLinkType.STREAMING_INPUT, self),
            "InputFiredAppDROPs don't accept streaming inputs",
        )

    def dropCompleted(self, uid, drop_state):
        """
        Records that input `uid` reached `drop_state` and, once exactly the
        effective number of inputs has reported, decides what to do with
        this application: move it to ERROR (too many failed inputs), skip it
        (every effective input was skipped) or schedule its execution.

        :param uid: UID of the input drop that changed state
        :param drop_state: one of the ERROR, COMPLETED or SKIPPED DROPStates
        :raises Exception: if more effective inputs are requested than
            inputs are attached, or if `drop_state` is none of the above
        """
        super().dropCompleted(uid, drop_state)

        logger.debug(
            "Received notification from input drop: uid=%s, state=%d",
            uid,
            drop_state,
        )

        # A value of -1 means all inputs
        n_inputs = len(self._inputs)
        n_eff_inputs = self.n_effective_inputs
        if n_eff_inputs == -1:
            n_eff_inputs = n_inputs

        # More effective inputs than inputs, this is a horror
        if n_eff_inputs > n_inputs:
            raise Exception(
                "%r: More effective inputs (%d) than inputs (%d)"
                % (self, self.n_effective_inputs, n_inputs)
            )

        if drop_state == DROPStates.ERROR:
            self._errorInputs.append(uid)
        elif drop_state == DROPStates.COMPLETED:
            self._completedInputs.append(uid)
        elif drop_state == DROPStates.SKIPPED:
            self._skippedInputs.append(uid)
        else:
            raise Exception(
                "Invalid DROP state in dropCompleted: %s" % drop_state
            )

        error_len = len(self._errorInputs)
        ok_len = len(self._completedInputs)
        skipped_len = len(self._skippedInputs)

        # Not enough inputs yet, or a late input arriving after the decision
        # below was already taken: nothing to do either way.
        if (skipped_len + error_len + ok_len) != n_eff_inputs:
            return

        # calculate the percentage of effective inputs that have failed
        percent_failed = math.floor(error_len / n_eff_inputs * 100)
        if percent_failed > 0:
            logger.debug(
                "Error rate on inputs for %r: %d/%d",
                self,
                percent_failed,
                self.input_error_threshold,
            )

        # if we hit the input error threshold then ERROR the drop and move on
        if percent_failed > self.input_error_threshold:
            logger.info(
                "Error threshold reached on %r, not executing it: %d/%d",
                self,
                percent_failed,
                self.input_error_threshold,
            )

            self.execStatus = AppDROPStates.ERROR
            self.status = DROPStates.ERROR
            self._notifyAppIsFinished()
        elif skipped_len == n_eff_inputs:
            self.skip()
        else:
            self.async_execute()

    def async_execute(self):
        """
        Schedules the execution of this application and returns immediately.

        If a thread pool has been attached to this object as `_tp` the work
        is submitted to it and None is returned; otherwise a new daemon
        thread is started and returned.
        """
        if hasattr(self, "_tp"):
            # The AsyncResult is intentionally discarded; failures are logged
            # by _execute_and_log_exception itself.
            self._tp.apply_async(self._execute_and_log_exception)
            return None

        t = threading.Thread(target=self._execute_and_log_exception)
        t.daemon = True
        t.start()
        return t

    def _execute_and_log_exception(self):
        """
        Calls `execute` and logs, instead of propagating, any exception it
        raises.
        """
        try:
            self.execute()
        except Exception:
            logger.exception(
                "Unexpected exception during drop (%r) execution", self
            )

    # Shared by all instances; guards the start/join of DlgProcess objects
    # in `execute` (see YAN-975).
    _dlg_proc_lock = threading.Lock()

    @track_current_drop
    def execute(self, _send_notifications=True):
        """
        Manually trigger the execution of this application.

        This method is normally invoked internally when the application detects
        all its inputs are COMPLETED.

        `run` is attempted up to `n_tries` times; the first attempt that
        does not raise moves the application to FINISHED, while exhausting
        all tries moves it to ERROR. If the application is found CANCELLED
        after an attempt, this method returns without touching the status
        or sending notifications.

        :param _send_notifications: if True, `_notifyAppIsFinished` is
            called once the final state has been set
        """

        # TODO: We need to be defined more clearly how the state is set in
        #       applications, for the time being they follow their execState.

        # Run at most self.n_tries if there are errors during the execution
        logger.debug("Executing %r", self.oid)
        tries = 0
        drop_state = DROPStates.COMPLETED
        self.execStatus = AppDROPStates.RUNNING
        while tries < self.n_tries:
            try:
                if hasattr(self, "_tp"):
                    proc = DlgProcess(target=self.run, daemon=True)
                    # see YAN-975 for why this is happening
                    lock = InputFiredAppDROP._dlg_proc_lock
                    with lock:
                        proc.start()
                    with lock:
                        proc.join()
                    proc.close()
                    if proc.exception:
                        raise proc.exception
                else:
                    self.run()
                if self.execStatus == AppDROPStates.CANCELLED:
                    return
                self.execStatus = AppDROPStates.FINISHED
                break
            except BaseException:
                # Deliberately broad: `run` is arbitrary application code,
                # and anything escaping from here would bypass the status
                # update and the notification at the end of this method.
                if self.execStatus == AppDROPStates.CANCELLED:
                    return
                tries += 1
                logger.exception(
                    "Error while executing %r (try %s/%s)",
                    self,
                    tries,
                    self.n_tries,
                )

        # We gave up running the application, go to error
        if tries == self.n_tries:
            self.execStatus = AppDROPStates.ERROR
            drop_state = DROPStates.ERROR

        self.status = drop_state
        if _send_notifications:
            self._notifyAppIsFinished()

    def run(self):
        """
        Run this application. It can be safely assumed that at this point all
        the required inputs are COMPLETED.
        """

    # TODO: another thing we need to check
    def exists(self):
        """Always reports True."""
        return True
481

482

483
class BarrierAppDROP(InputFiredAppDROP):
    """
    An InputFiredAppDROP that treats every one of its inputs as effective:
    it waits for all of them, effectively blocking the flow of the graph
    execution until they have all reported.
    """

    def initialize(self, **kwargs):
        # Whatever the caller supplied for n_effective_inputs is replaced by
        # -1, the value meaning "all inputs".
        barrier_kwargs = dict(kwargs, n_effective_inputs=-1)
        super().initialize(**barrier_kwargs)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc