• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ICRAR / daliuge / 4390894983

pending completion
4390894983

push

github

Andreas Wicenec
added NullDROP

1 of 1 new or added line in 1 file covered. (100.0%)

1 existing line in 1 file now uncovered.

15425 of 18837 relevant lines covered (81.89%)

1.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.49
/daliuge-engine/test/manager/test_dim.py
1
#
2
#    ICRAR - International Centre for Radio Astronomy Research
3
#    (c) UWA - The University of Western Australia, 2015
4
#    Copyright by UWA (in the framework of the ICRAR)
5
#    All rights reserved
6
#
7
#    This library is free software; you can redistribute it and/or
8
#    modify it under the terms of the GNU Lesser General Public
9
#    License as published by the Free Software Foundation; either
10
#    version 2.1 of the License, or (at your option) any later version.
11
#
12
#    This library is distributed in the hope that it will be useful,
13
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
14
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15
#    Lesser General Public License for more details.
16
#
17
#    You should have received a copy of the GNU Lesser General Public
18
#    License along with this library; if not, write to the Free Software
19
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
20
#    MA 02111-1307  USA
21
#
22
import codecs
2✔
23
import json
2✔
24
import os
2✔
25
import time
2✔
26
import unittest
2✔
27
from asyncio.log import logger
2✔
28

29
import pkg_resources
2✔
30

31
from dlg import droputils
2✔
32
from dlg import utils
2✔
33
from dlg.common import tool, Categories
2✔
34
from dlg.ddap_protocol import DROPStates
2✔
35
from dlg.manager.composite_manager import DataIslandManager
2✔
36
from dlg.manager.session import SessionStates
2✔
37
from dlg.testutils import ManagerStarter
2✔
38
from test.manager import testutils
2✔
39

40
hostname = "localhost"
2✔
41

42
default_repro = {
2✔
43
    "rmode": "1",
44
    "RERUN": {
45
        "lg_blockhash": "x",
46
        "pgt_blockhash": "y",
47
        "pg_blockhash": "z",
48
    }
49
}
50
default_graph_repro = {
2✔
51
    "rmode": "1",
52
    "meta_data": {"repro_protocol": 0.1, "hashing_alg": "_sha3.sha3_256"},
53
    "merkleroot": "a",
54
    "RERUN": {
55
        "signature": "b",
56
    }
57
}
58

59

60
def add_test_reprodata(graph: list):
2✔
61
    for drop in graph:
2✔
62
        drop["reprodata"] = default_repro.copy()
2✔
63
    graph.append(default_graph_repro.copy())
2✔
64
    return graph
2✔
65

66

67
class LocalDimStarter(ManagerStarter):
2✔
68
    def setUp(self):
2✔
69
        super(LocalDimStarter, self).setUp()
2✔
70
        self.nm_info = self.start_nm_in_thread()
2✔
71
        self.dm = self.nm_info.manager
2✔
72
        self.dim = DataIslandManager([hostname])
2✔
73

74
    def tearDown(self):
2✔
75
        self.nm_info.stop()
2✔
76
        self.dim.shutdown()
2✔
77
        super(LocalDimStarter, self).tearDown()
2✔
78

79

80
class TestDIM(LocalDimStarter, unittest.TestCase):
2✔
81
    def createSessionAndAddTypicalGraph(self, sessionId, sleepTime=0):
2✔
82
        graphSpec = [
2✔
83
            {
84
                "oid": "A",
85
                "type": "data",
86
                "storage": Categories.MEMORY,
87
                "node": hostname,
88
                "consumers": ["B"],
89
            },
90
            {
91
                "oid": "B",
92
                "type": "app",
93
                "app": "dlg.apps.simple.SleepAndCopyApp",
94
                "sleepTime": sleepTime,
95
                "outputs": ["C"],
96
                "node": hostname,
97
            },
98
            {
99
                "oid": "C",
100
                "type": "data",
101
                "storage": Categories.MEMORY,
102
                "node": hostname,
103
            },
104
        ]
105
        graphSpec = add_test_reprodata(graphSpec)
2✔
106
        self.dim.createSession(sessionId)
2✔
107
        self.assertEqual(0, self.dim.getGraphSize(sessionId))
2✔
108
        self.dim.addGraphSpec(sessionId, graphSpec)
2✔
109
        self.assertEqual(len(graphSpec), self.dim.getGraphSize(sessionId))
2✔
110

111
    def test_createSession(self):
2✔
112
        sessionId = "lalo"
2✔
113
        self.dim.createSession(sessionId)
2✔
114
        self.assertEqual(1, len(self.dm.getSessionIds()))
2✔
115
        self.assertEqual(sessionId, self.dm.getSessionIds()[0])
2✔
116

117
    def test_addGraphSpec(self):
2✔
118

119
        sessionId = "lalo"
2✔
120

121
        # No node specified
122
        graphSpec = [{"oid": "A", "type": "data", "storage": Categories.MEMORY}]
2✔
123
        self.assertRaises(Exception, self.dim.addGraphSpec, sessionId, graphSpec)
2✔
124

125
        # Wrong node specified
126
        graphSpec = [
2✔
127
            {
128
                "oid": "A",
129
                "type": "data",
130
                "storage": Categories.MEMORY,
131
                "node": "unknown_host",
132
            }
133
        ]
134
        graphSpec = add_test_reprodata(graphSpec)
2✔
135
        self.assertRaises(Exception, self.dim.addGraphSpec, sessionId, graphSpec)
2✔
136

137
        # OK
138
        graphSpec = [
2✔
139
            {
140
                "oid": "A",
141
                "type": "data",
142
                "storage": Categories.MEMORY,
143
                "node": hostname,
144
            }
145
        ]
146
        graphSpec = add_test_reprodata(graphSpec)
2✔
147
        self.dim.createSession(sessionId)
2✔
148
        self.assertEqual(0, self.dim.getGraphSize(sessionId))
2✔
149
        self.dim.addGraphSpec(sessionId, graphSpec)
2✔
150
        self.assertEqual(1, self.dim.getGraphSize(sessionId))
2✔
151
        graphFromDM = self.dm.getGraph(sessionId)
2✔
152
        self.assertEqual(1, len(graphFromDM))
2✔
153
        dropSpec = list(graphFromDM.values())[0]
2✔
154
        self.assertEqual("A", dropSpec["oid"])
2✔
155
        self.assertEqual("data", dropSpec["type"])
2✔
156
        self.assertEqual("Memory", dropSpec["storage"])
2✔
157

158
    def test_deployGraph(self):
2✔
159

160
        sessionId = "lalo"
2✔
161
        self.createSessionAndAddTypicalGraph(sessionId)
2✔
162

163
        # Deploy now and get A and C
164
        self.dim.deploySession(sessionId)
2✔
165
        a, c = [self.dm._sessions[sessionId].drops[x] for x in ("A", "C")]
2✔
166

167
        data = os.urandom(10)
2✔
168
        with droputils.DROPWaiterCtx(self, c, 3):
2✔
169
            a.write(data)
2✔
170
            a.setCompleted()
2✔
171

172
        self.assertEqual(data, droputils.allDropContents(c))
2✔
173

174
    def test_deployGraphWithCompletedDOs(self):
2✔
175
        self._test_deployGraphWithCompletedDOs("lalo")
2✔
176

177
    def test_deployGraphWithCompletedDOs_sessionIdWithSpaces(self):
2✔
178
        self._test_deployGraphWithCompletedDOs("lala with spaces")
2✔
179

180
    def _test_deployGraphWithCompletedDOs(self, sessionId):
2✔
181

182
        self.createSessionAndAddTypicalGraph(sessionId, sleepTime=1)
2✔
183

184
        # Deploy now and get C
185
        self.dim.deploySession(sessionId, completedDrops=["A"])
2✔
186
        c = self.dm._sessions[sessionId].drops["C"]
2✔
187

188
        # This should be happening before the sleepTime expires
189
        with droputils.DROPWaiterCtx(self, c, 2):
2✔
190
            pass
2✔
191

192
        self.assertEqual(DROPStates.COMPLETED, c.status)
2✔
193

194
    def test_sessionStatus(self):
2✔
195
        def assertSessionStatus(sessionId, status):
2✔
196
            sessionStatus = self.dim.getSessionStatus(sessionId)
2✔
197
            self.assertEqual(1, len(sessionStatus))
2✔
198
            self.assertIn(hostname, sessionStatus)
2✔
199
            self.assertEqual(status, sessionStatus[hostname])
2✔
200
            self.assertEqual(status, self.dm.getSessionStatus(sessionId))
2✔
201

202
        sessionId = "lala"
2✔
203
        self.dim.createSession(sessionId)
2✔
204
        assertSessionStatus(sessionId, SessionStates.PRISTINE)
2✔
205

206
        sessionId = "lalo"
2✔
207
        self.createSessionAndAddTypicalGraph(sessionId)
2✔
208
        assertSessionStatus(sessionId, SessionStates.BUILDING)
2✔
209

210
        self.dm.deploySession(sessionId)
2✔
211
        assertSessionStatus(sessionId, SessionStates.RUNNING)
2✔
212

213
        a, c = [self.dm._sessions[sessionId].drops[x] for x in ("A", "C")]
2✔
214
        data = os.urandom(10)
2✔
215
        with droputils.DROPWaiterCtx(self, c, 3):
2✔
216
            a.write(data)
2✔
217
            a.setCompleted()
2✔
218

219
        assertSessionStatus(sessionId, SessionStates.FINISHED)
2✔
220

221
    def test_getGraph(self):
2✔
222

223
        sessionId = "lalo"
2✔
224
        self.createSessionAndAddTypicalGraph(sessionId)
2✔
225

226
        graphSpecFromDim = self.dim.getGraph(sessionId)
2✔
227
        self.assertEqual(3, len(graphSpecFromDim))
2✔
228
        for oid in ("A", "B", "C"):
2✔
229
            self.assertIn(oid, graphSpecFromDim)
2✔
230
        graphSepcFromDM = self.dm.getGraph(sessionId)
2✔
231
        self.assertDictEqual(graphSepcFromDM, graphSpecFromDim)
2✔
232

233
    def test_getGraphStatus(self):
2✔
234
        def assertGraphStatus(sessionId, expectedStatus):
2✔
235
            graphStatusByDim = self.dim.getGraphStatus(sessionId)
2✔
236
            graphStatusByDM = self.dm.getGraphStatus(sessionId)
2✔
237
            self.assertDictEqual(graphStatusByDim, graphStatusByDM)
2✔
238
            for dropStatus in graphStatusByDim.values():
2✔
239
                self.assertEqual(expectedStatus, dropStatus["status"])
2✔
240

241
        sessionId = "lala"
2✔
242
        self.createSessionAndAddTypicalGraph(sessionId)
2✔
243
        self.dim.deploySession(sessionId)
2✔
244
        assertGraphStatus(sessionId, DROPStates.INITIALIZED)
2✔
245

246
        a, c = [self.dm._sessions[sessionId].drops[x] for x in ("A", "C")]
2✔
247
        data = os.urandom(10)
2✔
248
        with droputils.DROPWaiterCtx(self, c, 3):
2✔
249
            a.write(data)
2✔
250
            a.setCompleted()
2✔
251
        assertGraphStatus(sessionId, DROPStates.COMPLETED)
2✔
252

253
    def test_doCancel(self):
2✔
254
        def assertGraphStatus(sessionId, expectedStatus):
2✔
255
            graphStatusByDim = self.dim.getGraphStatus(sessionId)
2✔
256
            graphStatusByDM = self.dm.getGraphStatus(sessionId)
2✔
257
            self.assertDictEqual(graphStatusByDim, graphStatusByDM)
2✔
258
            for dropStatus in graphStatusByDim.values():
2✔
259
                self.assertEqual(expectedStatus, dropStatus["status"])
2✔
260

261
        sessionId = "lala"
2✔
262
        self.createSessionAndAddTypicalGraph(sessionId, 10)
2✔
263
        self.dim.deploySession(sessionId)
2✔
264
        assertGraphStatus(sessionId, DROPStates.INITIALIZED)
2✔
265

266
        self.dim.cancelSession(sessionId)
2✔
267

268
        # a, c = [self.dm._sessions[sessionId].drops[x] for x in ('A', 'C')]
269
        # data = os.urandom(10)
270
        # with droputils.DROPWaiterCtx(self, c, 3):
271
        #    a.write(data)
272
        #    a.setCompleted()
273
        assertGraphStatus(sessionId, DROPStates.CANCELLED)
2✔
274

275
    def test_submit_unreprodata(self):
2✔
276
        """
277
        Need to ensure that the DIM can handle a graph with empty reprodata
278
        (the default if nothing is provided at translation time)
279
        """
280
        graphSpec = [
2✔
281
            {
282
                "oid": "A",
283
                "type": "data",
284
                "storage": Categories.MEMORY,
285
                "node": hostname,
286
                "consumers": ["B"],
287
            },
288
            {
289
                "oid": "B",
290
                "type": "app",
291
                "app": "dlg.apps.simple.SleepAndCopyApp",
292
                "sleepTime": 1,
293
                "outputs": ["C"],
294
                "node": hostname,
295
            },
296
            {
297
                "oid": "C",
298
                "type": "data",
299
                "storage": Categories.MEMORY,
300
                "node": hostname,
301
            },
302
            {}  # A dummy empty reprodata (the default if absolutely nothing is specified)
303
        ]
304
        self.dim.createSession('a')
2✔
305
        self.assertEqual(0, self.dim.getGraphSize('a'))
2✔
306
        self.dim.addGraphSpec('a', graphSpec)
2✔
307
        self.assertEqual(len(graphSpec), self.dim.getGraphSize('a'))
2✔
308

309
    def test_submit_noreprodata(self):
2✔
310
        """
311
        Need to ensure that the DIM can handle a graph with no reprodata
312
        (the default if nothing is provided at translation time)
313
        """
314
        graphSpec = [
2✔
315
            {
316
                "oid": "A",
317
                "type": "data",
318
                "storage": Categories.MEMORY,
319
                "node": hostname,
320
                "consumers": ["B"],
321
            },
322
            {
323
                "oid": "B",
324
                "type": "app",
325
                "app": "dlg.apps.simple.SleepAndCopyApp",
326
                "sleepTime": 1,
327
                "outputs": ["C"],
328
                "node": hostname,
329
            },
330
            {
331
                "oid": "C",
332
                "type": "data",
333
                "storage": Categories.MEMORY,
334
                "node": hostname,
335
            },
336
        ]
337
        self.dim.createSession('a')
2✔
338
        self.assertEqual(0, self.dim.getGraphSize('a'))
2✔
339
        self.dim.addGraphSpec('a', graphSpec)
2✔
340
        self.assertEqual(len(graphSpec), self.dim.getGraphSize('a'))
2✔
341

342

343
class TestREST(LocalDimStarter, unittest.TestCase):
2✔
344
    def test_fullRound(self):
2✔
345
        """
346
        A test that exercises most of the REST interface exposed on top of the
347
        DataIslandManager
348
        """
349

350
        sessionId = "lala"
2✔
351
        restPort = 8989  # don't interfere with EAGLE default port
2✔
352
        args = ["--port", str(restPort), "-N", hostname, "-qqq"]
2✔
353
        dimProcess = tool.start_process("dim", args)
2✔
354

355
        with testutils.terminating(dimProcess, timeout=10):
2✔
356

357
            # Wait until the REST server becomes alive
358
            self.assertTrue(
2✔
359
                utils.portIsOpen("localhost", restPort, timeout=10),
360
                "REST server didn't come up in time",
361
            )
362

363
            # The DIM is still empty
364
            sessions = testutils.get(self, "/sessions", restPort)
2✔
365
            self.assertEqual(0, len(sessions))
2✔
366
            dimStatus = testutils.get(self, "", restPort)
2✔
367
            self.assertEqual(1, len(dimStatus["hosts"]))
2✔
368
            self.assertEqual(hostname, dimStatus["hosts"][0])
2✔
369
            self.assertEqual(0, len(dimStatus["sessionIds"]))
2✔
370

371
            # Create a session and check it exists
372
            testutils.post(
2✔
373
                self, "/sessions", restPort, '{"sessionId":"%s"}' % (sessionId)
374
            )
375
            sessions = testutils.get(self, "/sessions", restPort)
2✔
376
            self.assertEqual(1, len(sessions))
2✔
377
            self.assertEqual(sessionId, sessions[0]["sessionId"])
2✔
378
            self.assertDictEqual(
2✔
379
                {hostname: SessionStates.PRISTINE}, sessions[0]["status"]
380
            )
381

382
            # Add this complex graph spec to the session
383
            # The UID of the two leaf nodes of this complex.js graph are T and S
384
            # Since the original complexGraph doesn't have node information
385
            # we need to add it manually before submitting -- otherwise it will
386
            # get rejected by the DIM.
387
            with pkg_resources.resource_stream(
2✔
388
                    "test", "graphs/complex.js"
389
            ) as f:  # @UndefinedVariable
390
                complexGraphSpec = json.load(codecs.getreader("utf-8")(f))
2✔
391
                logger.debug(f"Loaded graph: {f}")
2✔
392
            for dropSpec in complexGraphSpec:
2✔
393
                dropSpec["node"] = hostname
2✔
394
            testutils.post(
2✔
395
                self,
396
                "/sessions/%s/graph/append" % (sessionId),
397
                restPort,
398
                json.dumps(complexGraphSpec),
399
            )
400
            self.assertEqual(
2✔
401
                {hostname: SessionStates.BUILDING},
402
                testutils.get(self, "/sessions/%s/status" % (sessionId), restPort),
403
            )
404

405
            # Now we deploy the graph...
406
            testutils.post(
2✔
407
                self,
408
                "/sessions/%s/deploy" % (sessionId),
409
                restPort,
410
                "completed=SL_A,SL_B,SL_C,SL_D,SL_K",
411
                mimeType="application/x-www-form-urlencoded",
412
            )
413
            self.assertEqual(
2✔
414
                {hostname: SessionStates.RUNNING},
415
                testutils.get(self, "/sessions/%s/status" % (sessionId), restPort),
416
            )
417

418
            # ...and write to all 5 root nodes that are listening in ports
419
            # starting at 1111
420
            msg = os.urandom(10)
2✔
421
            for i in range(5):
2✔
422
                utils.write_to(
2✔
423
                    "localhost", 1111 + i, msg, 2
424
                ), "Couldn't write data to localhost:%d" % (1111 + i)
425

426
            # Wait until the graph has finished its execution. We'll know
427
            # it finished by polling the status of the session
428
            while (
2✔
429
                    SessionStates.RUNNING
430
                    in testutils.get(
431
                self, "/sessions/%s/status" % (sessionId), restPort
432
            ).values()
433
            ):
UNCOV
434
                time.sleep(0.2)
×
435

436
            self.assertEqual(
2✔
437
                {hostname: SessionStates.FINISHED},
438
                testutils.get(self, "/sessions/%s/status" % (sessionId), restPort),
439
            )
440
            testutils.delete(self, "/sessions/%s" % (sessionId), restPort)
2✔
441
            sessions = testutils.get(self, "/sessions", restPort)
2✔
442
            self.assertEqual(0, len(sessions))
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc