• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ICRAR / daliuge / 10363213063

13 Aug 2024 03:39AM UTC coverage: 79.63% (-0.09%) from 79.722%
10363213063

Pull #271

github

web-flow
Merge branch 'master' into liu-377
Pull Request #271: Liu 377

70 of 122 new or added lines in 13 files covered. (57.38%)

12 existing lines in 6 files now uncovered.

15375 of 19308 relevant lines covered (79.63%)

1.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.39
/daliuge-engine/dlg/deploy/deployment_utils.py
1
#
2
#    ICRAR - International Centre for Radio Astronomy Research
3
#    (c) UWA - The University of Western Australia, 2019
4
#    Copyright by UWA (in the framework of the ICRAR)
5
#    All rights reserved
6
#
7
#    This library is free software; you can redistribute it and/or
8
#    modify it under the terms of the GNU Lesser General Public
9
#    License as published by the Free Software Foundation; either
10
#    version 2.1 of the License, or (at your option) any later version.
11
#
12
#    This library is distributed in the hope that it will be useful,
13
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
14
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15
#    Lesser General Public License for more details.
16
#
17
#    You should have received a copy of the GNU Lesser General Public
18
#    License along with this library; if not, write to the Free Software
19
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
20
#    MA 02111-1307  USA
21
#
22
import json
2✔
23
import logging
2✔
24
import re
2✔
25
import subprocess
2✔
26
import time
2✔
27

28
logger = logging.getLogger(__name__)
2✔
29

30

31
class ListTokens(object):
    """Token types emitted when tokenizing a compressed node-list string.

    STRING           a literal chunk of text
    COMMA            element separator ","
    RANGE_SEP        "-" inside square brackets (numeric range)
    MULTICASE_START  opening "["
    MULTICASE_END    closing "]"
    """

    STRING = 0
    COMMA = 1
    RANGE_SEP = 2
    MULTICASE_START = 3
    MULTICASE_END = 4
33

34

35
def _list_tokenizer(s):
    """Tokenize a compressed node-list string such as 'a008,b[072-073,076]'.

    Yields (token_type, value) pairs where token_type is one of the
    ListTokens constants. Only STRING tokens carry a value (the accumulated
    characters); all other tokens yield a value of None.

    A "-" is only treated as a range separator while inside square
    brackets; outside brackets it is part of the name.
    """
    buff = []
    in_square_brackets = False
    for char in s:
        if char == "-" and in_square_brackets:
            # Range separator: flush the range start, then mark the split.
            yield ListTokens.STRING, "".join(buff)
            buff = []
            yield ListTokens.RANGE_SEP, None
        elif char == ",":
            if buff:
                yield ListTokens.STRING, "".join(buff)
                buff = []
            yield ListTokens.COMMA, None
        elif char == "[":
            in_square_brackets = True
            if buff:
                # Whatever preceded "[" is the prefix for the bracket group.
                yield ListTokens.STRING, "".join(buff)
                buff = []
            yield ListTokens.MULTICASE_START, None
        elif char == "]":
            in_square_brackets = False
            if buff:
                yield ListTokens.STRING, "".join(buff)
                buff = []
            yield ListTokens.MULTICASE_END, None
        else:
            buff.append(char)
    # Flush any trailing characters as a final STRING token.
    # (The original re-assigned buff = [] afterwards — a dead store,
    # removed here.)
    if buff:
        yield ListTokens.STRING, "".join(buff)
65

66

67
def _parse_list_tokens(token_iter):
    """
    Consume (token, value) pairs from `token_iter` (as produced by
    _list_tokenizer) and expand them into a flat list of strings.

    Handles three constructs:
      * comma-separated strings:            "a,b"       -> ["a", "b"]
      * bracketed alternatives w/ prefix:   "b[1,3]"    -> ["b1", "b3"]
      * zero-padded numeric ranges:         "[072-073]" -> ["072", "073"]

    Recurses once per bracket group; returns either at MULTICASE_END
    (end of a bracket group) or when the token stream is exhausted.
    """
    def finish_element(sub_values, range_start):
        # Commit the element currently being assembled: either splice in an
        # already-expanded bracket group, or expand a pending numeric range.
        if sub_values:
            values.extend(sub_values)
        elif range_start is not None:
            # The range end was appended to `values` as a plain STRING;
            # pop it back and generate the inclusive, zero-padded sequence.
            range_end = values.pop()
            str_len = max(len(range_start), len(range_end))
            str_format = "%%0%dd" % str_len  # e.g. "%03d" for 3-digit padding
            num_vals = [
                str_format % num for num in range(int(range_start), int(range_end) + 1)
            ]
            values.extend(num_vals)

    values = []
    sub_values = []
    range_start = None
    while True:
        try:
            token, value = next(token_iter)
        except StopIteration:
            # End of input: flush whatever element is still pending.
            finish_element(sub_values, range_start)
            return values
        if token == ListTokens.MULTICASE_END:
            # End of a bracket group: hand the collected values back to the
            # recursive caller (MULTICASE_START branch below).
            finish_element(sub_values, range_start)
            return values
        if token == ListTokens.MULTICASE_START:
            # The string pushed just before "[" (if any) becomes the prefix
            # of every alternative produced by the recursive parse.
            prefix = ""
            if values:
                prefix = values.pop()
            sub_values = _parse_list_tokens(token_iter)
            sub_values = [prefix + s for s in sub_values]
        if token == ListTokens.RANGE_SEP:
            # "-" inside brackets: the range start was the last value pushed.
            range_start = values.pop()
        elif token == ListTokens.COMMA:
            finish_element(sub_values, range_start)
            sub_values = None
            range_start = None
        elif token == ListTokens.STRING:
            if sub_values:
                # Suffix following "]": append it to every alternative.
                sub_values = [s + value for s in sub_values]
            else:
                values.append(value)
109

110

111
def list_as_string(s):
    """Expand a compressed node-list string into explicit names.

    'a008,b[072-073,076]' --> ['a008', 'b072', 'b073', 'b076']
    """
    token_stream = iter(_list_tokenizer(s))
    return _parse_list_tokens(token_stream)
114

115

116
def find_numislands(physical_graph_template_file):
    """
    Given the physical graph data extract the graph name and the total number of
    nodes. We are not making a decision whether the island managers are running
    on separate nodes here, thus the number is the sum of all island
    managers and node managers. The values are only populated if not given on the
    init already.
    TODO: We will probably need to do the same with job duration and CPU number

    :param physical_graph_template_file: path to a JSON file holding a
        two-element list ``[pgt_name, drop_list]``.
    :return: ``(num_islands, num_nodes, pgt_name)``; the two counts are
        ``None`` when the drops carry no "node"/"island" keys (i.e. the
        graph has not been partitioned).
    :raises ValueError: when the file content is not a two-element pair.
    """
    with open(physical_graph_template_file, "r") as f:
        # strict=False tolerates control characters inside JSON strings
        pgt_data = json.load(f, strict=False)
    try:
        (pgt_name, pgt) = pgt_data
    except (TypeError, ValueError) as err:
        # Catch only unpacking failures (the original bare `except:` also
        # swallowed KeyboardInterrupt/SystemExit) and chain the cause.
        raise ValueError(type(pgt_data)) from err
    try:
        nodes = [drop["node"] for drop in pgt]
        islands = [drop["island"] for drop in pgt]
    except KeyError:
        # Graph not partitioned yet: no node/island assignment available.
        return None, None, pgt_name
    # Count distinct values (the original built throwaway dicts via
    # dict(zip(...)) to deduplicate; a set is the idiomatic equivalent).
    num_islands = len(set(islands))
    num_nodes = len(set(nodes))
    return num_islands, num_nodes, pgt_name
139

140

141
def label_job_dur(job_dur):
    """Format a duration given in minutes as an HH:MM:SS label.

    e.g. 135 min --> 02:15:00
    """
    total_seconds = job_dur * 60
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return "%02d:%02d:%02d" % (hours, minutes, seconds)
149

150

151
def num_daliuge_nodes(num_nodes: int, run_proxy: bool):
    """
    Returns the number of daliuge nodes available to run workflow.

    :param num_nodes: total number of allocated nodes.
    :param run_proxy: when True, one node is reserved for the proxy.
    :raises ValueError: when no nodes would remain to run DALiuGE.
    """
    if run_proxy:
        ret = num_nodes - 1  # exclude the proxy node
    else:
        ret = num_nodes  # was "num_nodes - 0"; nothing is excluded here
    if ret <= 0:
        # ValueError (a subclass of Exception, so existing handlers still
        # match) instead of the over-broad generic Exception.
        raise ValueError("Not enough nodes {0} to run DALiuGE.".format(num_nodes))
    return ret
162

163

164
def find_node_ips():
    """Return the InternalIP addresses of all non-master Kubernetes nodes.

    Shells out to kubectl with a JSONPath selector; the output is a single
    space-separated line of addresses.
    """
    raw = subprocess.check_output(
        [
            r"kubectl get nodes --selector=kubernetes.io/role!=master -o jsonpath={.items[*].status.addresses[?\(@.type==\"InternalIP\"\)].address}"
        ],
        shell=True,
    )
    return raw.decode(encoding="utf-8").split(" ")
173

174

175
def find_service_ips(num_expected, retries=3, timeout=10):
    """
    Poll kubectl for the ClusterIP addresses of daliuge-daemon services.

    Retries up to `retries` times, sleeping `timeout` seconds after each
    poll, until at least `num_expected` IPs have been seen.

    :return: list of IP strings (possibly fewer than `num_expected` if the
        retries ran out).
    """
    pattern = r"^daliuge-daemon-service-.*\s*ClusterIP\s*\d+\.\d+\.\d+\.\d+"
    ip_pattern = r"\d+\.\d+\.\d+\.\d+"
    ips = []
    attempts = 0
    while len(ips) < num_expected and attempts < retries:
        # BUG FIX: `attempts` was never incremented, so the loop never hit
        # the retry limit and spun forever when services did not appear.
        attempts += 1
        ips = []
        query = subprocess.check_output(
            [r"kubectl get svc -o wide"], shell=True
        ).decode(encoding="utf-8")
        outcome = re.findall(pattern, query, re.M)
        for service in outcome:
            ip = re.search(ip_pattern, service)
            if ip:
                ips.append(ip.group(0))
        logger.info(f"K8s service ips: {ips}")
        time.sleep(timeout)
    return ips
193

194

195
def find_pod_ips(num_expected, retries=3, timeout=10):
    """
    Poll kubectl for the IP addresses of daliuge-daemon pods.

    Retries up to `retries` times, sleeping `timeout` seconds after each
    poll, until at least `num_expected` IPs have been seen.

    :return: list of IP strings (possibly fewer than `num_expected` if the
        retries ran out).
    """
    ips = []
    attempts = 0
    while len(ips) < num_expected and attempts < retries:
        # BUG FIX: `attempts` was never incremented, so the loop never hit
        # the retry limit and spun forever when pods did not appear.
        attempts += 1
        ips = []
        query = str(
            subprocess.check_output([r"kubectl get pods -o wide"], shell=True).decode(
                encoding="utf-8"
            )
        )
        pattern = r"^daliuge-daemon.*"
        ip_pattern = r"\d+\.\d+\.\d+\.\d+"
        outcome = re.findall(pattern, query, re.M)
        for pod in outcome:
            ip = re.search(ip_pattern, pod)
            if ip:
                ips.append(ip.group(0))
        logger.info(f"K8s pod ips: {ips}")
        time.sleep(timeout)
    return ips
215

216

217
def _status_all_running(statuses):
2✔
218
    if statuses == []:
×
219
        return False
×
220
    for status in statuses:
×
221
        if status != "Running":
×
222
            return False
×
223
    return True
×
224

225

226
def wait_for_pods(num_expected, retries=18, timeout=10):
    """
    Poll kubectl until at least `num_expected` daliuge-daemon pods exist and
    all of them report "Running".

    Polls up to `retries` times, sleeping `timeout` seconds between polls.

    :return: True when all expected pods were seen running, False when the
        retries ran out first.
    """
    all_running = False
    attempts = 0
    while not all_running and attempts < retries:
        # Count the attempt up-front so every path through the loop body
        # makes progress towards the retry limit. BUG FIX: the original
        # "continue" branch below skipped both the counter increment and
        # the sleep, busy-looping forever when fewer pods than expected
        # ever appeared.
        attempts += 1
        query = str(
            subprocess.check_output([r"kubectl get pods -o wide"], shell=True).decode(
                encoding="utf-8"
            )
        )
        logger.info(query)
        pattern = r"^daliuge-daemon.*"
        outcome = re.findall(pattern, query, re.M)
        if len(outcome) < num_expected:
            all_running = False
            time.sleep(timeout)
            continue
        all_running = True
        for pod in outcome:
            if "Running" not in pod:
                all_running = False
        time.sleep(timeout)
    return all_running
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc