• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / pysqa / 4788443268

pending completion
4788443268

Pull #162

github-actions

GitHub
Merge 79f3ca770 into e11ebc314
Pull Request #162:

458 of 809 relevant lines covered (56.61%)

0.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.16
/pysqa/utils/basic.py
1
# coding: utf-8
2
# Copyright (c) Jan Janssen
3

4
import getpass
1✔
5
import importlib
1✔
6
from jinja2 import Template
1✔
7
import os
1✔
8
import subprocess
1✔
9
import re
1✔
10
import pandas
1✔
11
from pysqa.utils.queues import Queues
1✔
12

13

14
class BasisQueueAdapter(object):
    """
    The goal of the QueueAdapter class is to make submitting to a queue system as easy as starting another sub process
    locally.

    Args:
        config (dict): queue configuration. The keys "queues" and "queue_type" are read on construction,
                       "queue_primary" is read when a submission script is rendered without an explicit queue.
        directory (str): directory containing the queue.yaml files as well as corresponding jinja2 templates for the
                         individual queues. The path is used as given, no "~" expansion is performed here.

    Attributes:

        .. attribute:: config

            QueueAdapter configuration read from the queue.yaml file.

        .. attribute:: queue_list

            List of available queues

        .. attribute:: queue_view

            Pandas DataFrame representation of the available queues, read from queue.yaml.

        .. attribute:: queues

            Queues available for auto completion QueueAdapter().queues.<queue name> returns the queue name.
    """

    def __init__(self, config, directory="~/.queues"):
        self._config = config
        self._fill_queue_dict(queue_lst_dict=self._config["queues"])
        self._load_templates(queue_lst_dict=self._config["queues"], directory=directory)
        # queue_type -> (module providing the command wrapper, name of the wrapper class)
        command_classes = {
            "SGE": ("pysqa.wrapper.sge", "SunGridEngineCommands"),
            "TORQUE": ("pysqa.wrapper.torque", "TorqueCommands"),
            "SLURM": ("pysqa.wrapper.slurm", "SlurmCommands"),
            "LSF": ("pysqa.wrapper.lsf", "LsfCommands"),
            "MOAB": ("pysqa.wrapper.moab", "MoabCommands"),
            "GENT": ("pysqa.wrapper.gent", "GentCommands"),
        }
        queue_type = self._config["queue_type"]
        if queue_type in command_classes:
            module_name, class_name = command_classes[queue_type]
            # Import lazily so only the wrapper of the configured queue type is loaded.
            self._commands = getattr(
                importlib.import_module(module_name), class_name
            )()
        elif queue_type != "REMOTE":
            # "REMOTE" is accepted but gets no command wrapper (self._commands stays unset).
            raise ValueError(
                "Unknown queue_type {!r}, expected one of {} or 'REMOTE'.".format(
                    queue_type, sorted(command_classes)
                )
            )
        self._queues = Queues(self.queue_list)
        self._remote_flag = False
        self._ssh_delete_file_on_remote = True

    @property
    def ssh_delete_file_on_remote(self):
        """

        Returns:
            bool: value of the internal flag, initialised to True in the constructor
        """
        return self._ssh_delete_file_on_remote

    @property
    def remote_flag(self):
        """

        Returns:
            bool: value of the internal flag, initialised to False in the constructor
        """
        return self._remote_flag

    @property
    def config(self):
        """

        Returns:
            dict: the configuration object handed to the constructor
        """
        return self._config

    @property
    def queue_list(self):
        """

        Returns:
            list: names of the queues defined in the configuration
        """
        return list(self._config["queues"])

    @property
    def queue_view(self):
        """

        Returns:
            pandas.DataFrame: one row per queue, without the "script" and "template" columns
        """
        return pandas.DataFrame(self._config["queues"]).T.drop(
            ["script", "template"], axis=1
        )

    @property
    def queues(self):
        """

        Returns:
            Queues: helper object created from the queue list in the constructor
        """
        return self._queues

    def submit_job(
        self,
        queue=None,
        job_name=None,
        working_directory=None,
        cores=None,
        memory_max=None,
        run_time_max=None,
        dependency_list=None,
        command=None,
    ):
        """
        Write the submission script to the working directory and hand it to the queuing system.

        Args:
            queue (str/None):
            job_name (str/None):
            working_directory (str/None):
            cores (int/None):
            memory_max (int/None):
            run_time_max (int/None):
            dependency_list (list[str]/None):
            command (str/None):

        Returns:
            int: job id parsed from the output of the submit command, None if the submit command failed

        Raises:
            ValueError: if the working directory contains a whitespace
        """
        # working_directory defaults to None (resolved to "." when the script is written), so guard the
        # membership test against None instead of failing with a TypeError.
        if working_directory is not None and " " in working_directory:
            raise ValueError(
                "Whitespaces in the working_directory name are not supported!"
            )
        working_directory, queue_script_path = self._write_queue_script(
            queue=queue,
            job_name=job_name,
            working_directory=working_directory,
            cores=cores,
            memory_max=memory_max,
            run_time_max=run_time_max,
            command=command,
        )
        out = self._execute_command(
            commands=self._list_command_to_be_executed(
                dependency_list, queue_script_path
            ),
            working_directory=working_directory,
            split_output=False,
        )
        if out is None:
            return None
        return self._commands.get_job_id_from_output(out)

    def _list_command_to_be_executed(self, dependency_list, queue_script_path) -> list:
        """
        Assemble the submit command: submit command + dependency arguments + path of the script.

        Args:
            dependency_list (list[str]/None):
            queue_script_path (str):

        Returns:
            list: command line as a list of arguments
        """
        return (
            self._commands.submit_job_command
            + self._commands.dependencies(dependency_list)
            + [queue_script_path]
        )

    def enable_reservation(self, process_id):
        """

        Args:
            process_id (int):

        Returns:
            str: first line of the command output, None if the command failed
        """
        out = self._execute_command(
            commands=self._commands.enable_reservation_command + [str(process_id)],
            split_output=True,
        )
        if out is None:
            return None
        return out[0]

    def delete_job(self, process_id):
        """

        Args:
            process_id (int):

        Returns:
            str: first line of the command output, None if the command failed
        """
        out = self._execute_command(
            commands=self._commands.delete_job_command + [str(process_id)],
            split_output=True,
        )
        if out is None:
            return None
        return out[0]

    def get_queue_status(self, user=None):
        """

        Args:
            user (str): if given, only the rows of this user are returned

        Returns:
            pandas.DataFrame:
        """
        out = self._execute_command(
            commands=self._commands.get_queue_status_command, split_output=False
        )
        df = self._commands.convert_queue_status(queue_status_output=out)
        if user is None:
            return df
        return df[df["user"] == user]

    def get_status_of_my_jobs(self):
        """

        Returns:
           pandas.DataFrame: queue status restricted to the current user
        """
        return self.get_queue_status(user=self._get_user())

    def get_status_of_job(self, process_id):
        """

        Args:
            process_id:

        Returns:
             str: ['running', 'pending', 'error'], None if the job is not listed in the queue status
        """
        df = self.get_queue_status()
        df_selected = df[df["jobid"] == process_id]["status"]
        if len(df_selected) == 0:
            return None
        return df_selected.values[0]

    def get_status_of_jobs(self, process_id_lst):
        """

        Args:
            process_id_lst:

        Returns:
             list: ['running', 'pending', 'error', ...], jobs which are not listed in the queue status are
                   reported as 'finished'
        """
        # Query the queue only once and look up every job in the same snapshot.
        df = self.get_queue_status()
        results_lst = []
        for process_id in process_id_lst:
            df_selected = df[df["jobid"] == process_id]["status"]
            if len(df_selected) == 0:
                results_lst.append("finished")
            else:
                results_lst.append(df_selected.values[0])
        return results_lst

    def get_job_from_remote(self, working_directory, delete_remote=False):
        """
        Get the results of the calculation - this is necessary when the calculation was executed on a remote host.

        This base implementation does nothing.
        """
        pass

    def convert_path_to_remote(self, path):
        """
        This base implementation does nothing and returns None.
        """
        pass

    def transfer_file(self, file, transfer_back=False, delete_remote=False):
        """
        This base implementation does nothing.
        """
        pass

    def check_queue_parameters(
        self, queue, cores=1, run_time_max=None, memory_max=None, active_queue=None
    ):
        """
        Adjust the requested resources to the limits of the selected queue.

        Args:
            queue (str/None): name of the queue, only used when active_queue is not given
            cores (int):
            run_time_max (int/None):
            memory_max (int/None):
            active_queue (dict): queue definition to check against instead of looking up `queue`

        Returns:
            list: [cores, run_time_max, memory_max]
        """
        if active_queue is None:
            active_queue = self._config["queues"][queue]
        cores = self._value_in_range(
            value=cores,
            value_min=active_queue["cores_min"],
            value_max=active_queue["cores_max"],
        )
        run_time_max = self._value_in_range(
            value=run_time_max, value_max=active_queue["run_time_max"]
        )
        memory_max = self._value_in_range(
            value=memory_max, value_max=active_queue["memory_max"]
        )
        return cores, run_time_max, memory_max

    def _write_queue_script(
        self,
        queue=None,
        job_name=None,
        working_directory=None,
        cores=None,
        memory_max=None,
        run_time_max=None,
        command=None,
    ):
        """
        Render the submission script and write it as "run_queue.sh" into the working directory.

        Args:
            queue (str/None):
            job_name (str/None):
            working_directory (str/None): defaults to the current directory "."
            cores (int/None):
            memory_max (int/None):
            run_time_max (int/None):
            command (str/None): a list is concatenated without separator before rendering

        Returns:
            tuple: (working_directory, path of the written script)
        """
        if isinstance(command, list):
            # NOTE(review): the items are joined without a separator - confirm callers pass pre-formatted parts.
            command = "".join(command)
        if working_directory is None:
            working_directory = "."
        # Render first: an invalid queue or command raises before anything is created on disk.
        queue_script = self._job_submission_template(
            queue=queue,
            job_name=job_name,
            working_directory=working_directory,
            cores=cores,
            memory_max=memory_max,
            run_time_max=run_time_max,
            command=command,
        )
        os.makedirs(working_directory, exist_ok=True)
        queue_script_path = os.path.join(working_directory, "run_queue.sh")
        with open(queue_script_path, "w") as f:
            f.write(queue_script)
        return working_directory, queue_script_path

    def _job_submission_template(
        self,
        queue=None,
        job_name="job.py",
        working_directory=".",
        cores=None,
        memory_max=None,
        run_time_max=None,
        command=None,
    ):
        """
        Render the jinja2 template of the selected queue.

        Args:
            queue (str/None): name of the queue, defaults to the "queue_primary" entry of the configuration
            job_name (str):
            working_directory (str):
            cores (int/None):
            memory_max (int/None):
            run_time_max (int/None):
            command (str/None):

        Returns:
            str: the rendered submission script

        Raises:
            ValueError: if command is None or the queue is not part of the configuration
            TypeError: if command is not a string
        """
        if queue is None:
            queue = self._config["queue_primary"]
        self._value_error_if_none(value=command)
        if queue not in self.queue_list:
            raise ValueError(
                "The queue {!r} is not available, choose one of {}.".format(
                    queue, self.queue_list
                )
            )
        active_queue = self._config["queues"][queue]
        cores, run_time_max, memory_max = self.check_queue_parameters(
            queue=None,
            cores=cores,
            run_time_max=run_time_max,
            memory_max=memory_max,
            active_queue=active_queue,
        )
        template = active_queue["template"]
        return template.render(
            job_name=job_name,
            working_directory=working_directory,
            cores=cores,
            memory_max=memory_max,
            run_time_max=run_time_max,
            command=command,
        )

    @staticmethod
    def _get_user():
        """

        Returns:
            str: login name as reported by getpass.getuser()
        """
        return getpass.getuser()

    @staticmethod
    def _execute_command(
        commands, working_directory=None, split_output=True, shell=False
    ):
        """
        Run a command and return its combined stdout/stderr output.

        If the command exits with a non-zero status its output is written to "pysqa.err" in the working
        directory (the current directory when no working directory is given) and None is returned.

        Args:
            commands (list/str): a string is always executed through the shell
            working_directory (str):
            split_output (bool): return the output as list of lines instead of a single string
            shell (bool): join a list of commands to a single string and execute it through the shell

        Returns:
            str/list/None: command output, None if the command failed
        """
        if shell and isinstance(commands, list):
            commands = " ".join(commands)
        try:
            out = subprocess.check_output(
                commands,
                cwd=working_directory,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                shell=not isinstance(commands, list),
            )
        except subprocess.CalledProcessError as e:
            # cwd=None runs the command in the current directory, so log the failure there as well
            # rather than failing with a TypeError in os.path.join().
            error_directory = (
                os.curdir if working_directory is None else working_directory
            )
            with open(os.path.join(error_directory, "pysqa.err"), "w") as f:
                print(e.stdout, file=f)
            return None
        if split_output:
            return out.split("\n")
        return out

    @staticmethod
    def _fill_queue_dict(queue_lst_dict):
        """
        Add the missing resource limits to every queue definition in place, using None as "no limit set".

        Args:
            queue_lst_dict (dict):
        """
        queue_keys = ["cores_min", "cores_max", "run_time_max", "memory_max"]
        for queue_dict in queue_lst_dict.values():
            for key in queue_keys:
                queue_dict.setdefault(key, None)

    @staticmethod
    def _load_templates(queue_lst_dict, directory="."):
        """
        Load the jinja2 template of every queue which defines a "script" and store it under the key "template".

        Args:
            queue_lst_dict (dict):
            directory (str): directory the "script" file names are relative to
        """
        for queue_dict in queue_lst_dict.values():
            if "script" in queue_dict:
                with open(os.path.join(directory, queue_dict["script"]), "r") as f:
                    queue_dict["template"] = Template(f.read())

    @staticmethod
    def _value_error_if_none(value):
        """
        Validate that a value is a string.

        Args:
            value (str/None):

        Raises:
            ValueError: if value is None
            TypeError: if value is not a string
        """
        if value is None:
            raise ValueError("A string is required, but None was given.")
        if not isinstance(value, str):
            raise TypeError(
                "A string is required, but {} was given.".format(type(value).__name__)
            )

    @classmethod
    def _value_in_range(cls, value, value_min=None, value_max=None):
        """
        Clip a value to the given limits. Without a value the lower limit is returned if it is defined,
        otherwise the upper limit.

        Args:
            value (int/float/None):
            value_min (int/float/None):
            value_max (int/float/None):

        Returns:
            int/float/None: the input object itself or one of the limits, never a converted copy
        """
        if value is None:
            if value_min is not None:
                return value_min
            if value_max is not None:
                return value_max
            return None

        # Strings are memory specifications and have to be converted before they can be compared.
        # ATTENTION: '60000' is interpreted as '60000M' since the default magnitude is 'M'
        # ATTENTION: int(60000) is compared as 60000 bytes, since _memory_spec_string_to_value returns the
        # ATTENTION: size in bytes (target_magnitude = 'b')
        value_, value_min_, value_max_ = [
            cls._memory_spec_string_to_value(v) if isinstance(v, str) else v
            for v in (value, value_min, value_max)
        ]
        if value_min_ is not None and value_ < value_min_:
            return value_min
        if value_max_ is not None and value_ > value_max_:
            return value_max
        return value

    @staticmethod
    def _is_memory_string(value):
        """
        Tests a string if it specifies a certain amount of memory e.g.: '20G', '60b'. Pure integer strings are
        also valid.

        Args:
            value (str): the string to test

        Returns:
            (bool): True if the whole string matches a memory specification, False otherwise
        """
        memory_spec_pattern = r"[0-9]+[bBkKmMgGtT]?"
        # fullmatch returns None for strings without any match instead of raising an IndexError.
        return re.fullmatch(memory_spec_pattern, value) is not None

    @classmethod
    def _memory_spec_string_to_value(
        cls, value, default_magnitude="m", target_magnitude="b"
    ):
        """
        Converts a valid memory string (tested by _is_memory_string) into a numeric value of the desired
        magnitude `target_magnitude`. If it is a plain integer string (e.g.: '50000') it will be interpreted with
        the magnitude passed in by the `default_magnitude`. Strings which are no memory specification are returned
        unchanged.

        Args:
            value (str): the string
            default_magnitude (str): magnitude for interpreting plain integer strings [b, B, k, K, m, M, g, G, t, T]
            target_magnitude (str): to which the output value should be converted [b, B, k, K, m, M, g, G, t, T]

        Returns:
            (float/int): the value of the string in `target_magnitude` units
        """
        if not cls._is_memory_string(value):
            return value
        magnitude_mapping = {"b": 0, "k": 1, "m": 2, "g": 3, "t": 4}
        integer_value = int(re.findall(r"[0-9]+", value)[0])
        suffix = re.findall(r"[bBkKmMgGtT]+", value)
        magnitude = suffix[0].lower() if suffix else default_magnitude.lower()
        # Scale to bytes first, then down to the requested magnitude (powers of 1024).
        return (integer_value * 1024 ** magnitude_mapping[magnitude]) / (
            1024 ** magnitude_mapping[target_magnitude.lower()]
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc