• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / pysqa / 7772851850

04 Feb 2024 08:40AM UTC coverage: 96.262% (-0.2%) from 96.497%
7772851850

Pull #259

github

web-flow
Merge 8bb4585bf into 90516679d
Pull Request #259: Add manual two factor authentication

4 of 7 new or added lines in 1 file covered. (57.14%)

6 existing lines in 1 file now uncovered.

1159 of 1204 relevant lines covered (96.26%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.65
/pysqa/utils/basic.py
1
# coding: utf-8
2
# Copyright (c) Jan Janssen
3

4
import getpass
1✔
5
import importlib
1✔
6
import os
1✔
7
import re
1✔
8

9
import pandas
1✔
10
from jinja2 import Template
1✔
11
from jinja2.exceptions import TemplateSyntaxError
1✔
12

13
from pysqa.utils.execute import execute_command
1✔
14
from pysqa.utils.queues import Queues
1✔
15

16

17
class BasisQueueAdapter(object):
    """
    The goal of the QueueAdapter class is to make submitting to a queue system as easy as starting another sub process
    locally.

    Args:
        config (dict): queue configuration; the adapter reads the keys "queue_type", "queues" and "queue_primary"
                       from it.
        directory (str): directory containing the queue.yaml files as well as corresponding jinja2 templates for the
                         individual queues.
        execute_command (callable): function used to run the commands of the queuing system; it is called with the
                                    keyword arguments commands, working_directory, split_output, shell and
                                    error_filename.

    Attributes:

        .. attribute:: config

            QueueAdapter configuration read from the queue.yaml file.

        .. attribute:: queue_list

            List of available queues

        .. attribute:: queue_view

            Pandas DataFrame representation of the available queues, read from queue.yaml.

        .. attribute:: queues

            Queues available for auto completion QueueAdapter().queues.<queue name> returns the queue name.
    """
46

47
    def __init__(self, config, directory="~/.queues", execute_command=execute_command):
        """
        Set up the adapter from an already parsed configuration.

        Args:
            config (dict): configuration providing at least the keys "queue_type" and "queues".
            directory (str): directory the queue script templates are loaded from.
            execute_command (callable): function used to execute the commands of the queuing system.

        Raises:
            ValueError: if config["queue_type"] is not one of the supported queue types.
        """
        self._config = config
        self._fill_queue_dict(queue_lst_dict=self._config["queues"])
        self._load_templates(queue_lst_dict=self._config["queues"], directory=directory)
        # queue type -> (module providing the command wrapper, name of the wrapper class)
        wrapper_lookup = {
            "SGE": ("pysqa.wrapper.sge", "SunGridEngineCommands"),
            "TORQUE": ("pysqa.wrapper.torque", "TorqueCommands"),
            "SLURM": ("pysqa.wrapper.slurm", "SlurmCommands"),
            "LSF": ("pysqa.wrapper.lsf", "LsfCommands"),
            "MOAB": ("pysqa.wrapper.moab", "MoabCommands"),
            "GENT": ("pysqa.wrapper.gent", "GentCommands"),
            "REMOTE": (None, None),
            "FLUX": ("pysqa.wrapper.flux", "FluxCommands"),
        }
        queue_type = self._config["queue_type"]
        if queue_type not in wrapper_lookup:
            raise ValueError(
                "The queue_type "
                + queue_type
                + " is not found in the list of supported queue types "
                + str(
                    ["SGE", "TORQUE", "SLURM", "LSF", "MOAB", "FLUX", "GENT", "REMOTE"]
                )
            )
        module_name, class_name = wrapper_lookup[queue_type]
        # The REMOTE type has no local command wrapper, so _commands is not set here.
        if queue_type != "REMOTE":
            wrapper_module = importlib.import_module(module_name)
            self._commands = getattr(wrapper_module, class_name)()
        self._queues = Queues(self.queue_list)
        self._remote_flag = False
        self._ssh_delete_file_on_remote = True
        self._execute_command_function = execute_command
1✔
90

91
    @property
1✔
92
    def ssh_delete_file_on_remote(self):
1✔
93
        return self._ssh_delete_file_on_remote
1✔
94

95
    @property
1✔
96
    def remote_flag(self):
1✔
97
        return self._remote_flag
1✔
98

99
    @property
1✔
100
    def config(self):
1✔
101
        """
102

103
        Returns:
104
            dict:
105
        """
106
        return self._config
1✔
107

108
    @property
1✔
109
    def queue_list(self):
1✔
110
        """
111

112
        Returns:
113
            list:
114
        """
115
        return list(self._config["queues"].keys())
1✔
116

117
    @property
1✔
118
    def queue_view(self):
1✔
119
        """
120

121
        Returns:
122
            pandas.DataFrame:
123
        """
124
        return pandas.DataFrame(self._config["queues"]).T.drop(
1✔
125
            ["script", "template"], axis=1
1✔
126
        )
127

128
    @property
1✔
129
    def queues(self):
1✔
130
        return self._queues
1✔
131

132
    def submit_job(
1✔
133
        self,
134
        queue=None,
1✔
135
        job_name=None,
1✔
136
        working_directory=None,
1✔
137
        cores=None,
1✔
138
        memory_max=None,
1✔
139
        run_time_max=None,
1✔
140
        dependency_list=None,
1✔
141
        command=None,
1✔
142
        **kwargs,
143
    ):
144
        """
145

146
        Args:
147
            queue (str/None):
148
            job_name (str/None):
149
            working_directory (str/None):
150
            cores (int/None):
151
            memory_max (int/None):
152
            run_time_max (int/None):
153
            dependency_list (list[str]/None:
154
            command (str/None):
155

156
        Returns:
157
            int:
158
        """
159
        if " " in working_directory:
1✔
160
            raise ValueError(
1✔
161
                "Whitespaces in the working_directory name are not supported!"
1✔
162
            )
163
        working_directory, queue_script_path = self._write_queue_script(
1✔
164
            queue=queue,
1✔
165
            job_name=job_name,
1✔
166
            working_directory=working_directory,
1✔
167
            cores=cores,
1✔
168
            memory_max=memory_max,
1✔
169
            run_time_max=run_time_max,
1✔
170
            command=command,
1✔
171
            **kwargs,
1✔
172
        )
173
        out = self._execute_command(
1✔
174
            commands=self._list_command_to_be_executed(
1✔
175
                dependency_list, queue_script_path
1✔
176
            ),
177
            working_directory=working_directory,
1✔
178
            split_output=False,
1✔
179
        )
180
        if out is not None:
1✔
181
            return self._commands.get_job_id_from_output(out)
1✔
182
        else:
183
            return None
1✔
184

185
    def _list_command_to_be_executed(self, dependency_list, queue_script_path) -> list:
1✔
186
        return (
1✔
187
            self._commands.submit_job_command
1✔
188
            + self._commands.dependencies(dependency_list)
1✔
189
            + [queue_script_path]
1✔
190
        )
191

192
    def enable_reservation(self, process_id):
1✔
193
        """
194

195
        Args:
196
            process_id (int):
197

198
        Returns:
199
            str:
200
        """
201
        out = self._execute_command(
202
            commands=self._commands.enable_reservation_command + [str(process_id)],
203
            split_output=True,
204
        )
205
        if out is not None:
206
            return out[0]
207
        else:
208
            return None
209

210
    def delete_job(self, process_id):
1✔
211
        """
212

213
        Args:
214
            process_id (int):
215

216
        Returns:
217
            str:
218
        """
219
        out = self._execute_command(
1✔
220
            commands=self._commands.delete_job_command + [str(process_id)],
1✔
221
            split_output=True,
1✔
222
        )
223
        if out is not None:
1✔
224
            return out[0]
1✔
225
        else:
226
            return None
1✔
227

228
    def get_queue_status(self, user=None):
1✔
229
        """
230

231
        Args:
232
            user (str):
233

234
        Returns:
235
            pandas.DataFrame:
236
        """
237
        out = self._execute_command(
1✔
238
            commands=self._commands.get_queue_status_command, split_output=False
1✔
239
        )
240
        df = self._commands.convert_queue_status(queue_status_output=out)
1✔
241
        if user is None:
1✔
242
            return df
1✔
243
        else:
244
            return df[df["user"] == user]
1✔
245

246
    def get_status_of_my_jobs(self):
1✔
247
        """
248

249
        Returns:
250
           pandas.DataFrame:
251
        """
252
        return self.get_queue_status(user=self._get_user())
253

254
    def get_status_of_job(self, process_id):
1✔
255
        """
256

257
        Args:
258
            process_id:
259

260
        Returns:
261
             str: ['running', 'pending', 'error']
262
        """
263
        df = self.get_queue_status()
1✔
264
        df_selected = df[df["jobid"] == process_id]["status"]
1✔
265
        if len(df_selected) != 0:
1✔
266
            return df_selected.values[0]
1✔
267
        else:
268
            return None
1✔
269

270
    def get_status_of_jobs(self, process_id_lst):
1✔
271
        """
272

273
        Args:
274
            process_id_lst:
275

276
        Returns:
277
             list: ['running', 'pending', 'error', ...]
278
        """
279
        df = self.get_queue_status()
1✔
280
        results_lst = []
1✔
281
        for process_id in process_id_lst:
1✔
282
            df_selected = df[df["jobid"] == process_id]["status"]
1✔
283
            if len(df_selected) != 0:
1✔
284
                results_lst.append(df_selected.values[0])
1✔
285
            else:
286
                results_lst.append("finished")
1✔
287
        return results_lst
1✔
288

289
    def get_job_from_remote(self, working_directory):
1✔
290
        """
291
        Get the results of the calculation - this is necessary when the calculation was executed on a remote host.
292
        """
293
        raise NotImplementedError
1✔
294

295
    def convert_path_to_remote(self, path):
1✔
296
        raise NotImplementedError
1✔
297

298
    def transfer_file(self, file, transfer_back=False, delete_file_on_remote=False):
1✔
299
        raise NotImplementedError
1✔
300

301
    def check_queue_parameters(
1✔
302
        self, queue, cores=1, run_time_max=None, memory_max=None, active_queue=None
1✔
303
    ):
304
        """
305

306
        Args:
307
            queue (str/None):
308
            cores (int):
309
            run_time_max (int/None):
310
            memory_max (int/None):
311
            active_queue (dict):
312

313
        Returns:
314
            list: [cores, run_time_max, memory_max]
315
        """
316
        if active_queue is None:
1✔
317
            active_queue = self._config["queues"][queue]
1✔
318
        cores = self._value_in_range(
1✔
319
            value=cores,
1✔
320
            value_min=active_queue["cores_min"],
1✔
321
            value_max=active_queue["cores_max"],
1✔
322
        )
323
        run_time_max = self._value_in_range(
1✔
324
            value=run_time_max, value_max=active_queue["run_time_max"]
1✔
325
        )
326
        memory_max = self._value_in_range(
1✔
327
            value=memory_max, value_max=active_queue["memory_max"]
1✔
328
        )
329
        return cores, run_time_max, memory_max
1✔
330

331
    def _write_queue_script(
1✔
332
        self,
333
        queue=None,
1✔
334
        job_name=None,
1✔
335
        working_directory=None,
1✔
336
        cores=None,
1✔
337
        memory_max=None,
1✔
338
        run_time_max=None,
1✔
339
        command=None,
1✔
340
        **kwargs,
341
    ):
342
        """
343

344
        Args:
345
            queue (str/None):
346
            job_name (str/None):
347
            working_directory (str/None):
348
            cores (int/None):
349
            memory_max (int/None):
350
            run_time_max (int/None):
351
            command (str/None):
352

353
        """
354
        if isinstance(command, list):
1✔
355
            command = "".join(command)
356
        if working_directory is None:
1✔
357
            working_directory = "."
1✔
358
        queue_script = self._job_submission_template(
1✔
359
            queue=queue,
1✔
360
            job_name=job_name,
1✔
361
            working_directory=working_directory,
1✔
362
            cores=cores,
1✔
363
            memory_max=memory_max,
1✔
364
            run_time_max=run_time_max,
1✔
365
            command=command,
1✔
366
            **kwargs,
1✔
367
        )
368
        if not os.path.exists(working_directory):
1✔
369
            os.makedirs(working_directory)
370
        queue_script_path = os.path.join(working_directory, "run_queue.sh")
1✔
371
        with open(queue_script_path, "w") as f:
1✔
372
            f.writelines(queue_script)
1✔
373
        return working_directory, queue_script_path
1✔
374

375
    def _job_submission_template(
1✔
376
        self,
377
        queue=None,
1✔
378
        job_name="job.py",
1✔
379
        working_directory=".",
1✔
380
        cores=None,
1✔
381
        memory_max=None,
1✔
382
        run_time_max=None,
1✔
383
        command=None,
1✔
384
        **kwargs,
385
    ):
386
        """
387

388
        Args:
389
            queue (str/None):
390
            job_name (str):
391
            working_directory (str):
392
            cores (int/None):
393
            memory_max (int/None):
394
            run_time_max (int/None):
395
            command (str/None):
396

397
        Returns:
398
            str:
399
        """
400
        if queue is None:
1✔
401
            queue = self._config["queue_primary"]
1✔
402
        self._value_error_if_none(value=command)
1✔
403
        if queue not in self.queue_list:
1✔
404
            raise ValueError(
1✔
405
                "The queue "
1✔
406
                + queue
1✔
407
                + " was not found in the list of queues: "
1✔
408
                + str(self.queue_list)
1✔
409
            )
410
        active_queue = self._config["queues"][queue]
1✔
411
        cores, run_time_max, memory_max = self.check_queue_parameters(
1✔
412
            queue=None,
1✔
413
            cores=cores,
1✔
414
            run_time_max=run_time_max,
1✔
415
            memory_max=memory_max,
1✔
416
            active_queue=active_queue,
1✔
417
        )
418
        template = active_queue["template"]
1✔
419
        return template.render(
1✔
420
            job_name=job_name,
1✔
421
            working_directory=working_directory,
1✔
422
            cores=cores,
1✔
423
            memory_max=memory_max,
1✔
424
            run_time_max=run_time_max,
1✔
425
            command=command,
1✔
426
            **kwargs,
1✔
427
        )
428

429
    def _execute_command(
1✔
430
        self,
431
        commands,
432
        working_directory=None,
1✔
433
        split_output=True,
1✔
434
        shell=False,
1✔
435
        error_filename="pysqa.err",
1✔
436
    ):
437
        """
438

439
        Args:
440
            commands (list/str):
441
            working_directory (str):
442
            split_output (bool):
443
            shell (bool):
444
            error_filename (str):
445

446
        Returns:
447
            str:
448
        """
449
        return self._execute_command_function(
1✔
450
            commands=commands,
1✔
451
            working_directory=working_directory,
1✔
452
            split_output=split_output,
1✔
453
            shell=shell,
1✔
454
            error_filename=error_filename,
1✔
455
        )
456

457
    @staticmethod
1✔
458
    def _get_user():
1✔
459
        """
460

461
        Returns:
462
            str:
463
        """
464
        return getpass.getuser()
1✔
465

466
    @staticmethod
1✔
467
    def _fill_queue_dict(queue_lst_dict):
1✔
468
        """
469

470
        Args:
471
            queue_lst_dict (dict):
472
        """
473
        queue_keys = ["cores_min", "cores_max", "run_time_max", "memory_max"]
1✔
474
        for queue_dict in queue_lst_dict.values():
1✔
475
            for key in set(queue_keys) - set(queue_dict.keys()):
1✔
476
                queue_dict[key] = None
1✔
477

478
    @staticmethod
1✔
479
    def _load_templates(queue_lst_dict, directory="."):
1✔
480
        """
481

482
        Args:
483
            queue_lst_dict (dict):
484
            directory (str):
485
        """
486
        for queue_dict in queue_lst_dict.values():
1✔
487
            if "script" in queue_dict.keys():
1✔
488
                with open(os.path.join(directory, queue_dict["script"]), "r") as f:
1✔
489
                    try:
1✔
490
                        queue_dict["template"] = Template(f.read())
1✔
491
                    except TemplateSyntaxError as error:
1✔
492
                        raise TemplateSyntaxError(
1✔
493
                            message="File: "
1✔
494
                            + queue_dict["script"]
1✔
495
                            + " - "
1✔
496
                            + error.message,
1✔
497
                            lineno=error.lineno,
1✔
498
                        )
499

500
    @staticmethod
1✔
501
    def _value_error_if_none(value):
1✔
502
        """
503

504
        Args:
505
            value (str/None):
506
        """
507
        if value is None:
1✔
508
            raise ValueError()
1✔
509
        if not isinstance(value, str):
1✔
510
            raise TypeError()
1✔
511

512
    @classmethod
1✔
513
    def _value_in_range(cls, value, value_min=None, value_max=None):
1✔
514
        """
515

516
        Args:
517
            value (int/float/None):
518
            value_min (int/float/None):
519
            value_max (int/float/None):
520

521
        Returns:
522
            int/float/None:
523
        """
524

525
        if value is not None:
1✔
526
            value_, value_min_, value_max_ = [
1✔
527
                (
528
                    cls._memory_spec_string_to_value(v)
1✔
529
                    if v is not None and isinstance(v, str)
1✔
530
                    else v
1✔
531
                )
532
                for v in (value, value_min, value_max)
1✔
533
            ]
534
            # ATTENTION: '60000' is interpreted as '60000M' since default magnitude is 'M'
535
            # ATTENTION: int('60000') is interpreted as '60000B' since _memory_spec_string_to_value return the size in
536
            # ATTENTION: bytes, as target_magnitude = 'b'
537
            # We want to compare the the actual (k,m,g)byte value if there is any
538
            if value_min_ is not None and value_ < value_min_:
1✔
539
                return value_min
1✔
540
            if value_max_ is not None and value_ > value_max_:
1✔
541
                return value_max
1✔
542
            return value
1✔
543
        else:
544
            if value_min is not None:
1✔
545
                return value_min
1✔
546
            if value_max is not None:
1✔
547
                return value_max
1✔
548
            return value
1✔
549

550
    @staticmethod
1✔
551
    def _is_memory_string(value):
1✔
552
        """
553
        Tests a string if it specifies a certain amount of memory e.g.: '20G', '60b'. Also pure integer strings are
554
        also valid.
555

556
        Args:
557
            value (str): the string to test
558

559
        Returns:
560
            (bool): A boolean value if the string matches a memory specification
561
        """
562
        memory_spec_pattern = r"[0-9]+[bBkKmMgGtT]?"
1✔
563
        return re.findall(memory_spec_pattern, value)[0] == value
1✔
564

565
    @classmethod
1✔
566
    def _memory_spec_string_to_value(
1✔
567
        cls, value, default_magnitude="m", target_magnitude="b"
1✔
568
    ):
569
        """
570
        Converts a valid memory string (tested by _is_memory_string) into an integer/float value of desired
571
        magnitude `default_magnitude`. If it is a plain integer string (e.g.: '50000') it will be interpreted with
572
        the magnitude passed in by the `default_magnitude`. The output will rescaled to `target_magnitude`
573

574
        Args:
575
            value (str): the string
576
            default_magnitude (str): magnitude for interpreting plain integer strings [b, B, k, K, m, M, g, G, t, T]
577
            target_magnitude (str): to which the output value should be converted [b, B, k, K, m, M, g, G, t, T]
578

579
        Returns:
580
            (float/int): the value of the string in `target_magnitude` units
581
        """
582
        magnitude_mapping = {"b": 0, "k": 1, "m": 2, "g": 3, "t": 4}
1✔
583
        if cls._is_memory_string(value):
1✔
584
            integer_pattern = r"[0-9]+"
1✔
585
            magnitude_pattern = r"[bBkKmMgGtT]+"
1✔
586
            integer_value = int(re.findall(integer_pattern, value)[0])
1✔
587

588
            magnitude = re.findall(magnitude_pattern, value)
1✔
589
            if len(magnitude) > 0:
1✔
590
                magnitude = magnitude[0].lower()
1✔
591
            else:
592
                magnitude = default_magnitude.lower()
1✔
593
            # Convert it to default magnitude = megabytes
594
            return (integer_value * 1024 ** magnitude_mapping[magnitude]) / (
1✔
595
                1024 ** magnitude_mapping[target_magnitude]
1✔
596
            )
597
        else:
598
            return value
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc