• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ICRAR / daliuge / 10363213063

13 Aug 2024 03:39AM UTC coverage: 79.63% (-0.09%) from 79.722%
10363213063

Pull #271

github

web-flow
Merge branch 'master' into liu-377
Pull Request #271: Liu 377

70 of 122 new or added lines in 13 files covered. (57.38%)

12 existing lines in 6 files now uncovered.

15375 of 19308 relevant lines covered (79.63%)

1.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.16
/daliuge-engine/dlg/apps/simple.py
1
#
2
#    ICRAR - International Centre for Radio Astronomy Research
3
#    (c) UWA - The University of Western Australia, 2017
4
#    Copyright by UWA (in the framework of the ICRAR)
5
#    All rights reserved
6
#
7
#    This library is free software; you can redistribute it and/or
8
#    modify it under the terms of the GNU Lesser General Public
9
#    License as published by the Free Software Foundation; either
10
#    version 2.1 of the License, or (at your option) any later version.
11
#
12
#    This library is distributed in the hope that it will be useful,
13
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
14
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15
#    Lesser General Public License for more details.
16
#
17
#    You should have received a copy of the GNU Lesser General Public
18
#    License along with this library; if not, write to the Free Software
19
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
20
#    MA 02111-1307  USA
21
#
22
"""Applications used as examples, for testing, or in simple situations"""
23
import _pickle
2✔
24
from numbers import Number
2✔
25
import pickle
2✔
26
import random
2✔
27
from typing import List, Optional
2✔
28
import requests
2✔
29
import logging
2✔
30
import time
2✔
31
import numpy as np
2✔
32

33
from dlg import droputils, drop_loaders
2✔
34
from dlg.apps.app_base import BarrierAppDROP
2✔
35
from dlg.data.drops.container import ContainerDROP
2✔
36
from dlg.data.drops import InMemoryDROP, FileDROP
2✔
37
from dlg.apps.branch import BranchAppDrop
2✔
38
from dlg.meta import (
2✔
39
    dlg_float_param,
40
    dlg_string_param,
41
    dlg_bool_param,
42
    dlg_int_param,
43
    dlg_list_param,
44
    dlg_dict_param,
45
    dlg_component,
46
    dlg_batch_input,
47
    dlg_batch_output,
48
    dlg_streaming_input,
49
)
50
from dlg.exceptions import DaliugeException
2✔
51
from dlg.rpc import DropProxy
2✔
52

53

54
logger = logging.getLogger(__name__)
2✔
55

56

57
class NullBarrierApp(BarrierAppDROP):
2✔
58
    component_meta = dlg_component(
2✔
59
        "NullBarrierApp",
60
        "Null Barrier.",
61
        [dlg_batch_input("binary/*", [])],
62
        [dlg_batch_output("binary/*", [])],
63
        [dlg_streaming_input("binary/*")],
64
    )
65

66
    """A BarrierAppDrop that doesn't perform any work"""
67

68
    def run(self):
2✔
69
        pass
2✔
70

71

72
##
73
# @brief PythonApp
74
# @details A placeholder APP to aid construction of new applications.
75
# This is mainly useful (and used) when starting a new workflow from scratch.
76
# @par EAGLE_START
77
# @param category PythonApp
78
# @param tag template
79
# @param dropclass dlg.apps.simple.PythonApp/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
80
# @param base_name simple/String/ComponentParameter/NoPort/ReadOnly//False/False/Base name of application class
81
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
82
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
83
# @param group_start False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the start of a group?
84
# @par EAGLE_END
85
class PythonApp(BarrierAppDROP):
2✔
86
    """A placeholder BarrierAppDrop that just aids the generation of the palette component"""
87

88
    pass
2✔
89

90

91
##
92
# @brief SleepApp
93
# @details A simple APP that sleeps the specified amount of time (0 by default).
94
# This is mainly useful (and used) to test graph translation and structure
95
# without executing real algorithms. Very useful for debugging.
96
# @par EAGLE_START
97
# @param category PythonApp
98
# @param tag daliuge
99
# @param sleep_time 5/Integer/ApplicationArgument/NoPort/ReadWrite//False/False/The number of seconds to sleep
100
# @param dropclass dlg.apps.simple.SleepApp/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
101
# @param base_name simple/String/ComponentParameter/NoPort/ReadOnly//False/False/Base name of application class
102
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
103
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
104
# @param group_start False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the start of a group?
105
# @param input_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
106
# @param output_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
107
# @par EAGLE_END
108

109

110
class SleepApp(BarrierAppDROP):
2✔
111
    """A BarrierAppDrop that sleeps the specified amount of time (0 by default)"""
112

113
    component_meta = dlg_component(
2✔
114
        "SleepApp",
115
        "Sleep App.",
116
        [dlg_batch_input("binary/*", [])],
117
        [dlg_batch_output("binary/*", [])],
118
        [dlg_streaming_input("binary/*")],
119
    )
120
    sleep_time = dlg_float_param("sleep_time", 0)
2✔
121

122
    def initialize(self, **kwargs):
2✔
123
        super(SleepApp, self).initialize(**kwargs)
2✔
124

125
    def run(self):
2✔
126
        self._run()
2✔
127
        try:
2✔
128
            # If data is coming through a named port we load it from there.
129
            if isinstance(self.sleep_time, (InMemoryDROP, FileDROP, DropProxy)):
2✔
130
                logger.debug("Trying to read from %s", self.sleep_time)
2✔
131
                self.sleep_time = drop_loaders.load_pickle(self.sleep_time)
2✔
132
            time.sleep(self.sleep_time)
2✔
133
        except (TypeError, ValueError):
×
134
            logger.debug(
×
135
                "Found invalid sleep_time: %s. Resetting to 0. %s",
136
                self.sleep_time,
137
                type(self.sleep_time),
138
            )
139
            self.sleep_time = 0
×
140
            time.sleep(self.sleep_time)
×
141
        logger.debug("%s slept for %s s", self.name, self.sleep_time)
2✔
142

143

144
##
145
# @brief CopyApp
146
# @details A simple APP that copies its inputs into its outputs.
147
# All inputs are copied into all outputs in the order they were declared in
148
# the graph. If an input is a container (e.g. a directory) it copies the
149
# content recursively.
150
# @par EAGLE_START
151
# @param category PythonApp
152
# @param tag daliuge
153
# @param bufsize 65536/Integer/ApplicationArgument/NoPort/ReadWrite//False/False/Buffer size
154
# @param dropclass dlg.apps.simple.CopyApp/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
155
# @param base_name simple/String/ComponentParameter/NoPort/ReadOnly//False/False/Base name of application class
156
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
157
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
158
# @param group_start False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the start of a group?
159
# @param n_tries 1/Integer/ComponentParameter/NoPort/ReadWrite//False/False/Specifies the number of times the 'run' method will be executed before finally giving up
160
# @param dummy /Object/ApplicationArgument/InputOutput/ReadWrite//False/False/Dummy port
161
# @param input_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
162
# @param output_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
163
# @par EAGLE_END
164
class CopyApp(BarrierAppDROP):
2✔
165
    """
166
    A BarrierAppDrop that copies its inputs into its outputs.
167
    All inputs are copied into all outputs in the order they were declared in
168
    the graph.
169
    """
170

171
    bufsize = dlg_int_param("bufsize", 65536)
2✔
172

173
    component_meta = dlg_component(
2✔
174
        "CopyApp",
175
        "Copy App.",
176
        [dlg_batch_input("binary/*", [])],
177
        [dlg_batch_output("binary/*", [])],
178
        [dlg_streaming_input("binary/*")],
179
    )
180

181
    def run(self):
2✔
182
        logger.debug("Using buffer size %d", self.bufsize)
2✔
183
        logger.info(
2✔
184
            "Copying data from inputs %s to outputs %s",
185
            [x.name for x in self.inputs],
186
            [x.name for x in self.outputs],
187
        )
188
        self.copyAll()
2✔
189
        logger.info(
2✔
190
            "Copy finished",
191
        )
192

193
    def copyAll(self):
2✔
194
        for inputDrop in self.inputs:
2✔
195
            self.copyRecursive(inputDrop)
2✔
196

197
        # logger.debug("Target checksum: %d", outputDrop.checksum)
198

199
    def copyRecursive(self, inputDrop):
2✔
200
        if isinstance(inputDrop, ContainerDROP):
2✔
201
            for child in inputDrop.children:
×
202
                self.copyRecursive(child)
×
203
        else:
204
            for outputDrop in self.outputs:
2✔
205
                droputils.copyDropContents(inputDrop, outputDrop, bufsize=self.bufsize)
2✔
206

207

208
##
209
# @brief SleepAndCopyApp
210
# @par EAGLE_START
211
# @param category PythonApp
212
# @param tag daliuge
213
# @param sleep_time 5/Integer/ApplicationArgument/NoPort/ReadWrite//False/False/The number of seconds to sleep
214
# @param dropclass dlg.apps.simple.SleepAndCopyApp/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
215
# @param base_name simple/String/ComponentParameter/NoPort/ReadOnly//False/False/Base name of application class
216
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
217
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
218
# @param group_start False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the start of a group?
219
# @param input_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
220
# @param output_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
221
# @par EAGLE_END
222
class SleepAndCopyApp(SleepApp, CopyApp):
2✔
223
    """A combination of the SleepApp and the CopyApp. It sleeps, then copies"""
224

225
    def run(self):
2✔
226
        SleepApp.run(self)
2✔
227
        CopyApp.run(self)
2✔
228

229

230
##
231
# @brief RandomArrayApp
232
# @details A testing APP that does not take any input and produces a random array of
233
# type int64, if integer is set to True, else of type float64.
234
# size indicates the number of elements ranging between the values low and high.
235
# The resulting array will be send to all connected output apps.
236
# @par EAGLE_START
237
# @param category PythonApp
238
# @param tag daliuge
239
# @param size 100/Integer/ApplicationArgument/NoPort/ReadWrite//False/False/The size of the array
240
# @param low 0/Float/ApplicationArgument/NoPort/ReadWrite//False/False/Low value of range in array [inclusive]
241
# @param high 1/Float/ApplicationArgument/NoPort/ReadWrite//False/False/High value of range of array [exclusive]
242
# @param integer True/Boolean/ApplicationArgument/NoPort/ReadWrite//False/False/Generate integer array?
243
# @param dropclass dlg.apps.simple.RandomArrayApp/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
244
# @param base_name simple/String/ComponentParameter/NoPort/ReadOnly//False/False/Base name of application class
245
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
246
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
247
# @param group_start False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the start of a group?
248
# @param array /Object.Array/ApplicationArgument/OutputPort/ReadWrite//False/False/random array
249
# @param input_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
250
# @param output_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
251
# @par EAGLE_END
252
class RandomArrayApp(BarrierAppDROP):
2✔
253
    """
254
    A BarrierAppDrop that generates an array of random numbers. It does
255
    not require any inputs and writes the generated array to all of its
256
    outputs.
257

258
    Keywords:
259

260
    integer:  bool [True], generate integer array
261
    low:      float, lower boundary (will be converted to int for integer arrays)
262
    high:     float, upper boundary (will be converted to int for integer arrays)
263
    size:     int, number of array elements
264
    """
265

266
    component_meta = dlg_component(
2✔
267
        "RandomArrayApp",
268
        "Random Array App.",
269
        [dlg_batch_input("binary/*", [])],
270
        [dlg_batch_output("binary/*", [])],
271
        [dlg_streaming_input("binary/*")],
272
    )
273

274
    # default values
275
    integer = dlg_bool_param("integer", True)
2✔
276
    low = dlg_float_param("low", 0)
2✔
277
    high = dlg_float_param("high", 100)
2✔
278
    size = dlg_int_param("size", 100)
2✔
279
    marray = []
2✔
280

281
    def initialize(self, keep_array=False, **kwargs):
2✔
282
        super(RandomArrayApp, self).initialize(**kwargs)
2✔
283
        self._keep_array = keep_array
2✔
284

285
    def run(self):
2✔
286
        # At least one output should have been added
287
        outs = self.outputs
2✔
288
        if len(outs) < 1:
2✔
289
            raise Exception("At least one output should have been added to %r" % self)
×
290
        marray = self.generateRandomArray()
2✔
291
        if self._keep_array:
2✔
292
            self.marray = marray
2✔
293
        for o in outs:
2✔
294
            d = pickle.dumps(marray)
2✔
295
            o.len = len(d)
2✔
296
            o.write(d)
2✔
297

298
    def generateRandomArray(self):
2✔
299
        if self.integer:
2✔
300
            # generate an array of self.size integers with numbers between
301
            # slef.low and self.high
302
            marray = np.random.randint(int(self.low), int(self.high), size=(self.size))
2✔
303
        else:
304
            # generate an array of self.size floats with numbers between
305
            # self.low and self.high
306
            marray = (np.random.random(size=self.size) + self.low) * self.high
×
307
        return marray
2✔
308

309
    def _getArray(self):
2✔
310
        return self.marray
2✔
311

312

313
##
314
# @brief AverageArrays
315
# @details A testing APP that takes multiple numpy arrays on input and calculates
316
# the mean or the median, depending on the value provided in the method parameter.
317
# Users can add as many producers to the input array port as required and the resulting array
318
# will also be send to all connected output apps.
319
# @par EAGLE_START
320
# @param category PythonApp
321
# @param tag daliuge
322
# @param method mean/Select/ApplicationArgument/NoPort/ReadWrite/mean,median/False/False/The method used for averaging
323
# @param dropclass dlg.apps.simple.AverageArraysApp/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
324
# @param base_name simple/String/ComponentParameter/NoPort/ReadOnly//False/False/Base name of application class
325
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
326
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
327
# @param group_start False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the start of a group?
328
# @param array /Object.Array/ApplicationArgument/InputOutput/ReadWrite//False/False/Port for the array(s)
329
# @param input_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
330
# @param output_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
331
# @par EAGLE_END
332
class AverageArraysApp(BarrierAppDROP):
2✔
333
    """
334
    A BarrierAppDrop that averages arrays received on input. It requires
335
    multiple inputs and writes the generated average vector to all of its
336
    outputs.
337
    The input arrays are assumed to have the same number of elements and
338
    the output array will also have that same number of elements.
339

340
    Keywords:
341

342
    method:  string <['mean']|'median'>, use mean or median as method.
343
    """
344

345
    from numpy import mean, median
2✔
346

347
    component_meta = dlg_component(
2✔
348
        "AverageArraysApp",
349
        "Average Array App.",
350
        [dlg_batch_input("binary/*", [])],
351
        [dlg_batch_output("binary/*", [])],
352
        [dlg_streaming_input("binary/*")],
353
    )
354

355
    # default values
356
    methods = ["mean", "median"]
2✔
357
    method = dlg_string_param("method", methods[0])
2✔
358

359
    def __init__(self, oid, uid, **kwargs):
2✔
360
        super().__init__(oid, kwargs)
2✔
361
        self.marray = []
2✔
362

363
    def initialize(self, **kwargs):
2✔
364
        super().initialize(**kwargs)
2✔
365

366
    def run(self):
2✔
367
        # At least one output should have been added
368

369
        outs = self.outputs
2✔
370
        if len(outs) < 1:
2✔
371
            raise Exception("At least one output should have been added to %r" % self)
×
372
        self.getInputArrays()
2✔
373
        self._avg = self.averageArray()
2✔
374
        for o in outs:
2✔
375
            d = pickle.dumps(self._avg)
2✔
376
            o.len = len(d)
2✔
377
            o.write(d)  # average across inputs
2✔
378

379
    def getInputArrays(self):
2✔
380
        """
381
        Create the input array from all inputs received. Shape is
382
        (<#inputs>, <#elements>), where #elements is the length of the
383
        vector received from one input.
384
        """
385
        ins = self.inputs
2✔
386
        if len(ins) < 1:
2✔
387
            raise Exception("At least one input should have been added to %r" % self)
×
388
        marray = []
2✔
389
        for inp in ins:
2✔
390
            sarray = droputils.allDropContents(inp)
2✔
391
            if len(sarray) == 0:
2✔
392
                print(f"Input does not contain data!")
×
393
            else:
394
                sarray = pickle.loads(sarray)
2✔
395
                if isinstance(sarray, (list, tuple, np.ndarray)):
2✔
396
                    marray.extend(list(sarray))
2✔
397
                else:
398
                    marray.append(sarray)
×
399
        self.marray = marray
2✔
400

401
    def averageArray(self):
2✔
402
        method_to_call = getattr(np, self.method)
2✔
403
        return method_to_call(self.marray, axis=0)
2✔
404

405

406
##
407
# @brief GenericGatherApp
408
# @details App that reads all its inputs and simply writes them in
409
# concatenated to all its outputs. This can be used stand-alone or
410
# as part of a Gather. It does not do anything to the data, just
411
# passing it on.
412
#
413
# @par EAGLE_START
414
# @param construct Gather
415
# @param category PythonApp
416
# @param tag daliuge
417
# @param num_of_inputs 4/Integer/ConstructParameter/NoPort/ReadWrite//False/False/The Gather width, stating how many inputs each Gather instance will handle
418
# @param dropclass dlg.apps.simple.GenericGatherApp/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
419
# @param base_name simple/String/ComponentParameter/NoPort/ReadOnly//False/False/Base name of application class
420
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
421
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
422
# @param group_start False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the start of a group?
423
# @param input /Object/ApplicationArgument/InputPort/ReadWrite//False/False/0-base placeholder port for inputs
424
# @param output /Object/ApplicationArgument/OutputPort/ReadWrite//False/False/Placeholder port for outputs
425
# @param input_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
426
# @param output_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
427
# @par EAGLE_END
428
class GenericGatherApp(BarrierAppDROP):
2✔
429
    component_meta = dlg_component(
2✔
430
        "GenericGatherApp",
431
        "Gather multiple inputs",
432
        [dlg_batch_input("binary/*", [])],
433
        [dlg_batch_output("binary/*", [])],
434
        [dlg_streaming_input("binary/*")],
435
    )
436

437
    # automatically populated by scatter node
438
    num_of_inputs: int = dlg_int_param("num_of_inputs", 1)
2✔
439

440
    def initialize(self, **kwargs):
2✔
441
        super(GenericGatherApp, self).initialize(**kwargs)
2✔
442

443
    def readWriteData(self):
2✔
444
        inputs = self.inputs
2✔
445
        outputs = self.outputs
2✔
446
        total_len = 0
2✔
447
        for output in outputs:
2✔
448
            for input in inputs:
2✔
449
                value = droputils.allDropContents(input)
2✔
450
                output.write(value)
2✔
451

452
    def run(self):
2✔
453
        self.readWriteData()
2✔
454

455

456
##
457
# @brief DictGatherApp
458
# @details App packs all data on input into a dictionary using the input drop's names as keys and the reading the
459
# dict values from the input drops. This app can be used stand-alone without a gather construct.
460
# @par EAGLE_START
461
# @param category PythonApp
462
# @param tag daliuge
463
# @param value_dict value_dict/Jasom/ApplicationArgument/NoPort/ReadWrite//False/False/The value dictionary can be initialized here
464
# @param dropclass dlg.apps.simple.DictGatherApp/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
465
# @param base_name simple/String/ComponentParameter/NoPort/ReadOnly//False/False/Base name of application class
466
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
467
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
468
# @param group_start False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the start of a group?
469
# @param input /Object/ApplicationArgument/InputPort/ReadWrite//False/False/0-base placeholder port for inputs
470
# @param output /Object/ApplicationArgument/OutputPort/ReadWrite//False/False/Placeholder port for outputs
471
# @param input_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
472
# @param output_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
473
# @par EAGLE_END
474
class DictGatherApp(BarrierAppDROP):
2✔
475
    component_meta = dlg_component(
2✔
476
        "DictGatherApp",
477
        "Collect multiple inputs into a dictionary",
478
        [dlg_batch_input("binary/*", [])],
479
        [dlg_batch_output("binary/*", [])],
480
        [dlg_streaming_input("binary/*")],
481
    )
482
    value_dict = dlg_dict_param("value_dict", {})
2✔
483

484
    def initialize(self, **kwargs):
2✔
485
        super(DictGatherApp, self).initialize(**kwargs)
×
486
        self.kwargs = kwargs
×
487

488
    def readWriteData(self):
2✔
489
        inputs = self.inputs
×
490
        outputs = self.outputs
×
491
        total_len = 0
×
492
        # for input in inputs:
493
        #     total_len += input.size
494
        # logger.debug(f">>>> writing {inputs} to {outputs}")
495
        for output in outputs:
×
496
            for input in inputs:
×
497
                value = droputils.allDropContents(input)
×
498
                self.value_dict[input.name] = pickle.loads(value)
×
499
                for aa_key, aa_dict in self.kwargs["applicationArgs"].items():
×
500
                    if aa_key not in self.value_dict and aa_dict["value"]:
×
501
                        self.value_dict[aa_key] = aa_dict["value"]
×
502
            logger.debug(
×
503
                "Writing %s to %s",
504
                self.value_dict,
505
                output.name,
506
            )
507
            output.write(pickle.dumps(self.value_dict))
×
508

509
            # logger.debug(f">>> written {d} to {output}")
510

511

512
##
513
# @brief ArrayGatherApp
514
# @details App appends all input daata to a list. This app can be used stand-alone without a gather construct.
515
# @par EAGLE_START
516
# @param category PythonApp
517
# @param tag daliuge
518
# @param value_array value_array/array/ApplicationArgument/OutputPort/ReadWrite//False/False/The value array can be initialized here
519
# @param dropclass dlg.apps.simple.ArrayGatherApp/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
520
# @param base_name simple/String/ComponentParameter/NoPort/ReadOnly//False/False/Base name of application class
521
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
522
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
523
# @param group_start False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the start of a group?
524
# @param input /Object/ComponentParameter/InputPort/ReadWrite//False/False/0-base placeholder port for inputs
525
# @param input_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
526
# @param output_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
527
# @par EAGLE_END
528
class ArrayGatherApp(BarrierAppDROP):
2✔
529
    component_meta = dlg_component(
2✔
530
        "ArrayGatherApp",
531
        "Collect multiple inputs into an array",
532
        [dlg_batch_input("binary/*", [])],
533
        [dlg_batch_output("binary/*", [])],
534
        [dlg_streaming_input("binary/*")],
535
    )
536
    value_list = dlg_list_param("value_list", [])
2✔
537

538
    def initialize(self, **kwargs):
2✔
NEW
539
        super(ArrayGatherApp, self).initialize(**kwargs)
×
NEW
540
        self.kwargs = kwargs
×
541

542
    def readWriteData(self):
2✔
NEW
543
        inputs = self.inputs
×
NEW
544
        outputs = self.outputs
×
NEW
545
        total_len = 0
×
NEW
546
        for output in outputs:
×
NEW
547
            for input in inputs:
×
NEW
548
                value = droputils.allDropContents(input)
×
NEW
549
                self.value_list.append(pickle.loads(value))
×
NEW
550
            output.write(pickle.dumps(self.value_list))
×
551

552
    def run(self):
2✔
553
        self.readWriteData()
×
554

555

556
##
557
# @brief GenericNpyGatherApp
558
# @details A BarrierAppDrop that combines one or more inputs using cumulative operations.
559
# @par EAGLE_START
560
# @param category PythonApp
561
# @param construct Gather
562
# @param tag daliuge
563
# @param num_of_inputs 4/Integer/ConstructParameter/NoPort/ReadWrite//False/False/The Gather width, stating how many inputs each Gather instance will handle
564
# @param function sum/Select/ApplicationArgument/NoPort/ReadWrite/sum,prod,min,max,add,multiply,maximum,minimum/False/False/The function used for gathering
565
# @param reduce_axes None/String/ApplicationArgument/NoPort/ReadOnly//False/False/The array axes to reduce, None reduces all axes for sum, prod, max, min functions
566
# @param dropclass dlg.apps.simple.GenericNpyGatherApp/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
567
# @param base_name simple/String/ComponentParameter/NoPort/ReadOnly//False/False/Base name of application class
568
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
569
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
570
# @param group_start False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the start of a group?
571
# @param array_in /Object.Array/ApplicationArgument/InputPort/ReadWrite//False/False/Port for the input array(s)
572
# @param array_out /Object.Array/ApplicationArgument/OutputPort/ReadWrite//False/False/reduced array
573
# @param input_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
574
# @param output_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
575
# @par EAGLE_END
576
class GenericNpyGatherApp(BarrierAppDROP):
2✔
577
    """
578
    A BarrierAppDrop that reduces then gathers one or more inputs using cumulative operations.
579
    function:  string <'sum'|'prod'|'min'|'max'|'add'|'multiply'|'maximum'|'minimum'>.
580

581
    """
582

583
    component_meta = dlg_component(
2✔
584
        "GenericNpyGatherApp",
585
        "Generic Npy Gather App.",
586
        [dlg_batch_input("binary/*", [])],
587
        [dlg_batch_output("binary/*", [])],
588
        [dlg_streaming_input("binary/*")],
589
    )
590

591
    # reduce and combine operation pair names
592
    # reduce operation reduces the dimensionality of a ndarray
593
    # gather operation combines ndarrays and retains dimensionality
594
    functions = {
2✔
595
        # reduce and gather (output dimension is reduced)
596
        "sum": "add",  # sum reduction of inputs along an axis first then gathers across drops
597
        "prod": "multiply",  # prod reduction of inputs along an axis first then gathers across drops
598
        "max": "maximum",  # max reduction of input along an axis first then gathers across drops
599
        "min": "minimum",  # min reduction of input along an axis first then gathers across drops
600
        # gather only
601
        "add": None,  # elementwise addition of inputs, ndarrays must be of same shape
602
        "multiply": None,  # elementwise multiplication of inputs, ndarrays must be of same shape
603
        "maximum": None,  # elementwise maximums of inputs, ndarrays must be of same shape
604
        "minimum": None,  # elementwise minimums of inputs, ndarrays must be of same shape
605
    }
606
    function: str = dlg_string_param("function", "sum")  # type: ignore
2✔
607
    reduce_axes: list = dlg_list_param("reduce_axes", "None")  # type: ignore
2✔
608

609
    def run(self):
2✔
610
        if len(self.inputs) < 1:
×
611
            raise Exception(f"At least one input should have been added to {self}")
×
612
        if len(self.outputs) < 1:
×
613
            raise Exception(f"At least one output should have been added to {self}")
×
614
        if self.function not in self.functions:
×
615
            raise Exception(f"Function {self.function} not supported by {self}")
×
616

617
        result = (
×
618
            self.reduce_gather_inputs()
619
            if self.functions[self.function] is not None
620
            else self.gather_inputs()
621
        )
622

623
        for o in self.outputs:
×
624
            drop_loaders.save_numpy(o, result)
×
625

626
    def reduce_gather_inputs(self):
2✔
627
        """reduces then gathers each input drop interpreted as an npy drop"""
628
        result: Optional[Number] = None
×
629
        reduce = getattr(np, f"{self.function}")
×
630
        gather = getattr(np, f"{self.functions[self.function]}")
×
631
        for input in self.inputs:
×
632
            data = drop_loaders.load_numpy(input)
×
633
            # skip gather for the first input
634
            result = (
×
635
                # reduce(data, axis=self.reduce_axes, allow_pickle=True)
636
                reduce(data, axis=self.reduce_axes)
637
                if result is None
638
                else gather(
639
                    result,
640
                    # reduce(data, axis=self.reduce_axes, allow_pickle=True),
641
                    reduce(data, axis=self.reduce_axes),
642
                )
643
            )
644
        return result
×
645

646
    def gather_inputs(self):
2✔
647
        """gathers each input drop interpreted as an npy drop"""
648
        result: Optional[Number] = None
×
649
        gather = getattr(np, f"{self.function}")
×
650
        for input in self.inputs:
×
651
            data = drop_loaders.load_numpy(input)
×
652
            # assign instead of gather for the first input
653
            # result = data if result is None else gather(result, data, allow_pickle=True)
654
            result = data if result is None else gather(result, data)
×
655
        return result
×
656

657

658
##
659
# @brief HelloWorldApp
660
# @details A simple APP that implements the standard Hello World in DALiuGE.
661
# It allows to change 'World' with some other string and it also permits
662
# to connect the single output port to multiple sinks, which will all receive
663
# the same message. App does not require any input.
664
# @par EAGLE_START
665
# @param category PythonApp
666
# @param tag daliuge
667
# @param greet World/String/ApplicationArgument/NoPort/ReadWrite//False/False/What appears after 'Hello '
668
# @param dropclass dlg.apps.simple.HelloWorldApp/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
669
# @param base_name simple/String/ComponentParameter/NoPort/ReadOnly//False/False/Base name of application class
670
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
671
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
672
# @param group_start False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the start of a group?
673
# @param hello "world"/Object/ApplicationArgument/OutputPort/ReadWrite//False/False/message
674
# @param input_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
675
# @param output_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
676
# @par EAGLE_END
677
class HelloWorldApp(BarrierAppDROP):
2✔
678
    """
679
    An App that writes 'Hello World!' or 'Hello <greet>!' to all of
680
    its outputs.
681

682
    Keywords:
683
    greet:   string, [World], whom to greet.
684
    """
685

686
    component_meta = dlg_component(
2✔
687
        "HelloWorldApp",
688
        "Hello World App.",
689
        [dlg_batch_input("binary/*", [])],
690
        [dlg_batch_output("binary/*", [])],
691
        [dlg_streaming_input("binary/*")],
692
    )
693

694
    greet = dlg_string_param("greet", "World")
2✔
695

696
    def run(self):
2✔
697
        ins = self.inputs
2✔
698
        # if no inputs use the parameter else use the input
699
        if len(ins) == 0:
2✔
700
            self.greeting = "Hello %s" % self.greet
2✔
701
        elif len(ins) != 1:
2✔
702
            raise Exception("Only one input expected for %r" % self)
×
703
        else:  # the input is expected to be a vector. We'll use the first element
704
            try:
2✔
705
                phrase = str(pickle.loads(droputils.allDropContents(ins[0]))[0])
2✔
706
            except _pickle.UnpicklingError:
×
707
                phrase = str(droputils.allDropContents(ins[0]), encoding="utf-8")
×
708
            self.greeting = f"Hello {phrase}"
2✔
709

710
        outs = self.outputs
2✔
711
        if len(outs) < 1:
2✔
712
            raise Exception("At least one output should have been added to %r" % self)
×
713
        for o in outs:
2✔
714
            o.len = len(self.greeting.encode())
2✔
715
            o.write(self.greeting.encode())  # greet across all outputs
2✔
716

717

718
##
719
# @brief UrlRetrieveApp
720
# @details A simple APP that retrieves the content of a URL and writes
721
# it to all outputs.
722
# @par EAGLE_START
723
# @param category PythonApp
724
# @param tag daliuge
725
# @param url None/String/ApplicationArgument/NoPort/ReadWrite//False/False/The URL to retrieve
726
# @param dropclass dlg.apps.simple.UrlRetrieveApp/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
727
# @param base_name simple/String/ComponentParameter/NoPort/ReadOnly//False/False/Base name of application class
728
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
729
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
730
# @param group_start False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the start of a group?
731
# @param content /String/ApplicationArgument/OutputPort/ReadWrite//False/False/content read from URL
732
# @param input_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
733
# @param output_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
734
# @par EAGLE_END
735
class UrlRetrieveApp(BarrierAppDROP):
2✔
736
    """
737
    An App that retrieves the content of a URL
738

739
    Keywords:
740
    URL:   string, URL to retrieve.
741
    """
742

743
    component_meta = dlg_component(
2✔
744
        "UrlRetrieveApp",
745
        "URL Retrieve App",
746
        [dlg_batch_input("binary/*", [])],
747
        [dlg_batch_output("binary/*", [])],
748
        [dlg_streaming_input("binary/*")],
749
    )
750

751
    url = dlg_string_param("url", "")
2✔
752

753
    def run(self):
2✔
754
        try:
×
755
            logger.info("Accessing URL %s", self.url)
×
756
            u = requests.get(self.url)
×
757
        except requests.exceptions.RequestException as e:
×
758
            raise e.reason
×
759

760
        outs = self.outputs
×
761
        if len(outs) < 1:
×
762
            raise Exception("At least one output should have been added to %r" % self)
×
763
        for o in outs:
×
764
            o.len = len(u.content)
×
765
            o.write(u.content)  # send content to all outputs
×
766

767

768
##
769
# @brief GenericScatterApp
770
# @details An APP that splits about any object that can be converted to a numpy array
771
# into as many parts as the app has outputs, provided that the initially converted numpy
772
# array has enough elements. The return will be a numpy array of arrays, where the first
773
# axis is of length len(outputs). The modulo remainder of the length of the original array and
774
# the number of outputs will be distributed across the first len(outputs)-1 elements of the
775
# resulting array.
776
# @par EAGLE_START
777
# @param category PythonApp
778
# @param construct Scatter
779
# @param tag daliuge
780
# @param num_of_copies 4/Integer/ConstructParameter/NoPort/ReadWrite//False/False/Specifies the number of replications of the content of the scatter construct
781
# @param array_in /Object.Array/ApplicationArgument/InputPort/ReadWrite//False/False/A numpy array of arrays, where the first axis is of length <numSplit>
782
# @param object_out /Object/ApplicationArgument/OutputPort/ReadWrite//False/False/reduced array or single element, depending on element flag.
783
# @param element False/Boolean/ApplicationArgument/NoPort/ReadWrite//False/False/if True the outputs of each of the splits will be the first element of the split array, rather than the split array.
784
# @param group_start False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the start of a group?
785
# @param n_tries 1/Integer/ComponentParameter/NoPort/ReadWrite//False/False/Specifies the number of times the 'run' method will be executed before finally giving up
786
# @param dropclass dlg.apps.simple.GenericScatterApp/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
787
# @param base_name simple/String/ComponentParameter/NoPort/ReadOnly//False/False/Base name of application class
788
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
789
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
790
# @param input_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
791
# @param output_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
792
# @par EAGLE_END
793
class GenericScatterApp(BarrierAppDROP):
2✔
794
    """
795
    An APP that splits an object that has a len attribute into <numSplit> parts and
796
    returns a numpy array of arrays, where the first axis is of length <numSplit>.
797
    """
798

799
    component_meta = dlg_component(
2✔
800
        "GenericScatterApp",
801
        "Scatter an array like object into numSplit parts",
802
        [dlg_batch_input("binary/*", [])],
803
        [dlg_batch_output("binary/*", [])],
804
        [dlg_streaming_input("binary/*")],
805
    )
806

807
    # automatically populated by scatter node
808
    num_of_copies: int = dlg_int_param("num_of_copies", 1)
2✔
809
    element: bool = dlg_bool_param("element", False)
2✔
810

811
    def initialize(self, **kwargs):
2✔
812
        super(GenericScatterApp, self).initialize(**kwargs)
2✔
813

814
    def run(self):
2✔
815
        numSplit = self.num_of_copies
2✔
816
        cont = droputils.allDropContents(self.inputs[0])
2✔
817
        # if the data is of type string it is not pickled, but stored as a binary string.
818
        try:
2✔
819
            inpArray = pickle.loads(cont)
2✔
820
        except:
×
821
            inpArray = cont.decode()
×
822
        try:  # just checking whether the object is some object that can be used as an array
2✔
823
            nObj = np.array(inpArray)
2✔
824
        except:
×
825
            raise
×
826
        try:
2✔
827
            result = np.array_split(nObj, numSplit)
2✔
828
        except IndexError as err:
×
829
            raise err
×
830
        for i in range(numSplit):
2✔
831
            o = self.outputs[i]
2✔
832
            if not self.element:
2✔
833
                d = pickle.dumps(result[i])
2✔
834
            else:
NEW
835
                d = pickle.dumps(result[i][0])
×
836
            o.len = len(d)
2✔
837
            o.write(d)  # average across inputs
2✔
838

839

840
##
841
# @brief GenericNpyScatterApp
842
# @details An APP that splits about any axis on any npy format data drop
843
# into as many part./run    s as the app has outputs, provided that the initially converted numpy
844
# array has enough elements. The return will be a numpy array of arrays, where the first
845
# axis is of length len(outputs). The modulo remainder of the length of the original array and
846
# the number of outputs will be distributed across the first len(outputs)-1 elements of the
847
# resulting array.
848
# @par EAGLE_START
849
# @param construct Scatter
850
# @param category PythonApp
851
# @param tag daliuge
852
# @param num_of_copies 4/Integer/ConstructParameter/NoPort/ReadWrite//False/False/Specifies the number of replications of the content of the scatter construct
853
# @param scatter_axes /String/ApplicationArgument/NoPort/ReadWrite//False/False/The axes to split input ndarrays on, e.g. [0,0,0], length must match the number of input ports
854
# @param group_start False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the start of a group?
855
# @param dropclass dlg.apps.simple.GenericNpyScatterApp/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
856
# @param base_name simple/String/ComponentParameter/NoPort/ReadOnly//False/False/Base name of application class
857
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
858
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
859
# @param array_in /Object.Array/ApplicationArgument/InputPort/ReadWrite//False/False/A numpy array of arrays
860
# @param array_out /Object.Array/ApplicationArgument/OutputPort/ReadWrite//False/False/reduced array
861
# @param input_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
862
# @param output_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
863
# @par EAGLE_END
864
class GenericNpyScatterApp(BarrierAppDROP):
2✔
865
    """
866
    An APP that splits an object that has a len attribute into <num_of_copies> parts and
867
    returns a numpy array of arrays.
868
    """
869

870
    component_meta = dlg_component(
2✔
871
        "GenericNpyScatterApp",
872
        "Scatter an array like object into <num_of_copies> parts",
873
        [dlg_batch_input("binary/*", [])],
874
        [dlg_batch_output("binary/*", [])],
875
        [dlg_streaming_input("binary/*")],
876
    )
877

878
    # automatically populated by scatter node
879
    num_of_copies: int = dlg_int_param("num_of_copies", 1)
2✔
880
    scatter_axes: List[int] = dlg_list_param("scatter_axes", "[0]")
2✔
881

882
    def run(self):
2✔
883
        if len(self.inputs) * self.num_of_copies != len(self.outputs):
2✔
884
            raise DaliugeException(
×
885
                f"expected {len(self.inputs) * self.num_of_copies} outputs,\
886
                 got {len(self.outputs)}"
887
            )
888
        if len(self.inputs) != len(self.scatter_axes):
2✔
889
            raise DaliugeException(
×
890
                f"expected {len(self.inputs)} axes,\
891
                 got {len(self.scatter_axes)}, {self.scatter_axes}"
892
            )
893

894
        # split it as many times as we have outputs
895
        self.num_of_copies = self.num_of_copies
2✔
896

897
        for in_index in range(len(self.inputs)):
2✔
898
            nObj = drop_loaders.load_numpy(self.inputs[in_index])
2✔
899
            try:
2✔
900
                result = np.array_split(
2✔
901
                    nObj, self.num_of_copies, axis=self.scatter_axes[in_index]
902
                )
903
            except IndexError as err:
×
904
                raise err
×
905
            for split_index in range(self.num_of_copies):
2✔
906
                out_index = in_index * self.num_of_copies + split_index
2✔
907
                drop_loaders.save_numpy(self.outputs[out_index], result[split_index])
2✔
908

909

910
class SimpleBranch(BranchAppDrop, NullBarrierApp):
2✔
911
    """Simple branch app that is told the result of its condition"""
912

913
    def initialize(self, **kwargs):
2✔
914
        self.result = self._popArg(kwargs, "result", True)
2✔
915
        BranchAppDrop.initialize(self, **kwargs)
2✔
916

917
    def run(self):
2✔
918
        pass
2✔
919

920
    def condition(self):
2✔
921
        return self.result
2✔
922

923

924
##
925
# @brief PickOne
926
# @details App that picks the first element of an input list, passes that
927
# to all outputs, except the first one. The first output is used to pass
928
# the remaining array on. This app is useful for a loop.
929
#
930
# @par EAGLE_START
931
# @param category PythonApp
932
# @param tag daliuge
933
# @param dropclass dlg.apps.simple.PickOne/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
934
# @param base_name simple/String/ComponentParameter/NoPort/ReadOnly//False/False/Base name of application class
935
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
936
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
937
# @param group_start False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the start of a group?
938
# @param rest_array /Object.array/ApplicationArgument/InputOutput/ReadWrite//False/False/List of elements
939
# @param element /Object.element/ApplicationArgument/OutputPort/ReadWrite//False/False/first element
940
# @param input_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
941
# @param output_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
942
# @par EAGLE_END
943
class PickOne(BarrierAppDROP):
2✔
944
    """
945
    Simple app picking one element at a time. Good for Loops.
946
    """
947

948
    def initialize(self, **kwargs):
2✔
949
        BarrierAppDROP.initialize(self, **kwargs)
×
950

951
    def readData(self):
2✔
952
        input = self.inputs[0]
×
953
        data = pickle.loads(droputils.allDropContents(input))
×
954

955
        # make sure we always have a ndarray with at least 1dim.
956
        if type(data) not in (list, tuple) and not isinstance(data, (np.ndarray)):
×
957
            raise TypeError
×
958
        if isinstance(data, np.ndarray) and data.ndim == 0:
×
959
            data = np.array([data])
×
960
        else:
961
            data = np.array(data)
×
962
        value = data[0] if len(data) else None
×
963
        rest = data[1:] if len(data) > 1 else np.array([])
×
964
        return value, rest
×
965

966
    def writeData(self, value, rest):
2✔
967
        """
968
        Prepare the data and write to all outputs
969
        """
970
        # write rest to array output
971
        # and value to every other output
972
        for output in self.outputs:
×
973
            if output.name == "rest_array":
×
974
                d = pickle.dumps(rest)
×
975
                output.len = len(d)
×
976
            else:
977
                d = pickle.dumps(value)
×
978
                output.len = len(d)
×
979
            output.write(d)
×
980

981
    def run(self):
2✔
982
        value, rest = self.readData()
×
983
        self.writeData(value, rest)
×
984

985

986
##
987
# @brief ListAppendThrashingApp
988
# @details A testing APP that appends a random integer to a list num times.
989
# This is a CPU intensive operation and can thus be used to provide a test for application threading
990
# since this operation will not yield.
991
# The resulting array will be sent to all connected output apps.
992
# @par EAGLE_START
993
# @param category PythonApp
994
# @param tag test
995
# @param size 100/Integer/ApplicationArgument/NoPort/ReadWrite//False/False/the size of the array
996
# @param dropclass dlg.apps.simple.ListAppendThrashingApp/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
997
# @param base_name simple/String/ComponentParameter/NoPort/ReadOnly//False/False/Base name of application class
998
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
999
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
1000
# @param group_start False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the start of a group?
1001
# @param array /Object.Array/ApplicationArgument/OutputPort/ReadWrite//False/False/random array
1002
# @param input_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
1003
# @param output_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
1004
# @par EAGLE_END
1005
class ListAppendThrashingApp(BarrierAppDROP):
2✔
1006
    """
1007
    A BarrierAppDrop that appends random integers to a list N times. It does
1008
    not require any inputs and writes the generated array to all of its
1009
    outputs.
1010

1011
    Keywords:
1012

1013
    size:     int, number of array elements
1014
    """
1015

1016
    compontent_meta = dlg_component(
2✔
1017
        "ListAppendThrashingApp",
1018
        "List Append Thrashing",
1019
        [dlg_batch_input("binary/*", [])],
1020
        [dlg_batch_output("binary/*", [])],
1021
        [dlg_streaming_input("binary/*")],
1022
    )
1023

1024
    def initialize(self, **kwargs):
2✔
1025
        self.size = self._popArg(kwargs, "size", 100)
2✔
1026
        self.marray = []
2✔
1027
        super(ListAppendThrashingApp, self).initialize(**kwargs)
2✔
1028

1029
    def run(self):
2✔
1030
        # At least one output should have been added
1031
        outs = self.outputs
2✔
1032
        if len(outs) < 1:
2✔
1033
            raise Exception("At least one output should have been added to %r" % self)
×
1034
        self.marray = self.generateArray()
2✔
1035
        for o in outs:
2✔
1036
            d = pickle.dumps(self.marray)
2✔
1037
            o.len = len(d)
2✔
1038
            o.write(pickle.dumps(self.marray))
2✔
1039

1040
    def generateArray(self):
2✔
1041
        # This operation is wasteful to simulate an N^2 operation.
1042
        marray = []
2✔
1043
        for _ in range(int(self.size)):
2✔
1044
            marray = []
2✔
1045
            for i in range(int(self.size)):
2✔
1046
                marray.append(random.random())
2✔
1047
        return marray
2✔
1048

1049
    def _getArray(self):
2✔
1050
        return self.marray
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc