
ICRAR / daliuge / 10363213063

13 Aug 2024 03:39AM UTC, coverage: 79.63% (-0.09% from 79.722%)
Pull Request #271: Liu 377 ("Merge branch 'master' into liu-377", github, web-flow)

70 of 122 new or added lines in 13 files covered (57.38%)
12 existing lines in 6 files now uncovered
15375 of 19308 relevant lines covered (79.63%)
1.64 hits per line

Source File

/daliuge-translator/dlg/dropmake/web/translator_rest.py (65.96% covered)
1
#
2
#    ICRAR - International Centre for Radio Astronomy Research
3
#    (c) UWA - The University of Western Australia, 2016
4
#    Copyright by UWA (in the framework of the ICRAR)
5
#    All rights reserved
6
#
7
#    This library is free software; you can redistribute it and/or
8
#    modify it under the terms of the GNU Lesser General Public
9
#    License as published by the Free Software Foundation; either
10
#    version 2.1 of the License, or (at your option) any later version.
11
#
12
#    This library is distributed in the hope that it will be useful,
13
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
14
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15
#    Lesser General Public License for more details.
16
#
17
#    You should have received a copy of the GNU Lesser General Public
18
#    License along with this library; if not, write to the Free Software
19
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
20
#    MA 02111-1307  USA
21
#
22
#    chen.wu@icrar.org
23
import argparse
2✔
24
import datetime
2✔
25
import json
2✔
26
import logging
2✔
27
import os
2✔
28
import pathlib
2✔
29
import signal
2✔
30
import sys
2✔
31
import threading
2✔
32
import time
2✔
33
import traceback
2✔
34
from enum import Enum
2✔
35
from json import JSONDecodeError
2✔
36
from typing import Union
2✔
37
from urllib.parse import urlparse
2✔
38

39
import uvicorn
2✔
40
from fastapi import FastAPI, Request, Body, Query, HTTPException, Form
2✔
41
from fastapi.responses import (
2✔
42
    HTMLResponse,
43
    StreamingResponse,
44
    JSONResponse,
45
    RedirectResponse,
46
)
47
from fastapi.staticfiles import StaticFiles
2✔
48
from fastapi.templating import Jinja2Templates
2✔
49
from jsonschema import validate, ValidationError
2✔
50
from pydantic import BaseModel
2✔
51

52
import dlg.constants
2✔
53
import dlg.dropmake.pg_generator
2✔
54
from dlg import restutils, common
2✔
55
from dlg.clients import CompositeManagerClient
2✔
56
from dlg.common.reproducibility.constants import (
2✔
57
    REPRO_DEFAULT,
58
    ALL_RMODES,
59
    ReproducibilityFlags,
60
)
61
from dlg.common.reproducibility.reproducibility import (
2✔
62
    init_lgt_repro_data,
63
    init_lg_repro_data,
64
    init_pgt_partition_repro_data,
65
    init_pgt_unroll_repro_data,
66
    init_pg_repro_data,
67
)
68

69
from dlg import utils
2✔
70
from dlg.common.deployment_methods import DeploymentMethods
2✔
71
from dlg.common.k8s_utils import check_k8s_env
2✔
72
from dlg.dropmake.lg import GraphException
2✔
73
from dlg.dropmake.pg_manager import PGManager
2✔
74
from dlg.dropmake.scheduler import SchedulerException
2✔
75
from dlg.dropmake.web.translator_utils import (
2✔
76
    file_as_string,
77
    lg_repo_contents,
78
    lg_path,
79
    lg_exists,
80
    pgt_exists,
81
    pgt_path,
82
    pgt_repo_contents,
83
    prepare_lgt,
84
    unroll_and_partition_with_params,
85
    make_algo_param_dict,
86
    get_mgr_deployment_methods,
87
    parse_mgr_url,
88
)
89
from dlg import constants
2✔
90

91
APP_DESCRIPTION = """
2✔
92
DALiuGE LG Web interface translates and deploys logical graphs.
93

94
The interface is split into two parts; refer to the main
95
[DALiuGE documentation](https://daliuge.readthedocs.io/) for more information.
96

97
### Original API
98
A set of endpoints is maintained for backwards compatibility
99

100
### New API
101
The new API mirrors that of the command line interface with a focus on body parameters, rather
102
than query parameters.
103

104
However, a new API for deployment is yet to be implemented
105

106
Original author: chen.wu@icrar.org
107
"""
108
APP_TAGS_METADATA = [
2✔
109
    {
110
        "name": "Original",
111
        "description": "The original DALiuGE LG_web endpoints.",
112
    },
113
    {
114
        "name": "Updated",
115
        "description": "The new post-centric style mirror of CLI interface.",
116
    },
117
]
118

119
file_location = pathlib.Path(__file__).parent.absolute()
2✔
120
templates = Jinja2Templates(directory=file_location)
2✔
121

122
app = FastAPI(
2✔
123
    title="DALiuGE LG Web Interface",
124
    description=APP_DESCRIPTION,
125
    openapi_tags=APP_TAGS_METADATA,
126
    contact={"name": "pritchardn", "email": "nicholas.pritchard@icrar.org"},
127
    version=dlg.version.version,
128
    license_info={
129
        "name": "LGPLv2+",
130
        "url": "https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html",
131
    },
132
)
133
app.mount("/static", StaticFiles(directory=file_location), name="static")
2✔
134
logger = logging.getLogger(__name__)
2✔
135

136
post_sem = threading.Semaphore(1)
2✔
137
gen_pgt_sem = threading.Semaphore(1)
2✔
138

139
global lg_dir
140
global pgt_dir
141
global pg_mgr
142
LG_SCHEMA = json.loads(file_as_string("lg.graph.schema", package="dlg.dropmake"))
2✔
143

144

145
@app.post("/jsonbody", tags=["Original"])
2✔
146
def jsonbody_post_lg(
2✔
147
    lg_name: str = Form(description="The name of the lg to use"),
148
    lg_content: str = Form(description="The content of the lg to save to file"),
149
    rmode: str = Form(default=str(REPRO_DEFAULT.value)),
150
):
151
    """
152
    Post a logical graph JSON.
153
    """
154
    if not lg_exists(lg_dir, lg_name):
2✔
155
        raise HTTPException(
2✔
156
            status_code=404,
157
            detail="Creating new graphs through this API is not supported",
158
        )
159
    try:
2✔
160
        lg_content = json.loads(lg_content)
2✔
161
    except JSONDecodeError:
×
162
        logger.warning("Could not decode lgt %s", lg_name)
×
163
    lg_content = init_lgt_repro_data(lg_content, rmode)
2✔
164
    lg_path = pathlib.Path(lg_dir, lg_name)
2✔
165
    post_sem.acquire()
2✔
166
    try:
2✔
167
        with open(lg_path, "w") as lg_file:
2✔
168
            lg_file.write(json.dumps(lg_content))
2✔
169
    except Exception as e:
×
170
        raise HTTPException(
×
171
            status_code=500,
172
            detail="Failed to save logical graph {0}:{1}".format(lg_name, str(e)),
173
        )
174
    finally:
175
        post_sem.release()
2✔
176

177

178
@app.get("/jsonbody", tags=["Original"])
2✔
179
def jsonbody_get_lg(
2✔
180
    lg_name: str = Query(
181
        default=None, description="The name of the lg to load from file"
182
    )
183
):
184
    """
185
    Returns JSON representation of saved logical graph.
186
    """
187
    if lg_name is None or len(lg_name) == 0:
2✔
188
        all_lgs = lg_repo_contents(lg_dir)
2✔
189
        try:
2✔
190
            first_dir = next(iter(all_lgs))
2✔
191
            first_lg = first_dir + "/" + all_lgs[first_dir][0]
2✔
192
            lg_name = first_lg
2✔
193
        except StopIteration:
×
194
            return "Nothing found in dir {0}".format(lg_path)
×
195
    if lg_exists(lg_dir, lg_name):
2✔
196
        # print "Loading {0}".format(name)
197
        lgp = lg_path(lg_dir, lg_name)
2✔
198
        with open(lgp, "r") as f:
2✔
199
            data = json.load(f)
2✔
200
        return JSONResponse(data)
2✔
201
    else:
202
        raise HTTPException(
2✔
203
            status_code=404,
204
            detail="JSON graph {0} not found\n".format(lg_name),
205
        )
206

207

208
@app.get("/pgt_jsonbody", response_class=JSONResponse, tags=["Original"])
2✔
209
def jsonbody_get_pgt(
2✔
210
    pgt_name: str = Query(description="The name of the pgt to load from file"),
211
):
212
    """
213
    Return JSON representation of a physical graph template
214
    """
215
    if pgt_exists(pgt_dir, pgt_name):
2✔
216
        # print "Loading {0}".format(name)
217
        pgt = pgt_path(pgt_dir, pgt_name)
2✔
218
        with open(pgt, "r") as f:
2✔
219
            data = f.read()
2✔
220
        return JSONResponse(data)
2✔
221
    else:
222
        raise HTTPException(
2✔
223
            status_code=404, detail="JSON graph {0} not found".format(pgt_name)
224
        )
225

226

227
@app.get("/pg_viewer", response_class=HTMLResponse, tags=["Original"])
2✔
228
def load_pg_viewer(
2✔
229
    request: Request,
230
    pgt_view_name: str = Query(
231
        default=None, description="The string of the type of view to provide"
232
    ),
233
):
234
    """
235
    Loads the physical graph viewer
236
    """
237
    if pgt_view_name is None or len(pgt_view_name) == 0:
2✔
238
        all_pgts = pgt_repo_contents(pgt_dir)
2✔
239
        try:
2✔
240
            first_dir = next(iter(all_pgts))
2✔
241
            pgt_view_name = first_dir + os.sep + all_pgts[first_dir][0]
2✔
242
        except StopIteration:
×
243
            pgt_view_name = None
×
244
    if pgt_exists(pgt_dir, pgt_view_name):
2✔
245
        tpl = templates.TemplateResponse(
2✔
246
            "pg_viewer.html",
247
            {
248
                "request": request,
249
                "pgt_view_json_name": pgt_view_name,
250
                "partition_info": None,
251
                "title": "Physical Graph Template",
252
                "error": None,
253
            },
254
        )
255
        return tpl
2✔
256
    else:
257
        raise HTTPException(
2✔
258
            status_code=404,
259
            detail="Physical graph template view {0} not found {1}".format(
260
                pgt_view_name, pgt_dir
261
            ),
262
        )
263

264

265
@app.get("/show_gantt_chart", response_class=HTMLResponse, tags=["Original"])
2✔
266
def show_gantt_chart(
2✔
267
    request: Request,
268
    pgt_id: str = Query(
269
        description="The pgt_id used to internally reference this graph"
270
    ),
271
):
272
    """
273
    Interface to show the gantt chart
274
    """
275
    tpl = templates.TemplateResponse(
2✔
276
        "matrix_vis.html",
277
        {
278
            "request": request,
279
            "pgt_view_json_name": pgt_id,
280
            "vis_action": "pgt_gantt_chart",
281
        },
282
    )
283
    return tpl
2✔
284

285

286
@app.get("/pgt_gantt_chart", tags=["Original"])
2✔
287
def get_gantt_chart(
2✔
288
    pgt_id: str = Query(
289
        description="The pgt_id used to internally reference this graph"
290
    ),
291
):
292
    """
293
    Interface to retrieve a Gantt Chart matrix associated with a PGT
294
    """
295
    try:
×
296
        ret = pg_mgr.get_gantt_chart(pgt_id)
×
297
        return ret
×
298
    except GraphException as ge:
×
299
        raise HTTPException(
×
300
            status_code=500,
301
            detail="Failed to generate Gantt chart for {0}: {1}".format(pgt_id, ge),
302
        )
303

304

305
@app.get("/show_schedule_mat", response_class=HTMLResponse, tags=["Original"])
2✔
306
def show_schedule_matrix(
2✔
307
    request: Request,
308
    pgt_id: str = Query(
309
        description="The pgt_id used to internally reference this graph"
310
    ),
311
):
312
    """
313
    Interface to show the schedule mat
314
    """
315
    tpl = templates.TemplateResponse(
2✔
316
        "matrix_vis.html",
317
        {
318
            "request": request,
319
            "pgt_view_json_name": pgt_id,
320
            "vis_action": "pgt_schedule_mat",
321
        },
322
    )
323
    return tpl
2✔
324

325

326
@app.get("/get_schedule_matrices", tags=["Original"])
2✔
327
def get_schedule_matrices(
2✔
328
    pgt_id: str = Query(
329
        description="The pgt_id used to internally reference this graph"
330
    ),
331
):
332
    """
333
    Interface to return all schedule matrices for a single pgt_id
334
    """
335
    try:
×
336
        ret = pg_mgr.get_schedule_matrices(pgt_id)
×
337
        return ret
×
338
    except Exception as e:
×
339
        raise HTTPException(
×
340
            status_code=500,
341
            detail="Failed to get schedule matrices for {0}: {1}".format(pgt_id, e),
342
        )
343

344

345
# ------ Graph deployment methods ------ #
346

347

348
@app.get("/gen_pgt", tags=["Original"])
2✔
349
def gen_pgt(
2✔
350
    request: Request,
351
    lg_name: str = Query(
352
        description="If present, translator will attempt to load this lg from file"
353
    ),
354
    rmode: str = Query(
355
        default=str(REPRO_DEFAULT.value),
356
        description="Reproducibility mode setting level of provenance tracking. Refer to main documentation for more information",
357
    ),
358
    test: str = Query(
359
        default="false",
360
        description="If 'true', will replace all apps with sleeps",
361
    ),
362
    num_par: int = Query(
363
        default=1, description="The number of data partitions in the graph"
364
    ),
365
    algo: str = Query(
366
        default="metis",
367
        description="The scheduling algorithm used when unrolling the graph",
368
    ),
369
    num_islands: int = Query(
370
        default=0, description="The number of data-islands to partition"
371
    ),
372
    par_label: str = Query(
373
        default="Partition",
374
        description="The label prefixed to each generated partition",
375
    ),
376
):
377
    if not lg_exists(lg_dir, lg_name):
2✔
378
        raise HTTPException(
2✔
379
            status_code=404,
380
            detail="Logical graph '{0}' not found".format(lg_name),
381
        )
382
    try:
2✔
383
        lgt = prepare_lgt(lg_path(lg_dir, lg_name), rmode)
2✔
384
        test = test.lower() == "true"
2✔
385
        pgt = unroll_and_partition_with_params(
2✔
386
            lgt,
387
            test,
388
            algo,
389
            num_par,
390
            num_islands,
391
            par_label,
392
            request.query_params.items(),
393
        )
394
        num_partitions = 0  # pgt._num_parts;
2✔
395

396
        pgt_id = pg_mgr.add_pgt(pgt, lg_name)
2✔
397

398
        part_info = " - ".join(
2✔
399
            ["{0}:{1}".format(k, v) for k, v in pgt.result().items()]
400
        )
401
        tpl = templates.TemplateResponse(
2✔
402
            "pg_viewer.html",
403
            {
404
                "request": request,
405
                "pgt_view_json_name": pgt_id,
406
                "partition_info": part_info,
407
                "title": "Physical Graph Template%s"
408
                % ("" if num_partitions == 0 else "Partitioning"),
409
                "error": None,
410
            },
411
        )
412
        return tpl
2✔
413
    except GraphException as ge:
2✔
414
        logger.info("Graph Exception")
×
415
        raise HTTPException(
×
416
            status_code=500,
417
            detail="Invalid Logical Graph {1}: {0}".format(str(ge), lg_name),
418
        )
419
    except SchedulerException as se:
2✔
420
        logger.info("Schedule Exception")
×
421
        raise HTTPException(
×
422
            status_code=500,
423
            detail="Graph scheduling exception {1}: {0}".format(str(se), lg_name),
424
        )
425
    except Exception:
2✔
426
        logger.info("Partition / Other exception")
2✔
427
        trace_msg = traceback.format_exc()
2✔
428
        raise HTTPException(
2✔
429
            status_code=500,
430
            detail="Graph partition exception {1}: {0}".format(trace_msg, lg_name),
431
        )
432

433

434
@app.post("/gen_pgt", response_class=HTMLResponse, tags=["Original"])
2✔
435
async def gen_pgt_post(
2✔
436
    request: Request,
437
    lg_name: str = Form(
438
        description="If present, translator will attempt to load this lg from file"
439
    ),
440
    json_data: str = Form(description="The graph data used as the graph if supplied"),
441
    rmode: str = Form(
442
        str(REPRO_DEFAULT.value),
443
        description="Reproducibility mode setting level of provenance tracking. Refer to main documentation for more information",
444
    ),
445
    test: str = Form(
446
        default="false",
447
        description="If 'true', will replace all apps with sleeps",
448
    ),
449
    algo: str = Form(
450
        default="metis",
451
        description="The scheduling algorithm used when unrolling the graph",
452
    ),
453
    num_par: int = Form(
454
        default=1, description="The number of data partitions in the graph"
455
    ),
456
    num_islands: int = Form(
457
        default=0, description="The number of data-islands to partition"
458
    ),
459
    par_label: str = Form(
460
        default="Partition",
461
        description="The label prefixed to each generated partition",
462
    ),
463
    min_goal: Union[int, None] = Form(default=None),
464
    ptype: Union[int, None] = Form(default=None),
465
    max_load_imb: Union[int, None] = Form(default=None),
466
    max_cpu: Union[int, None] = Form(default=None),
467
    time_greedy: Union[int, None] = Form(default=None),
468
    deadline: Union[int, None] = Form(default=None),
469
    topk: Union[int, None] = Form(default=None),
470
    swarm_size: Union[int, None] = Form(default=None),
471
    max_mem: Union[int, None] = Form(default=None),
472
):
473
    """
474
    Translating Logical Graphs to Physical Graphs.
475
    Differs from gen_pgt above in that the logical graph data is POSTed
476
    to this route in an HTTP form, whereas gen_pgt loads the logical graph data
477
    from a local file
478
    """
479
    test = test.lower() == "true"
2✔
480
    try:
2✔
481
        logical_graph = json.loads(json_data)
2✔
482
        try:
2✔
483
            validate(logical_graph, LG_SCHEMA)
2✔
484
        except ValidationError as ve:
×
485
            error = "Validation Error {1}: {0}".format(str(ve), lg_name)
×
486
            logger.error(error)
×
487
            # raise HTTPException(status_code=500, detail=error)
488
        logical_graph = prepare_lgt(logical_graph, rmode)
2✔
489
        # LG -> PGT
490
        # TODO: Warning: I dislike doing it this way with a passion, however without changing the tests/ usage of the api getting all form fields is difficult.
491
        algo_params = make_algo_param_dict(
2✔
492
            min_goal,
493
            ptype,
494
            max_load_imb,
495
            max_cpu,
496
            time_greedy,
497
            deadline,
498
            topk,
499
            swarm_size,
500
            max_mem,
501
        )
502
        pgt = unroll_and_partition_with_params(
2✔
503
            logical_graph,
504
            test,
505
            algo,
506
            num_par,
507
            num_islands,
508
            par_label,
509
            algo_params,
510
        )
511
        pgt_id = pg_mgr.add_pgt(pgt, lg_name)
2✔
512
        part_info = " - ".join(
2✔
513
            ["{0}:{1}".format(k, v) for k, v in pgt.result().items()]
514
        )
515
        tpl = templates.TemplateResponse(
2✔
516
            "pg_viewer.html",
517
            {
518
                "request": request,
519
                "pgt_view_json_name": pgt_id,
520
                "partition_info": part_info,
521
                "title": "Physical Graph Template%s"
522
                % ("" if num_par == 0 else "Partitioning"),
523
                "error": None,
524
            },
525
        )
526
        return tpl
2✔
527
    except GraphException as ge:
×
528
        logger.info("GRAPH EXCEPTION")
×
529
        raise HTTPException(
×
530
            status_code=500,
531
            detail="Invalid Logical Graph {1}: {0}".format(str(ge), lg_name),
532
        )
533
    except SchedulerException as se:
×
534
        logger.info("SCHEDULE EXCEPTION")
×
535
        raise HTTPException(
×
536
            status_code=500,
537
            detail="Graph scheduling exception {1}: {0}".format(str(se), lg_name),
538
        )
539
    except Exception:
×
540
        logger.info("OTHER EXCEPTION")
×
541
        trace_msg = traceback.format_exc()
×
542
        raise HTTPException(
×
543
            status_code=500,
544
            detail="Graph partition exception {1}: {0}".format(trace_msg, lg_name),
545
        )
546

547

548
@app.get("/gen_pg", response_class=JSONResponse, tags=["Original"])
2✔
549
def gen_pg(
2✔
550
    request: Request,
551
    pgt_id: str = Query(
552
        description="The pgt_id used to internally reference this graph"
553
    ),
554
    dlg_mgr_deploy: Union[str, None] = Query(
555
        default=None,
556
        description="If supplied, this endpoint will attempt to deploy the graph is the dlg_pgt_url or dlg_mgr_host/port endpoint",
557
    ),
558
    dlg_mgr_url: Union[str, None] = Query(
559
        default=None, description="The DALiuGE manager to deploy the graph to"
560
    ),
561
    dlg_mgr_host: Union[str, None] = Query(
562
        default=None,
563
        description="The DALiuGE manager base IP to deploy the graph to",
564
    ),
565
    dlg_mgr_port: Union[int, None] = Query(
566
        default=None,
567
        description="The DALiuGE manager port to deploy the graph to",
568
    ),
569
    tpl_nodes_len: int = Query(
570
        default=0,
571
        description="The number of nodes to unroll the graph partition for",
572
    ),
573
):
574
    """
575
    RESTful interface to convert a PGT(P) into PG by mapping
576
    PGT(P) onto a given set of available resources
577
    """
578
    mprefix = ""
×
579
    mport = 443
×
580
    if dlg_mgr_url is not None:
×
581
        mparse = urlparse(dlg_mgr_url)
×
582
        try:
×
583
            (mhost, mport) = mparse.netloc.split(":")
×
584
            mport = int(mport)
×
585
        except:
×
586
            logger.debug("URL parsing error of: %s", dlg_mgr_url)
×
587
            mhost = mparse.netloc
×
588
            if mparse.scheme == "http":
×
589
                mport = 80
×
590
            elif mparse.scheme == "https":
×
591
                mport = 443
×
592
        mprefix = mparse.path
×
593
        if mprefix.endswith("/"):
×
594
            mprefix = mprefix[:-1]
×
595
    else:
596
        mhost = dlg_mgr_host
×
597
        if dlg_mgr_port is not None:
×
598
            mport = dlg_mgr_port
×
599
        else:
600
            mport = 443
×
601

602
    logger.debug("Manager host: %s", mhost)
×
603
    logger.debug("Manager port: %s", mport)
×
604
    logger.debug("Manager prefix: %s", mprefix)
×
605
    # if the 'deploy' checkbox is not checked,
606
    # then the form submission will NOT contain a 'dlg_mgr_deploy' field
607
    deploy = dlg_mgr_deploy is not None
×
608
    pgtp = pg_mgr.get_pgt(pgt_id)
×
609
    if pgtp is None:
×
610
        raise HTTPException(
×
611
            status_code=404,
612
            detail="PGT(P) with id {0} not found in the Physical Graph Manager".format(
613
                pgt_id
614
            ),
615
        )
616

617
    pgtpj = pgtp._gojs_json_obj
×
618
    reprodata = pgtp.reprodata
×
619
    num_partitions = len(list(filter(lambda n: "isGroup" in n, pgtpj["nodeDataArray"])))
×
620

621
    if mhost is None:
×
622
        if tpl_nodes_len > 0:
×
623
            nnodes = num_partitions
×
624
        else:
625
            raise HTTPException(
×
626
                status_code=500,
627
                detail="Must specify DALiuGE manager host or tpl_nodes_len",
628
            )
629

630
        pg_spec = pgtp.to_pg_spec([], ret_str=False, tpl_nodes_len=nnodes)
×
631
        pg_spec.append(reprodata)
×
632
        return JSONResponse(pg_spec)
×
633
    try:
×
634
        mgr_client = CompositeManagerClient(
×
635
            host=mhost, port=mport, url_prefix=mprefix, timeout=30
636
        )
637
        # 1. get a list of nodes
638
        node_list = [f"{mhost}:{mport}"] + mgr_client.nodes()
×
639
        logger.debug("Calling mapping to nodes: %s", node_list)
×
640
        # 2. mapping PGTP to resources (node list)
641
        pg_spec = pgtp.to_pg_spec(node_list, ret_str=False)
×
642

643
        if deploy:
×
644
            dt = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S.%f")
×
645
            ssid = "{0}_{1}".format(
×
646
                pgt_id.split(".graph")[0].split("_pgt")[0].split("/")[-1], dt
647
            )
648
            mgr_client.create_session(ssid)
×
649
            # print "session created"
650
            completed_uids = common.get_roots(pg_spec)
×
651
            pg_spec.append(reprodata)
×
652
            mgr_client.append_graph(ssid, pg_spec)
×
653
            # print "graph appended"
654
            mgr_client.deploy_session(ssid, completed_uids=completed_uids)
×
655
            # mgr_client.deploy_session(ssid, completed_uids=[])
656
            # print "session deployed"
657
            # 3. redirect to the master drop manager
658
            return RedirectResponse(
×
659
                "http://{0}:{1}{2}/session?sessionId={3}".format(
660
                    mhost, mport, mprefix, ssid
661
                )
662
            )
663
        else:
664
            return JSONResponse(pg_spec)
×
665
    except restutils.RestClientException as re:
×
666
        raise HTTPException(
×
667
            status_code=500,
668
            detail="Failed to interact with DALiUGE Drop Manager: {0}".format(re),
669
        )
670
    except Exception as ex:
×
671
        logger.error(traceback.format_exc())
×
672
        raise HTTPException(
×
673
            status_code=500,
674
            detail="Failed to deploy physical graph: {0}".format(ex),
675
        )
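# --- Illustrative request sketch; not part of translator_rest.py ---
# The two paths through gen_pg above: with no manager host it only maps the PGT(P)
# onto tpl_nodes_len template nodes and returns the pg_spec as JSON; with a manager
# and dlg_mgr_deploy supplied it creates a session, appends the graph and deploys it.
# A hedged sketch; the pgt_id would come from an earlier /gen_pgt call, and the
# addresses and ports are assumptions.
import requests

BASE = "http://localhost:8084"
pgt_id = "examples/HelloWorldSimple_pgt.graph"   # hypothetical id returned by pg_mgr.add_pgt

# 1. Spec only: no manager involved.
pg_spec = requests.get(f"{BASE}/gen_pg", params={"pgt_id": pgt_id, "tpl_nodes_len": 1}).json()

# 2. Map onto a running manager and deploy; the response redirects to the session page.
requests.get(
    f"{BASE}/gen_pg",
    params={
        "pgt_id": pgt_id,
        "dlg_mgr_host": "127.0.0.1",   # assumed manager address
        "dlg_mgr_port": 8001,          # assumed manager REST port
        "dlg_mgr_deploy": "deploy",    # supplying this parameter at all triggers deployment
    },
)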
676

677

678
@app.post("/gen_pg_spec", tags=["Original"])
2✔
679
def gen_pg_spec(
2✔
680
    pgt_id: str = Body(
681
        description="The pgt_id used to internally reference this graph"
682
    ),
683
    node_list: list = Body(
684
        default=[],
685
        description="The list of daliuge nodes to submit the graph to",
686
    ),
687
    manager_host: str = Body(
688
        description="The address of the manager host where the graph will be deployed to."
689
    ),
690
    tpl_nodes_len: int = Body(
691
        default=1,
692
        description="The number of nodes requested by the graph",
693
    ),
694
):
695
    """
696
    Interface to convert a PGT(P) into pg_spec
697
    """
698
    try:
×
NEW
699
        if manager_host.find(":") == -1:
×
NEW
700
            manager_host = f"{manager_host}:{constants.ISLAND_DEFAULT_REST_PORT}"
×
UNCOV
701
        logger.debug("pgt_id: %s", str(pgt_id))
×
702
        # logger.debug("node_list: %s", str(node_list))
703
    except Exception as ex:
×
704
        logger.error("%s", traceback.format_exc())
×
705
        raise HTTPException(
×
706
            status_code=500,
707
            detail="Unable to parse json body of request for pg_spec: {0}".format(ex),
708
        )
709
    pgtp = pg_mgr.get_pgt(pgt_id)
×
710
    if pgtp is None:
×
711
        raise HTTPException(
×
712
            status_code=404,
713
            detail="PGT(P) with id {0} not found in the Physical Graph Manager".format(
714
                pgt_id
715
            ),
716
        )
717
    if node_list is None:
×
718
        raise HTTPException(status_code=500, detail="Must specify DALiuGE nodes list")
×
719

720
    try:
×
721
        logger.debug("Calling mapping to host: %s", [manager_host] + node_list)
×
722
        pg_spec = pgtp.to_pg_spec(
×
723
            [manager_host] + node_list,
724
            tpl_nodes_len=tpl_nodes_len,
725
            ret_str=False,
726
        )
727
        root_uids = common.get_roots(pg_spec)
×
728
        logger.debug("Root UIDs: %s", list(root_uids))
×
729
        response = JSONResponse(
×
730
            json.dumps(
731
                {"pg_spec": pg_spec, "root_uids": list(root_uids)},
732
            ),
733
        )
734
        return response
×
735
    except Exception as ex:
×
736
        logger.error("%s", traceback.format_exc())
×
737
        raise HTTPException(
×
738
            status_code=500,
739
            detail="Failed to generate pg_spec: {0}".format(ex),
740
        )
741

742

743
@app.get("/gen_pg_helm", tags=["Original"])
2✔
744
def gen_pg_helm(
2✔
745
    pgt_id: str = Body(
746
        description="The pgt_id used to internally reference this graph"
747
    ),
748
):
749
    """
750
    Deploys a PGT as a K8s helm chart.
751
    """
752
    # Get pgt_data
753
    from ...deploy.start_helm_cluster import start_helm
×
754

755
    pgtp = pg_mgr.get_pgt(pgt_id)
×
756
    if pgtp is None:
×
757
        raise HTTPException(
×
758
            status_code=404,
759
            detail="PGT(P) with id {0} not found in the Physical Graph Manager".format(
760
                pgt_id
761
            ),
762
        )
763

764
    pgtpj = pgtp._gojs_json_obj
×
765
    logger.info("PGTP: %s", pgtpj)
×
766
    num_partitions = len(list(filter(lambda n: "isGroup" in n, pgtpj["nodeDataArray"])))
×
767
    # Send pgt_data to helm_start
768
    try:
×
769
        start_helm(pgtp, num_partitions, pgt_dir)
×
770
    except restutils.RestClientException as ex:
×
771
        logger.error(traceback.format_exc())
×
772
        raise HTTPException(
×
773
            status_code=500,
774
            detail="Failed to deploy physical graph: {0}".format(ex),
775
        )
776
    # TODO: Not sure what to redirect to yet
777
    return "Inspect your k8s dashboard for deployment status"
×
778

779

780
# ------ Methods from translator CLI ------ #
781

782

783
class AlgoParams(BaseModel):
2✔
784
    """
785
    Set of scheduling algorithm parameters, not all apply to all algorithms.
786
    Refer to main documentation for more information.
787
    """
788

789
    min_goal: Union[int, None] = None
2✔
790
    ptype: Union[int, None] = None
2✔
791
    max_load_imb: Union[int, None] = None
2✔
792
    max_cpu: Union[int, None] = None
2✔
793
    time_greedy: Union[int, None] = None
2✔
794
    deadline: Union[int, None] = None
2✔
795
    topk: Union[int, None] = None
2✔
796
    swarm_size: Union[int, None] = None
2✔
797
    max_mem: Union[int, None] = None
2✔
798

799

800
class KnownAlgorithms(str, Enum):
2✔
801
    """
802
    List of known scheduling algorithms.
803
    Will need to be updated manually.
804
    """
805

806
    ALGO_NONE = ("none",)
2✔
807
    ALGO_METIS = ("metis",)
2✔
808
    ALGO_MY_SARKAR = ("mysarkar",)
2✔
809
    ALGO_MIN_NUM_PARTS = ("min_num_parts",)
2✔
810
    ALGO_PSO = "pso"
2✔
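# --- Illustrative sketch; not part of translator_rest.py ---
# AlgoParams is an ordinary pydantic model whose fields all default to None; the
# /partition and /unroll_and_partition endpoints below simply forward
# algo_params.dict() to dlg.dropmake.pg_generator.partition. A small sketch of what
# that dict looks like; the values are made-up examples, not recommended settings.
example = AlgoParams(max_load_imb=5, max_cpu=8)
print(example.dict())
# -> {'min_goal': None, 'ptype': None, 'max_load_imb': 5, 'max_cpu': 8,
#     'time_greedy': None, 'deadline': None, 'topk': None, 'swarm_size': None,
#     'max_mem': None}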
811

812

813
def load_graph(graph_content: str, graph_name: str):
2✔
814
    out_graph = {}
2✔
815
    if graph_content is not None and graph_name is not None:
2✔
816
        raise HTTPException(
2✔
817
            status_code=400,
818
            detail="Need to supply either an name or content but not both",
819
        )
820
    if not lg_exists(lg_dir, graph_name):
2✔
821
        if not graph_content:
2✔
822
            raise HTTPException(status_code=400, detail="LG content is nonexistent")
2✔
823
        else:
824
            try:
2✔
825
                out_graph = json.loads(graph_content)
2✔
826
            except JSONDecodeError as jerror:
2✔
827
                logger.error(jerror)
2✔
828
                raise HTTPException(status_code=400, detail="LG content is malformed")
2✔
829
    else:
830
        lgp = lg_path(lg_dir, graph_name)
2✔
831
        with open(lgp, "r") as f:
2✔
832
            try:
2✔
833
                out_graph = json.load(f)
2✔
834
            except JSONDecodeError as jerror:
×
835
                logger.error(jerror)
×
836
                raise HTTPException(
×
837
                    status_code=500, detail="LG graph on file cannot be loaded"
838
                )
839
    return out_graph
2✔
840

841

842
@app.post("/lg_fill", response_class=JSONResponse, tags=["Updated"])
2✔
843
def lg_fill(
2✔
844
    lg_name: str = Form(
845
        default=None,
846
        description="If present, translator will attempt to load this lg from file",
847
    ),
848
    lg_content: str = Form(
849
        default=None,
850
        description="If present, translator will use this string as the graph content",
851
    ),
852
    parameters: str = Form(
853
        default="{}", description="JSON key: value store of graph paramter"
854
    ),
855
    rmode: str = Form(
856
        REPRO_DEFAULT.name,
857
        enum=[roption.name for roption in [ReproducibilityFlags.NOTHING] + ALL_RMODES],
858
        description="Reproducibility mode setting level of provenance tracking. Refer to main documentation for more information",
859
    ),
860
):
861
    """
862
    Will fill a logical graph by replacing fields with supplied parameters.
863

864
    One of lg_name or lg_content, but not both, must be specified.
865
    """
866
    lg_graph = load_graph(lg_content, lg_name)
2✔
867
    try:
2✔
868
        params = json.loads(parameters)
2✔
869
    except JSONDecodeError as jerror:
2✔
870
        logger.error(jerror)
2✔
871
        raise HTTPException(status_code=400, detail="Parameter string is invalid")
2✔
872
    output_graph = dlg.dropmake.pg_generator.fill(lg_graph, params)
2✔
873
    output_graph = init_lg_repro_data(init_lgt_repro_data(output_graph, rmode))
2✔
874
    return JSONResponse(output_graph)
2✔
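# --- Illustrative request sketch; not part of translator_rest.py ---
# lg_fill takes exactly one of lg_name or lg_content (load_graph above rejects both
# or neither) plus a JSON string of parameter values to substitute into the graph.
# A hedged sketch; the host, graph name and parameter key are all assumptions.
import json
import requests

filled = requests.post(
    "http://localhost:8084/lg_fill",
    data={
        "lg_name": "examples/HelloWorldSimple.graph",   # hypothetical graph on the server
        "parameters": json.dumps({"num_copies": 4}),    # hypothetical graph parameter
    },
).json()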
875

876

877
@app.post("/unroll", response_class=JSONResponse, tags=["Updated"])
2✔
878
def lg_unroll(
2✔
879
    lg_name: str = Form(
880
        default=None,
881
        description="If present, translator will attempt to load this lg from file",
882
    ),
883
    lg_content: str = Form(
884
        default=None,
885
        description="If present, translator will use this string as the graph content",
886
    ),
887
    oid_prefix: str = Form(
888
        default=None, description="ID prefix appended to unrolled nodes"
889
    ),
890
    zero_run: bool = Form(
891
        default=None,
892
        description="If true, apps will be replaced with sleep apps",
893
    ),
894
    default_app: str = Form(
895
        default=None,
896
        description="If set, will change all apps to this app class",
897
    ),
898
):
899
    """
900
    Will unroll a logical graph into a physical graph template.
901

902
    One of lg_name or lg_content, but not both, needs to be specified.
903
    """
904
    lg_graph = load_graph(lg_content, lg_name)
2✔
905
    pgt = dlg.dropmake.pg_generator.unroll(lg_graph, oid_prefix, zero_run, default_app)
2✔
906
    pgt = init_pgt_unroll_repro_data(pgt)
2✔
907
    return JSONResponse(pgt)
2✔
908

909

910
@app.post("/partition", response_class=JSONResponse, tags=["Updated"])
2✔
911
def pgt_partition(
2✔
912
    pgt_name: str = Form(
913
        default=None,
914
        description="If specified, translator will attempt to load graph from file",
915
    ),
916
    pgt_content: str = Form(
917
        default=None,
918
        description="If present, translator will use this string as the graph content",
919
    ),
920
    num_partitions: int = Form(
921
        default=1,
922
        description="Number of partitions to unroll the graph across",
923
    ),
924
    num_islands: int = Form(
925
        default=1, description="Number of data islands to partition for"
926
    ),
927
    algorithm: KnownAlgorithms = Form(
928
        default="metis", description="The selected scheduling algorithm"
929
    ),
930
    algo_params: AlgoParams = Form(
931
        default=AlgoParams(),
932
        description="The parameter values passed to the scheduling algorithm. Required parameters varies per algorithm.",
933
    ),
934
):
935
    """
936
    Uses scheduling algorithms to partition an unrolled pgt across several partitions and data islands.
937

938
    One of pgt_name or pgt_content, but not both, must be specified.
939
    """
940
    graph = load_graph(pgt_content, pgt_name)
2✔
941
    reprodata = {}
2✔
942
    if not graph[-1].get("oid"):
2✔
943
        reprodata = graph.pop()
2✔
944
    pgt = dlg.dropmake.pg_generator.partition(
2✔
945
        graph, algorithm, num_partitions, num_islands, algo_params.dict()
946
    )
947
    pgt.append(reprodata)
2✔
948
    pgt = init_pgt_partition_repro_data(pgt)
2✔
949
    return JSONResponse(pgt)
2✔
950

951

952
@app.post("/unroll_and_partition", response_class=JSONResponse, tags=["Updated"])
2✔
953
def lg_unroll_and_partition(
2✔
954
    lg_name: str = Form(
955
        default=None,
956
        description="If present, translator will attempt to load this lg from file",
957
    ),
958
    lg_content: str = Form(
959
        default=None,
960
        description="If present, translator will use this string as the graph content",
961
    ),
962
    oid_prefix: str = Form(
963
        default=None, description="ID prefix appended to unrolled nodes"
964
    ),
965
    zero_run: bool = Form(
966
        default=None,
967
        description="If true, apps will be replaced with sleep apps",
968
    ),
969
    default_app: str = Form(
970
        default=None,
971
        description="If set, will change all apps to this app class",
972
    ),
973
    num_partitions: int = Form(
974
        default=1,
975
        description="Number of partitions to unroll the graph across",
976
    ),
977
    num_islands: int = Form(
978
        default=1, description="Number of data islands to partition for"
979
    ),
980
    algorithm: KnownAlgorithms = Form(
981
        default="metis", description="The selected scheduling algorithm"
982
    ),
983
    algo_params: AlgoParams = Form(
984
        default=AlgoParams(),
985
        description="The parameter values passed to the scheduling algorithm. Required parameters varies per algorithm.",
986
    ),
987
):
988
    """
989
    Unrolls and partitions a logical graph with the provided parameters.
990

991
    One of lg_name and lg_content, but not both, must be specified.
992
    """
993
    lg_graph = load_graph(lg_content, lg_name)
2✔
994
    pgt = dlg.dropmake.pg_generator.unroll(lg_graph, oid_prefix, zero_run, default_app)
2✔
995
    pgt = init_pgt_unroll_repro_data(pgt)
2✔
996
    reprodata = pgt.pop()
2✔
997
    pgt = dlg.dropmake.pg_generator.partition(
2✔
998
        pgt, algorithm, num_partitions, num_islands, algo_params.dict()
999
    )
1000
    pgt.append(reprodata)
2✔
1001
    pgt = init_pgt_partition_repro_data(pgt)
2✔
1002
    return JSONResponse(pgt)
2✔
1003

1004

1005
@app.post("/map", response_class=JSONResponse, tags=["Updated"])
2✔
1006
def pgt_map(
2✔
1007
    pgt_name: str = Form(
1008
        default=None,
1009
        description="If supplied, this graph will attempted to be loaded from disk on the server",
1010
    ),
1011
    pgt_content: str = Form(
1012
        default=None, description="If supplied, this is the graph content"
1013
    ),
1014
    nodes: str = Form(
1015
        default=None,
1016
        description="Comma separated list of IP addrs e.g. 'localhost, 127.0.0.2'",
1017
    ),
1018
    num_islands: int = Form(
1019
        default=1, description="The number of data islands to launch"
1020
    ),
1021
    co_host_dim: bool = Form(
1022
        default=True,
1023
        description="Whether to launch data island manager processes alongside node-managers",
1024
    ),
1025
    host: str = Form(
1026
        default=None,
1027
        description="If present, will attempt to query this address for node-managers",
1028
    ),
1029
    port: int = Form(
1030
        default=dlg.constants.ISLAND_DEFAULT_REST_PORT,
1031
        description="Port used by HOST manager process",
1032
    ),
1033
):
1034
    """
1035
    Maps physical graph templates to node resources.
1036
    """
1037
    if not nodes:
2✔
1038
        client = CompositeManagerClient(host, port, timeout=10)
2✔
1039
        nodes = [f"{host}:{port}"] + client.nodes()
2✔
1040
    if len(nodes) <= num_islands:
2✔
1041
        logger.error("Not enough nodes to fill all islands")
×
1042
        raise HTTPException(
×
1043
            status_code=500,
1044
            detail="#nodes (%d) should be larger than the number of islands (%d)"
1045
            % (len(nodes), num_islands),
1046
        )
1047
    pgt = load_graph(pgt_content, pgt_name)
2✔
1048
    reprodata = {}
2✔
1049
    if not pgt[-1].get("oid"):
2✔
1050
        reprodata = pgt.pop()
2✔
1051
    logger.info(nodes)
2✔
1052
    pg = dlg.dropmake.pg_generator.resource_map(pgt, nodes, num_islands, co_host_dim)
2✔
1053
    pg.append(reprodata)
2✔
1054
    pg = init_pg_repro_data(pg)
2✔
1055
    return JSONResponse(pg)
2✔
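# --- Illustrative request sketch; not part of translator_rest.py ---
# pgt_map resolves a partitioned PGT onto concrete node managers. When "nodes" is
# omitted, as here, the endpoint queries the manager at host:port for its node list
# before calling dlg.dropmake.pg_generator.resource_map. A hedged sketch; the file
# name, manager address and port are assumptions.
import requests

with open("my_partitioned_pgt.graph") as f:     # hypothetical output of /unroll_and_partition
    pgt_content = f.read()

pg = requests.post(
    "http://localhost:8084/map",
    data={
        "pgt_content": pgt_content,
        "host": "127.0.0.1",    # manager queried for its node list
        "port": 8001,           # assumed manager REST port
        "num_islands": 1,
    },
).json()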
1056

1057

1058
@app.get("/api/submission_method")
2✔
1059
def get_submission_method(
2✔
1060
    dlg_mgr_url: str = Query(
1061
        default=None,
1062
        description="If present, translator will query this URL for its deployment options",
1063
    ),
1064
    dlg_mgr_host: str = Query(
1065
        default=None,
1066
        description="If present with mport and mprefix, will be the base host for deployment",
1067
    ),
1068
    dlg_mgr_port: int = Query(default=None, description=""),
1069
):
1070
    logger.debug("Received submission_method request")
2✔
1071
    if dlg_mgr_url:
2✔
1072
        mhost, mport, mprefix = parse_mgr_url(dlg_mgr_url)
×
1073
    else:
1074
        mhost = dlg_mgr_host
2✔
1075
        mport = dlg_mgr_port
2✔
1076
        mprefix = ""
2✔
1077
    available_methods = []
2✔
1078
    if check_k8s_env():
2✔
1079
        available_methods.append(DeploymentMethods.HELM)
×
1080
    if mhost is not None:
2✔
1081
        available_methods = get_mgr_deployment_methods(mhost, mport, mprefix)
×
1082
    logger.debug("Methods available: %s", available_methods)
2✔
1083
    return {"methods": available_methods}
2✔
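# --- Illustrative request sketch; not part of translator_rest.py ---
# A client can ask which deployment methods are currently usable before offering a
# deploy action: HELM is advertised when the translator runs inside Kubernetes, and
# if a manager host is given, the manager's own advertised methods are returned
# instead. A hedged sketch; the address and port are assumptions.
import requests

methods = requests.get(
    "http://localhost:8084/api/submission_method",
    params={"dlg_mgr_host": "127.0.0.1", "dlg_mgr_port": 8001},
).json()["methods"]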
1084

1085

1086
@app.get(
2✔
1087
    "/",
1088
    response_class=HTMLResponse,
1089
    description="The page used to view physical graphs",
1090
)
1091
def index(request: Request):
2✔
1092
    tpl = templates.TemplateResponse(
×
1093
        "pg_viewer.html",
1094
        {
1095
            "request": request,
1096
            "pgt_view_json_name": None,
1097
            "partition_info": None,
1098
            "title": "Physical Graph Template",
1099
            "error": None,
1100
        },
1101
    )
1102
    return tpl
×
1103

1104

1105
@app.head("/")
2✔
1106
def liveliness():
2✔
1107
    return {}
×
1108

1109

1110
def run(_, args):
2✔
1111
    """
1112
    FastAPI implementation of daliuge translator interface
1113
    """
1114
    parser = argparse.ArgumentParser()
2✔
1115
    parser.add_argument(
2✔
1116
        "-d",
1117
        "--lgdir",
1118
        action="store",
1119
        type=str,
1120
        dest="lg_path",
1121
        default="/tmp",
1122
        help="A path that contains at least one sub-directory, which contains logical graph files",
1123
    )
1124
    parser.add_argument(
2✔
1125
        "-t",
1126
        "--pgtdir",
1127
        action="store",
1128
        type=str,
1129
        dest="pgt_path",
1130
        default="/tmp",
1131
        help="physical graph template path (output)",
1132
    )
1133
    parser.add_argument(
2✔
1134
        "-H",
1135
        "--host",
1136
        action="store",
1137
        type=str,
1138
        dest="host",
1139
        default="0.0.0.0",
1140
        help="logical graph editor host (all by default)",
1141
    )
1142
    parser.add_argument(
2✔
1143
        "-p",
1144
        "--port",
1145
        action="store",
1146
        type=int,
1147
        dest="port",
1148
        default=8084,
1149
        help="logical graph editor port (8084 by default)",
1150
    )
1151
    parser.add_argument(
2✔
1152
        "-v",
1153
        "--verbose",
1154
        action="store_true",
1155
        dest="verbose",
1156
        default=False,
1157
        help="Enable more logging",
1158
    )
1159
    parser.add_argument(
2✔
1160
        "-l",
1161
        "--log-dir",
1162
        action="store",
1163
        type=str,
1164
        dest="logdir",
1165
        help="The directory where the logging files will be stored",
1166
        default=utils.getDlgLogsDir(),
1167
    )
1168

1169
    options = parser.parse_args(args)
2✔
1170

1171
    if options.lg_path is None or options.pgt_path is None:
2✔
1172
        parser.error("Graph paths missing (-d/-t)")
×
1173
    elif not os.path.exists(options.lg_path):
2✔
1174
        parser.error(f"{options.lg_path} does not exist")
×
1175

1176
    if options.verbose or options.logdir:
2✔
1177
        fmt = logging.Formatter(
2✔
1178
            "%(asctime)-15s [%(levelname)5.5s] [%(threadName)15.15s] "
1179
            "%(name)s#%(funcName)s:%(lineno)s %(message)s"
1180
        )
1181
        fmt.converter = time.gmtime
2✔
1182
        if options.verbose:
2✔
1183
            stream_handler = logging.StreamHandler(sys.stdout)
2✔
1184
            stream_handler.setFormatter(fmt)
2✔
1185
            logging.root.addHandler(stream_handler)
2✔
1186
            logging.root.setLevel(logging.DEBUG)
2✔
1187
        if options.logdir:
2✔
1188
            # This is the logfile we'll use from now on
1189
            logdir = options.logdir
2✔
1190
            utils.createDirIfMissing(logdir)
2✔
1191
            logfile = os.path.join(logdir, "dlgTranslator.log")
2✔
1192
            fileHandler = logging.FileHandler(logfile)
2✔
1193
            fileHandler.setFormatter(fmt)
2✔
1194
            logging.root.addHandler(fileHandler)
2✔
1195

1196
    try:
2✔
1197
        os.makedirs(options.pgt_path, exist_ok=True)
2✔
1198
    except OSError:
×
1199
        logging.warning("Cannot create path %s", options.pgt_path)
×
1200

1201
    global lg_dir
1202
    global pgt_dir
1203
    global pg_mgr
1204

1205
    lg_dir = options.lg_path
2✔
1206
    pgt_dir = options.pgt_path
2✔
1207
    pg_mgr = PGManager(pgt_dir)
2✔
1208

1209
    def handler(*_args):
2✔
1210
        raise KeyboardInterrupt
×
1211

1212
    signal.signal(signal.SIGTERM, handler)
2✔
1213
    signal.signal(signal.SIGINT, handler)
2✔
1214

1215
    logging.debug("Starting uvicorn verbose %s", options.verbose)
2✔
1216
    uvicorn.run(app=app, host=options.host, port=options.port, debug=options.verbose)
2✔
1217

1218

1219
if __name__ == "__main__":
2✔
1220
    run(None, sys.argv[1:])
×
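# --- Illustrative launch sketch; not part of translator_rest.py ---
# run() only needs an argument list shaped like sys.argv[1:], so the service can be
# started programmatically as well as via "python translator_rest.py ...". A hedged
# sketch; the directories are placeholders and must already exist.
from dlg.dropmake.web.translator_rest import run

run(None, ["-d", "/tmp/lg_dir", "-t", "/tmp/pgt_dir", "-H", "0.0.0.0", "-p", "8084", "-v"])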