• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ICRAR / daliuge / 19664018267

25 Nov 2025 09:05AM UTC coverage: 81.311% (-0.02%) from 81.328%
19664018267

Pull #377

github

web-flow
Merge 846f893ea into 47a138d72
Pull Request #377: LIU-529: Fix issue no prefix file download.

13 of 22 new or added lines in 7 files covered. (59.09%)

2 existing lines in 1 file now uncovered.

18995 of 23361 relevant lines covered (81.31%)

3.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.12
/daliuge-engine/dlg/manager/rest.py
1
#
2
#    ICRAR - International Centre for Radio Astronomy Research
3
#    (c) UWA - The University of Western Australia, 2015
4
#    Copyright by UWA (in the framework of the ICRAR)
5
#    All rights reserved
6
#
7
#    This library is free software; you can redistribute it and/or
8
#    modify it under the terms of the GNU Lesser General Public
9
#    License as published by the Free Software Foundation; either
10
#    version 2.1 of the License, or (at your option) any later version.
11
#
12
#    This library is distributed in the hope that it will be useful,
13
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
14
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15
#    Lesser General Public License for more details.
16
#
17
#    You should have received a copy of the GNU Lesser General Public
18
#    License along with this library; if not, write to the Free Software
19
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
20
#    MA 02111-1307  USA
21
#
22
"""
23
Module containing the REST layer that exposes the methods of the different
24
Data Managers (DROPManager and DataIslandManager) to the outside world.
25
"""
26
# pylint: disable=protected-access
27

28
from email.message import Message
6✔
29
import functools
6✔
30
import io
6✔
31
import json
6✔
32
import logging
6✔
33
import os
6✔
34
import re
6✔
35
import tarfile
6✔
36
import threading
6✔
37

38
import bottle
6✔
39

40
from bottle import static_file
6✔
41
from pathlib import Path
6✔
42

43
from dlg import constants
6✔
44
from dlg.manager.client import NodeManagerClient, DataIslandManagerClient
6✔
45
from dlg import utils
6✔
46
from dlg.exceptions import (
6✔
47
    InvalidGraphException,
48
    InvalidSessionState,
49
    DaliugeException,
50
    NoSessionException,
51
    SessionAlreadyExistsException,
52
    InvalidDropException,
53
    InvalidRelationshipException,
54
    SubManagerException,
55
)
56
from dlg.restserver import RestServer
6✔
57
from dlg.restutils import RestClient, RestClientException
6✔
58
from dlg.manager.session import generateLogFileName
6✔
59
from dlg.common.deployment_methods import DeploymentMethods
6✔
60
from dlg.common.version import version as dlg_version
6✔
61
from dlg.manager.manager_data import Node
6✔
62

63
logger = logging.getLogger(f"dlg.{__name__}")
6✔
64

65
def file_as_string(fname, enc="utf8"):
6✔
66
    res = Path(__file__).parent / fname
3✔
67
    return utils.b2s(res.read_bytes(), enc)
3✔
68

69

70
def daliuge_aware(func):
6✔
71
    @functools.wraps(func)
6✔
72
    def fwrapper(*args, **kwargs):
6✔
73
        try:
6✔
74
            res = func(*args, **kwargs)
6✔
75

76
            if isinstance(res, (bytes, bottle.HTTPResponse)):
6✔
77
                return res
×
78

79
            if res is not None:
6✔
80
                # set CORS headers
81
                origin = bottle.request.headers.raw("Origin")
6✔
82
                # logger.debug("CORS request comming from: %s", origin)
83
                # logger.debug("Request method: %s", bottle.request.method)
84
                if origin is None or re.match(
6✔
85
                    r"(http://dlg-tm.local:80[0-9][0-9]|https://dlg-tm.icrar.org|http://dlg-tm.localhost)",
86
                    origin,
87
                ):
88
                    pass
6✔
89
                elif re.match(r"http://((localhost)|(127.0.0.1)):80[0-9][0-9]", origin):
×
90
                    origin = "http://localhost:8084"
×
91

92
                bottle.response.headers["Access-Control-Allow-Origin"] = origin
6✔
93
                bottle.response.headers["Access-Control-Allow-Credentials"] = "true"
6✔
94
                bottle.response.headers["Access-Control-Allow-Methods"] = (
6✔
95
                    "GET, POST, PUT, OPTIONS, HEAD"
96
                )
97
                bottle.response.headers["Access-Control-Allow-Headers"] = (
6✔
98
                    "Origin, Accept, Content-Type, Content-Encoding, X-Requested-With, X-CSRF-Token"
99
                )
100
                # logger.debug("CORS headers set to allow from: %s", origin)
101
            bottle.response.content_type = "application/json"
6✔
102
            # logger.debug("REST function called: %s", func.__name__)
103
            jres = (
6✔
104
                json.dumps(res)
105
                if res is not None
106
                else json.dumps({"Status": "Success"})
107
            )
108
            # logger.debug("Bottle sending back result: %s", jres[: min(len(jres), 80)])
109
            return jres
6✔
110
        except Exception as e: # pylint: disable=broad-exception-caught
3✔
111
            logger.exception("Error while fulfilling request for func %s ", func)
3✔
112

113
            status, eargs = 500, ()
3✔
114
            if isinstance(e, NotImplementedError):
3✔
115
                status, eargs = 501, e.args
×
116
            elif isinstance(e, NoSessionException):
3✔
117
                status, eargs = 404, (e.session_id,)
3✔
118
            elif isinstance(e, SessionAlreadyExistsException):
3✔
119
                status, eargs = 409, (e.session_id,)
3✔
120
            elif isinstance(e, InvalidDropException):
3✔
121
                status, eargs = 409, ((e.oid, e.uid), e.reason)
×
122
            elif isinstance(e, InvalidRelationshipException):
3✔
123
                status, eargs = 409, (e.rel, e.reason)
×
124
            elif isinstance(e, InvalidGraphException):
3✔
125
                status, eargs = 400, e.args
×
126
            elif isinstance(e, InvalidSessionState):
3✔
127
                status, eargs = 400, e.args
3✔
128
            elif isinstance(e, RestClientException):
3✔
129
                status, eargs = 556, e.args
×
130
            elif isinstance(e, SubManagerException):
3✔
131
                status = 555
3✔
132
                eargs = {}
3✔
133
                # args[1] is a dictionary of host:exception
134
                for host, subex in e.args[1].items():
3✔
135
                    logger.debug(">>>> Error class name: %s", subex.__class__.__name__)
3✔
136
                    eargs[host] = {
3✔
137
                        "type": subex.__class__.__name__,
138
                        # "args": subex.args,
139
                        "args": "dummy",
140
                    }
141
            elif isinstance(e, DaliugeException):
3✔
142
                status, eargs = 555, e.args
3✔
143
            else:
144
                raise
×
145

146
            error = {"type": e.__class__.__name__, "args": eargs}
3✔
147
            bottle.response.status = status
3✔
148
            return json.dumps(error)
3✔
149

150
    return fwrapper
6✔
151

152

153
class ManagerRestServer(RestServer):
6✔
154
    """
155
    An object that wraps a DataManager and exposes its methods via a REST
156
    interface. The server is started via the `start` method in a separate thread
157
    and runs until the process is shut down.
158

159
    This REST server currently also serves HTML pages in some of its methods
160
    (i.e. those not under /api).
161
    """
162

163
    def __init__(self, dm, maxreqsize=10):
6✔
164
        super(ManagerRestServer, self).__init__()
6✔
165

166
        # Increase maximum file sizes
167
        bottle.BaseRequest.MEMFILE_MAX = maxreqsize * 1024 * 1024
6✔
168

169
        self.dm = dm
6✔
170

171
        # Mappings
172
        app = self.app
6✔
173
        app.get("/api/submission_method", callback=self.submit_methods)
6✔
174
        app.post("/api/stop", callback=self.stop_manager)
6✔
175
        app.post("/api/sessions", callback=self.createSession)
6✔
176
        app.get("/api/sessions", callback=self.getSessions)
6✔
177
        app.get("/api/sessions/<sessionId>", callback=self.getSessionInformation)
6✔
178
        app.delete("/api/sessions/<sessionId>", callback=self.destroySession)
6✔
179
        app.get("/api/sessions/<sessionId>/logs", callback=self.getLogFile)
6✔
180
        app.get("/api/sessions/<sessionId>/status", callback=self.getSessionStatus)
6✔
181
        app.post("/api/sessions/<sessionId>/deploy", callback=self.deploySession)
6✔
182
        app.post("/api/sessions/<sessionId>/cancel", callback=self.cancelSession)
6✔
183
        app.get("/api/sessions/<sessionId>/graph", callback=self.getGraph)
6✔
184
        app.get("/api/sessions/<sessionId>/graph/size", callback=self.getGraphSize)
6✔
185
        app.get(
6✔
186
            "/api/sessions/<sessionId>/graph/status",
187
            callback=self.getGraphStatus,
188
        )
189
        app.post(
6✔
190
            "/api/sessions/<sessionId>/graph/append",
191
            callback=self.addGraphSpec,
192
        )
193
        app.get(
6✔
194
            "/api/sessions/<sessionId>/repro/data",
195
            callback=self.getSessionReproData,
196
        )
197
        app.get(
6✔
198
            "/api/sessions/<sessionId>/repro/status",
199
            callback=self.getSessionReproStatus,
200
        )
201

202
        app.route("/api/sessions", method="OPTIONS", callback=self.acceptPreflight)
6✔
203
        app.route(
6✔
204
            "/api/sessions/<sessionId>/graph/append",
205
            method="OPTIONS",
206
            callback=self.acceptPreflight2,
207
        )
208

209
        # The non-REST mappings that serve HTML-related content
210
        app.route("/static/<filepath:path>", callback=self.server_static)
6✔
211
        app.get("/session", callback=self.visualizeSession)
6✔
212
        app.route("/api/sessions/<sessionId>/dir", callback=self._getSessionDir)
6✔
213

214
        # The DROP callbacks require both a /session and /api/session interface to
215
        # allow for both links that serve the HTML (/sessions) whils also support managers
216
        # communicating to each other (/api/sessions), which occurs when commands
217
        # are replicated by the DIM to multiple nodes. /api/session calls will return only
218
        # data.
219
        #
220
        # See dlg.common.clients.BaseDROPManagerClient._requests for where the replicated
221
        # URL request is created.
222

223
        app.route("/sessions/<sessionId>/graph/drop/<dropId>",
6✔
224
                  callback=self.getDropStatus)
225
        app.route("/api/sessions/<sessionId>/graph/drop/<dropId>",
6✔
226
                callback=self._getDropStatus)
227

228
        app.route("/sessions/<sessionId>/graph/drop/data/<dropId>",
6✔
229
                  callback=self.getDropData)
230
        app.route("/api/sessions/<sessionId>/graph/drop/data/<dropId>",
6✔
231
                callback=self._getDropData)
232

233
        # sub-class specifics
234
        self.initializeSpecifics(app)
6✔
235

236
    def initializeSpecifics(self, app):
6✔
237
        """
238
        Methods through which subclasses can initialize other mappings on top of
239
        the default ones and perform other DataManager-specific actions.
240
        The default implementation does nothing.
241
        """
242

243
    @daliuge_aware
6✔
244
    def submit_methods(self):
6✔
245
        return {"methods": [DeploymentMethods.BROWSER, DeploymentMethods.SERVER]}
3✔
246

247
    def _stop_manager(self):
6✔
248
        self.dm.shutdown()
×
249
        self.stop()
×
250
        logger.info(
×
251
            "Thanks for using our %s, come back again :-)",
252
            self.dm.__class__.__name__,
253
        )
254

255
    @daliuge_aware
6✔
256
    def stop_manager(self):
6✔
257
        threading.Thread(target=self._stop_manager).start()
×
258

259
    @daliuge_aware
6✔
260
    def createSession(self):
6✔
261
        newSession = bottle.request.json
6✔
262
        sessionId = newSession["sessionId"]
6✔
263
        self.dm.createSession(sessionId)
6✔
264
        return {"sessionId": sessionId}
6✔
265

266
    @daliuge_aware
6✔
267
    def acceptPreflight(self):
6✔
268
        return {}
×
269

270
    @daliuge_aware
6✔
271
    def acceptPreflight2(self, sessionId):
6✔
272
        logger.info("Preflight2 for %s", sessionId)
×
273
        return {}
×
274

275
    def sessions(self):
6✔
276
        sessions = []
6✔
277
        for sessionId in self.dm.getSessionIds():
6✔
278
            sessions.append(
6✔
279
                {
280
                    "sessionId": sessionId,
281
                    "status": self.dm.getSessionStatus(sessionId),
282
                    "size": self.dm.getGraphSize(sessionId),
283
                }
284
            )
285
        return sessions
6✔
286

287
    @daliuge_aware
6✔
288
    def getSessions(self):
6✔
289
        return self.sessions()
6✔
290

291
    @daliuge_aware
6✔
292
    def getSessionInformation(self, sessionId):
6✔
293
        status = self.dm.getSessionStatus(sessionId)
6✔
294
        try:
6✔
295
            graphDict = self.dm.getGraph(sessionId)
6✔
296
            directory = self.dm.getSessionDir(sessionId)
6✔
297
        except KeyError:  # Pristine state sessions don't have a graph, yet.
×
298
            graphDict = {}
×
299
            status = 0
×
300
            directory = ""
×
301
        return {"status": status, "graph": graphDict, "dir":directory}
6✔
302

303
    @daliuge_aware
6✔
304
    def getSessionReproStatus(self, sessionId):
6✔
305
        return self.dm.getSessionReproStatus(sessionId)
3✔
306

307
    @daliuge_aware
6✔
308
    def getSessionsReproStatus(self):
6✔
309
        sessions = []
×
310
        for sessionId in self.dm.getSessionIds():
×
311
            sessions.append(
×
312
                {
313
                    "sessionId": sessionId,
314
                    "status": self.dm.getSessionStatus(sessionId),
315
                    "size": self.dm.getGraphSize(sessionId),
316
                    "repro": self.dm.getSessionReproStatus(sessionId),
317
                }
318
            )
319
        return sessions
×
320

321
    @daliuge_aware
6✔
322
    def getSessionReproData(self, sessionId):
6✔
323
        #  For now, we only have information on a per-graph basis.
324
        graphDict = self.dm.getGraph(sessionId)
3✔
325
        reprodata = self.dm.getGraphReproData(sessionId)
3✔
326
        return {"graph": graphDict, "reprodata": reprodata}
3✔
327

328
    @daliuge_aware
6✔
329
    def destroySession(self, sessionId):
6✔
330
        self.dm.destroySession(sessionId)
3✔
331

332
    @daliuge_aware
6✔
333
    def getSessionStatus(self, sessionId):
6✔
334
        return self.dm.getSessionStatus(sessionId)
6✔
335

336
    @daliuge_aware
6✔
337
    def deploySession(self, sessionId):
6✔
338
        completedDrops = []
6✔
339
        if "completed" in bottle.request.forms:
6✔
340
            completedDrops = bottle.request.forms["completed"].split(",")
6✔
341
        return self.dm.deploySession(sessionId, completedDrops=completedDrops)
6✔
342
        # return {"Status": "Success"}
343

344
    @daliuge_aware
6✔
345
    def cancelSession(self, sessionId):
6✔
346
        self.dm.cancelSession(sessionId)
3✔
347

348
    @daliuge_aware
6✔
349
    def getGraph(self, sessionId):
6✔
350
        return self.dm.getGraph(sessionId)
6✔
351

352
    @daliuge_aware
6✔
353
    def getGraphSize(self, sessionId):
6✔
354
        return self.dm.getGraphSize(sessionId)
6✔
355

356
    @daliuge_aware
6✔
357
    def getGraphStatus(self, sessionId):
6✔
358
        return self.dm.getGraphStatus(sessionId)
6✔
359

360
    @daliuge_aware
6✔
361
    def addGraphSpec(self, sessionId):
6✔
362
        # WARNING: TODO: Somehow, the content_type can be overwritten to 'text/plain'
363
        logger.debug("Graph content type: %s", bottle.request.content_type)
6✔
364
        if (
6✔
365
            "application/json" not in bottle.request.content_type
366
            and "text/plain" not in bottle.request.content_type
367
        ):
368
            bottle.response.status = 415
×
369
            return
×
370

371
        # We also accept gzipped content
372
        hdrs = bottle.request.headers
6✔
373
        logger.debug("Graph hdr: %s", {k: v for k, v in hdrs.items()})
6✔
374
        if hdrs.get("Content-Encoding", None) == "gzip":
6✔
375
            json_content = utils.ZlibUncompressedStream(bottle.request.body)
6✔
376
        else:
377
            json_content = bottle.request.body
×
378

379
        graph_parts = bottle.json_loads(json_content.read())
6✔
380

381
        # Do something about host Nodes in graph_parts?
382
        return self.dm.addGraphSpec(sessionId, graph_parts)
6✔
383
        # return {"graph_parts": graph_parts}
384

385
    # ===========================================================================
386
    # non-REST methods
387
    # ===========================================================================
388
    def server_static(self, filepath):
6✔
389
        staticRoot = Path(__file__).parent / "web/static"
×
390
        return bottle.static_file(filepath, root=staticRoot)
×
391

392
    def _getSessionDir(self, sessionId):
6✔
393
        return self.dm.getSessionDir(sessionId)
6✔
394

395
    def visualizeSession(self):
6✔
396
        params = bottle.request.params
3✔
397
        sessionId = params["sessionId"] if "sessionId" in params else ""
3✔
398
        selectedNode = params["node"] if "node" in params else ""
3✔
399
        viewMode = params["view"] if "view" in params else ""
3✔
400
        tpl = file_as_string("web/session.html")
3✔
401
        urlparts = bottle.request.urlparts
3✔
402
        serverUrl = urlparts.scheme + "://" + urlparts.netloc
3✔
403
        return bottle.template(
3✔
404
            tpl,
405
            sessionId=sessionId,
406
            selectedNode=selectedNode,
407
            viewMode=viewMode,
408
            serverUrl=serverUrl,
409
            dmType=self.dm.__class__.__name__,
410
            version=dlg_version,
411
            sessionDir=sessionId
412
        )
413

414
    def _getDropStatus(self, sessionId, dropId):
6✔
415
        return self.dm.getDropStatus(sessionId, dropId)
×
416

417
    def getDropStatus(self, sessionId, dropId):
6✔
418
        params = bottle.request.params
×
419
        logger.warning("PARAMS: %s", params)
×
420

421
        urlparts = bottle.request.urlparts
×
422
        serverUrl = urlparts.scheme + "://" + urlparts.netloc
×
423

424
        data = self._getDropStatus(sessionId, dropId)
×
425
        if data["logs"]:
×
426
            columns = [col for col in data["logs"][-1].keys()]
×
427
            filter_column = "Level"
×
428
            filter_column_index = columns.index(filter_column)
×
429
        else:
430
            columns = []
×
431
            filter_column_index=0
×
432

433
        tpl = file_as_string("web/drop_log.html")
×
434
        return bottle.template(
×
435
            tpl,
436
            data=data,
437
            columns=columns,
438
            filter_index=filter_column_index,
439
            sessionId=sessionId,
440
            serverUrl=serverUrl,
441
            dmType=self.dm.__class__.__name__,
442
            version=str(dlg_version)
443
        )
444

445
    def _getDropData(self, sessionId, dropId):
6✔
446
        return  self.dm.getDropData(sessionId, dropId)
×
447

448
    def getDropData(self, sessionId, dropId):
6✔
449
        """
450
        Given a Drop from this session, attempt to produce the file
451
        :param sessionId:
452
        :param dropId:
453
        :return:
454
        """
455
        fpath = self._getDropData(sessionId, dropId)['filepath']
×
456

457
        if not fpath: # Empty string, no filepath
×
458
            return {"No file exists"}
×
459

460
        elif not Path(fpath).exists():
×
461
            bottle.response.status = 404
×
462
            return {"error": f"File '{fpath}' not found"}
×
463
        else:
464
            path = Path(fpath)
×
465
            filename = path.name
×
466
            root = path.parent
×
467
            if not path.suffix:
×
NEW
468
                fdownload = f"{filename}.txt"
×
469
            else:
NEW
470
                fdownload = filename
×
NEW
471
            return static_file(filename, root=str(root), download=fdownload)
×
472

473
class NMRestServer(ManagerRestServer):
6✔
474
    """
475
    A REST server for NodeManagers. It includes mappings for NM-specific
476
    methods and the mapping for the main visualization HTML pages.
477
    """
478

479
    def initializeSpecifics(self, app):
6✔
480
        app.get("/api", callback=self.getNMStatus)
6✔
481
        app.post(
6✔
482
            "/api/sessions/<sessionId>/graph/link",
483
            callback=self.linkGraphParts,
484
        )
485
        app.post(
6✔
486
            "/api/sessions/<sessionId>/subscriptions",
487
            callback=self.add_node_subscriptions,
488
        )
489
        app.post("/api/sessions/<sessionId>/trigger", callback=self.trigger_drops)
6✔
490
        # The non-REST mappings that serve HTML-related content
491
        app.get("/", callback=self.visualizeDM)
6✔
492
        app.get("/api/shutdown", callback=self.shutdown_node_manager)
6✔
493

494
    @daliuge_aware
6✔
495
    def shutdown_node_manager(self):
6✔
496
        logger.debug("Shutting down node manager")
×
497
        self.dm.shutdown()
×
498

499
    @daliuge_aware
6✔
500
    def getNMStatus(self):
6✔
501
        # we currently return the sessionIds, more things might be added in the
502
        # future
503
        logger.debug("NM REST call: status")
×
504
        return {"sessions": self.sessions()}
×
505

506
    @daliuge_aware
6✔
507
    def getLogFile(self, sessionId):
6✔
508
        logger.debug("NM REST call: logfile")
3✔
509
        logdir = self.dm.getLogDir()
3✔
510
        logfile = generateLogFileName(logdir, sessionId)
3✔
511
        if not os.path.isfile(logfile):
3✔
512
            raise NoSessionException(sessionId, "Log file not found.")
3✔
513
        return static_file(
×
514
            os.path.basename(logfile),
515
            root=logdir,
516
            download=os.path.basename(logfile),
517
        )
518

519
    @daliuge_aware
6✔
520
    def linkGraphParts(self, sessionId):
6✔
521
        logger.debug("NM REST call: graph/link")
×
522
        params = bottle.request.params
×
523
        lhOID = params["lhOID"]
×
524
        rhOID = params["rhOID"]
×
525
        linkType = int(params["linkType"])
×
526
        self.dm.linkGraphParts(sessionId, lhOID, rhOID, linkType)
×
527

528
    @daliuge_aware
6✔
529
    def add_node_subscriptions(self, sessionId):
6✔
530
        logger.debug("NM REST call: add_subscriptions %s", bottle.request.json)
3✔
531
        if bottle.request.content_type != "application/json":
3✔
532
            bottle.response.status = 415
×
533
            return
×
534
        subscriptions = self._parse_subscriptions(bottle.request.json)
3✔
535
        self.dm.add_node_subscriptions(sessionId, subscriptions)
3✔
536

537
    def _parse_subscriptions(self, json_request):
6✔
538
        return {Node(host): droprels for host, droprels in json_request.items()}
3✔
539

540
    @daliuge_aware
6✔
541
    def trigger_drops(self, sessionId):
6✔
542
        if bottle.request.content_type != "application/json":
6✔
543
            bottle.response.status = 415
×
544
            return
×
545
        self.dm.trigger_drops(sessionId, bottle.request.json)
6✔
546

547
    # ===========================================================================
548
    # non-REST methods
549
    # ===========================================================================
550
    def visualizeDM(self):
6✔
551
        tpl = file_as_string("web/dm.html")
3✔
552
        urlparts = bottle.request.urlparts
3✔
553
        serverUrl = urlparts.scheme + "://" + urlparts.netloc
3✔
554
        return bottle.template(
3✔
555
            tpl,
556
            serverUrl=serverUrl,
557
            dmType=self.dm.__class__.__name__,
558
            version=dlg_version,
559
            reset="false",
560
        )
561

562

563
class CompositeManagerRestServer(ManagerRestServer):
6✔
564
    """
565
    A REST server for DataIslandManagers. It includes mappings for DIM-specific
566
    methods.
567
    """
568

569
    def initializeSpecifics(self, app):
6✔
570
        app.get("/api", callback=self.getCMStatus)
6✔
571
        app.get("/api/past_sessions", callback=self.getPastSessions)
6✔
572
        app.get("/api/nodes", callback=self.getCMNodes)
6✔
573
        app.post("/api/node/<node>", callback=self.addCMNode)
6✔
574
        app.delete("/api/node/<node>", callback=self.removeCMNode)
6✔
575

576
        # Query forwarding to sub-nodes
577
        app.get("/api/node/<node>/sessions", callback=self.getNodeSessions)
6✔
578
        app.get(
6✔
579
            "/api/node/<node>/sessions/<sessionId>",
580
            callback=self.getNodeSessionInformation,
581
        )
582
        app.get(
6✔
583
            "/api/node/<node>/sessions/<sessionId>/status",
584
            callback=self.getNodeSessionStatus,
585
        )
586
        app.get(
6✔
587
            "/api/node/<node>/sessions/<sessionId>/graph",
588
            callback=self.getNodeGraph,
589
        )
590
        app.get(
6✔
591
            "/api/node/<node>/sessions/<sessionId>/graph/status",
592
            callback=self.getNodeGraphStatus,
593
        )
594

595
        # The non-REST mappings that serve HTML-related content
596
        app.get("/", callback=self.visualizeDIM)
6✔
597

598
    @daliuge_aware
6✔
599
    def getCMStatus(self):
6✔
600
        """
601
        REST (GET): /api/
602

603
        Return JSON-compatible list of Composite Manager nodes and sessions
604
        """
605
        return {
×
606
            "hosts": [str(n) for n in self.dm.dmHosts],
607
            "sessionIds": self.dm.getSessionIds(),
608
        }
609

610
    @daliuge_aware
6✔
611
    def getCMNodes(self):
6✔
612
        """
613
        REST (GET): /api/nodes
614

615
        Return JSON-compatible list of Composite Manager nodes
616
        """
617
        return [str(n) for n in self.dm.nodes]
×
618

619
    def _getAllCMNodes(self):
6✔
620
        return self.dm.nodes
×
621

622
    @daliuge_aware
6✔
623
    def addCMNode(self, node):
6✔
624
        """
625
        REST (POST): "/api/node/<node>"
626

627
        Add the posted node to the Composite Manager
628

629
        Converts from JSON to our ser
630
        """
631
        logger.debug("Adding node %s", node)
×
632
        self.dm.add_node(Node(node))
×
633

634
    @daliuge_aware
6✔
635
    def removeCMNode(self, node):
6✔
636
        """
637
        REST (DELETE): "/api/node/<node>"
638

639
        Add the posted node to the Composite Manager
640

641
        """
642
        logger.debug("Removing node %s", node)
×
643
        self.dm.remove_node(Node(node))
×
644

645
    @daliuge_aware
6✔
646
    def getNodeSessions(self, node):
6✔
647
        """
648
        REST (GET): "/api/node/<node>/sessions"
649

650
        Retrieve sessions for given node
651
        """
652
        host_node = Node(node)
×
653
        if host_node not in self.dm.nodes:
×
654
            raise RuntimeError(f"{host_node} not in current list of nodes")
×
655
        with NodeManagerClient(host=host_node.host, port=host_node.port) as dm:
×
656
            return dm.sessions()
×
657

658
    @daliuge_aware
6✔
659
    def getPastSessions(self):
6✔
660
        """
661
        REST (GET): /api/past_sessions
662

663
        Return JSON-compatible list of Composite Manager nodes
664
        """
665
        return self.pastSessions()
×
666

667
    def pastSessions(self):
6✔
668
        """
669
        Retrieve sessions from this DropManager and place it in JSON-format, for
670
        serialisation across the wire.
671
        """
672

673
        return [
×
674
            {"sessionId": pastSession} for pastSession in self.dm.getPastSessionIds()
675
        ]
676

677
    def _tarfile_write(self, tar, headers, stream):
6✔
678
        file_header = headers.getheader("Content-Disposition")
×
679
        length = headers.getheader("Content-Length")
×
680
        m = Message()
×
681
        m.add_header("content-disposition", file_header)
×
682
        filename = m.get_params("filename")
×
683
        info = tarfile.TarInfo(filename)
×
684
        info.size = int(length)
×
685

686
        content = []
×
687
        while True:
×
688
            buffer = stream.read()
×
689
            if not buffer:
×
690
                break
×
691
            content.append(buffer)
×
692

693
        tar.addfile(info, io.BytesIO(initial_bytes="".join(content).encode()))
×
694

695
    @daliuge_aware
6✔
696
    def getLogFile(self, sessionId):
6✔
697
        fh = io.BytesIO()
×
698
        with tarfile.open(fileobj=fh, mode="w:gz") as tar:
×
699
            for node in self._getAllCMNodes():
×
700
                with NodeManagerClient(host=node.host, port=node.port) as dm:
×
701
                    try:
×
702
                        stream, resp = dm.get_log_file(sessionId)
×
703
                        self._tarfile_write(tar, resp, stream)
×
704
                    except NoSessionException:
×
705
                        pass
×
706

707
        data = fh.getvalue()
×
708
        size = len(data)
×
709
        bottle.response.set_header("Content-type", "application/x-tar")
×
710
        bottle.response["Content-Disposition"] = (
×
711
            f"attachment; " f"filename=dlg_{sessionId}.tar"
712
        )
713
        bottle.response["Content-Length"] = size
×
714
        return data
×
715

716
    @daliuge_aware
6✔
717
    def getNodeSessionInformation(self, node: str, sessionId):
6✔
718
        try:
×
719
            n = Node(self.dm.get_node_from_json(node))
×
720
            with NodeManagerClient(host=n.host, port=n.port) as dm:
×
721
                return dm.session(sessionId)
×
722
        except ValueError as e:
×
723
            raise ValueError(f"{n} not in current list of nodes") from e
×
724

725
    @daliuge_aware
6✔
726
    def getNodeSessionStatus(self, node: str, sessionId):
6✔
727
        try:
×
728
            n = Node(self.dm.get_node_from_json(node))
×
729
            with NodeManagerClient(host=n, port=n.port) as dm:
×
730
                return dm.session_status(sessionId)
×
731
        except ValueError as e:
×
732
            raise ValueError(f"{node} not in current list of nodes") from e
×
733

734
    @daliuge_aware
6✔
735
    def getNodeGraph(self, node: str, sessionId):
6✔
736
        try:
×
737
            n = Node(self.dm.get_node_from_json(node))
×
738
            with NodeManagerClient(host=n.host, port=n.port) as dm:
×
739
                return dm.graph(sessionId)
×
740
        except ValueError as e:
×
741
            raise ValueError(f"{node} not in current list of nodes") from e
×
742

743
    @daliuge_aware
6✔
744
    def getNodeGraphStatus(self, node: str, sessionId):
6✔
745
        try:
×
746
            n = Node(self.dm.get_node_from_json(node))
×
747
            with NodeManagerClient(host=n.host, port=n.port) as dm:
×
748
                return dm.graph_status(sessionId)
×
749
        except ValueError as e:
×
750
            raise ValueError(f"{node} not in current list of nodes") from e
×
751

752
    # ===========================================================================
753
    # non-REST methods
754
    # ===========================================================================
755

756
    def visualizeDIM(self):
6✔
757
        tpl = file_as_string("web/dim.html")
×
758
        urlparts = bottle.request.urlparts
×
759
        selectedNode = (
×
760
            bottle.request.params["node"] if "node" in bottle.request.params else ""
761
        )
762
        serverUrl = urlparts.scheme + "://" + urlparts.netloc
×
763
        return bottle.template(
×
764
            tpl,
765
            dmType=self.dm.__class__.__name__,
766
            dmPort=self.dm.dmPort,
767
            serverUrl=serverUrl,
768
            dmHosts=json.dumps([str(n) for n in self.dm.dmHosts]),
769
            nodes=json.dumps([str(n) for n in self.dm.nodes]),
770
            selectedNode=selectedNode,
771
            version=dlg_version
772
        )
773

774

775
class MasterManagerRestServer(CompositeManagerRestServer):
6✔
776
    def initializeSpecifics(self, app):
6✔
777
        CompositeManagerRestServer.initializeSpecifics(self, app)
×
778
        # DIM manamagement
779
        app.post("/api/island/<dim>", callback=self.addDIM)
×
780
        app.delete("/api/island/<dim>", callback=self.removeDIM)
×
781
        # Query forwarding to daemons
782
        app.post("/api/managers/<host>/island", callback=self.createDataIsland)
×
783
        app.post("/api/managers/<host>/node/start", callback=self.startNM)
×
784
        app.post("/api/managers/<host>/node/stop", callback=self.stopNM)
×
785
        # manage node manager assignment
786
        app.post("/api/managers/<host>/node/<node>", callback=self.addNM)
×
787
        app.delete("/api/managers/<host>/node/<node>", callback=self.removeNM)
×
788
        # Querying about managers
789
        app.get("/api/islands", callback=self.getDIMs)
×
790
        app.get("/api/nodes", callback=self.getNMs)
×
791
        app.get("/api/managers/<host>/node", callback=self.getNMInfo)
×
792
        app.get("/api/managers/<host>/island", callback=self.getDIMInfo)
×
793
        app.get("/api/managers/<host>/master", callback=self.getMMInfo)
×
794

795
    @daliuge_aware
6✔
796
    def createDataIsland(self, host):
6✔
797
        with RestClient(
×
798
            host=host, port=constants.DAEMON_DEFAULT_REST_PORT, timeout=10
799
        ) as c:
800
            c._post_json("/managers/island/start", bottle.request.body.read())
×
801
        self.dm.addDmHost(host)
×
802
        return {"islands": self.dm.dmHosts}
×
803

804
    @daliuge_aware
6✔
805
    def getDIMs(self):
6✔
806
        return {"islands": self.dm.dmHosts}
×
807

808
    @daliuge_aware
6✔
809
    def addDIM(self, dim):
6✔
810
        logger.debug("Adding DIM %s", dim)
×
811
        self.dm.addDmHost(dim)
×
812

813
    @daliuge_aware
6✔
814
    def removeDIM(self, dim):
6✔
815
        logger.debug("Removing dim %s", dim)
×
816
        self.dm.removeDmHost(dim)
×
817

818
    @daliuge_aware
6✔
819
    def getNMs(self):
6✔
820
        return {"nodes": [str(n) for n in self.dm.nodes]}
×
821

822
    @daliuge_aware
6✔
823
    def startNM(self, host):
6✔
824
        port = constants.DAEMON_DEFAULT_REST_PORT
×
825
        logger.debug("Sending NM start request to %s:%s", host, port)
×
826
        with RestClient(host=host, port=port, timeout=10) as c:
×
827
            return json.loads(c.POST("/managers/node/start").read())
×
828

829
    @daliuge_aware
6✔
830
    def stopNM(self, host):
6✔
831
        port = constants.DAEMON_DEFAULT_REST_PORT
×
832
        logger.debug("Sending NM stop request to %s:%s", host, port)
×
833
        with RestClient(host=host, port=port, timeout=10) as c:
×
834
            return json.loads(c.POST("/managers/node/stop").read())
×
835

836
    @daliuge_aware
6✔
837
    def addNM(self, host, node):
6✔
838
        port = constants.ISLAND_DEFAULT_REST_PORT
×
839
        logger.debug("Adding NM %s to DIM %s", node, host)
×
840
        with RestClient(host=host, port=port, timeout=10, url_prefix="/api") as c:
×
841
            return json.loads(
×
842
                c.POST(
843
                    f"/node/{node}",
844
                ).read()
845
            )
846

847
    @daliuge_aware
6✔
848
    def removeNM(self, host, node):
6✔
849
        port = constants.ISLAND_DEFAULT_REST_PORT
×
850
        logger.debug("Removing NM %s from DIM %s", node, host)
×
851
        with RestClient(host=host, port=port, timeout=10, url_prefix="/api") as c:
×
852
            return json.loads(c._DELETE(f"/node/{node}").read())
×
853

854
    @daliuge_aware
6✔
855
    def getNMInfo(self, host):
6✔
856
        port = constants.DAEMON_DEFAULT_REST_PORT
×
857
        logger.debug("Sending request %s:%s/managers/node", host, port)
×
858
        with RestClient(host=host, port=port, timeout=10) as c:
×
859
            return json.loads(c._GET("/managers/node").read())
×
860

861
    @daliuge_aware
6✔
862
    def getDIMInfo(self, host):
6✔
863
        with RestClient(
×
864
            host=host, port=constants.DAEMON_DEFAULT_REST_PORT, timeout=10
865
        ) as c:
866
            return json.loads(c._GET("/managers/island").read())
×
867

868
    @daliuge_aware
6✔
869
    def getMMInfo(self, host):
6✔
870
        with RestClient(
×
871
            host=host, port=constants.DAEMON_DEFAULT_REST_PORT, timeout=10
872
        ) as c:
873
            return json.loads(c._GET("/managers/master").read())
×
874

875
    def _getAllCMNodes(self):
6✔
876
        nodes = []
×
877
        for host in self.dm.dmHosts:
×
878
            h = Node(host)
×
879
            with DataIslandManagerClient(host=h.host, port=h.port) as dm:
×
880
                nodes += dm.nodes()
×
881
        return [str(n) for n in nodes]
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc