• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / pysqa / 9825005101

07 Jul 2024 05:48AM UTC coverage: 83.114%. Remained the same
9825005101

Pull #311

github

web-flow
Merge f205d4b02 into e80dd8103
Pull Request #311: Update conda-incubator/setup-miniconda from v2 to v3

886 of 1066 relevant lines covered (83.11%)

0.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.48
/pysqa/utils/basic.py
1
# coding: utf-8
2
# Copyright (c) Jan Janssen
3

4
import getpass
1✔
5
import importlib
1✔
6
import os
1✔
7
import re
1✔
8
from typing import List, Optional
1✔
9

10
import pandas
1✔
11
from jinja2 import Template
1✔
12
from jinja2.exceptions import TemplateSyntaxError
1✔
13

14
from pysqa.utils.execute import execute_command
1✔
15
from pysqa.utils.queues import Queues
1✔
16

17

18
class BasisQueueAdapter(object):
    """
    The goal of the QueueAdapter class is to make submitting to a queue system as easy as starting another sub process
    locally.

    Args:
        config (dict): QueueAdapter configuration read from the queue.yaml file. It must provide the keys
                       "queue_type" and "queues"; "queue_primary" is used as default queue when submitting.
        directory (str): directory containing the queue.yaml files as well as corresponding jinja2 templates for the
                         individual queues. A leading "~" is expanded to the user's home directory.
        execute_command (funct): function used to run the queuing system commands; it is called with the keyword
                                 arguments commands, working_directory, split_output, shell and error_filename.

    Attributes:

        .. attribute:: config

            QueueAdapter configuration read from the queue.yaml file.

        .. attribute:: queue_list

            List of available queues

        .. attribute:: queue_view

            Pandas DataFrame representation of the available queues, read from queue.yaml.

        .. attribute:: queues

            Queues available for auto completion QueueAdapter().queues.<queue name> returns the queue name.
    """

    # Supported queue_type values mapped to the (module, class) implementing their command wrapper.
    # REMOTE has no local wrapper. The insertion order is also the order in which the supported types
    # are listed in the error message for an unknown queue_type.
    _QUEUE_TYPE_WRAPPERS = {
        "SGE": ("pysqa.wrapper.sge", "SunGridEngineCommands"),
        "TORQUE": ("pysqa.wrapper.torque", "TorqueCommands"),
        "SLURM": ("pysqa.wrapper.slurm", "SlurmCommands"),
        "LSF": ("pysqa.wrapper.lsf", "LsfCommands"),
        "MOAB": ("pysqa.wrapper.moab", "MoabCommands"),
        "FLUX": ("pysqa.wrapper.flux", "FluxCommands"),
        "GENT": ("pysqa.wrapper.gent", "GentCommands"),
        "REMOTE": (None, None),
    }

    def __init__(
        self,
        config: dict,
        directory: str = "~/.queues",
        execute_command: callable = execute_command,
    ):
        self._config = config
        self._fill_queue_dict(queue_lst_dict=self._config["queues"])
        self._load_templates(queue_lst_dict=self._config["queues"], directory=directory)
        queue_type = self._config["queue_type"]
        if queue_type not in self._QUEUE_TYPE_WRAPPERS:
            raise ValueError(
                f"The queue_type {queue_type} is not found in the list of supported queue types "
                f"{list(self._QUEUE_TYPE_WRAPPERS)}"
            )
        module_name, class_name = self._QUEUE_TYPE_WRAPPERS[queue_type]
        # REMOTE has no local command wrapper, so self._commands is only created for the other queue types.
        if module_name is not None:
            self._commands = getattr(importlib.import_module(module_name), class_name)()
        self._queues = Queues(self.queue_list)
        self._remote_flag = False
        self._ssh_delete_file_on_remote = True
        self._execute_command_function = execute_command

    @property
    def ssh_delete_file_on_remote(self) -> bool:
        """
        Returns:
            bool: flag controlling deletion of files on the remote host; initialised to True here
                  (presumably consumed by remote adapter subclasses).
        """
        return self._ssh_delete_file_on_remote

    @property
    def remote_flag(self) -> bool:
        """
        Returns:
            bool: False for this adapter; presumably switched on by remote adapter subclasses.
        """
        return self._remote_flag

    @property
    def config(self) -> dict:
        """
        Returns:
            dict: the configuration dictionary passed to the constructor.
        """
        return self._config

    @property
    def queue_list(self) -> list:
        """
        Returns:
            list: names of the queues defined in config["queues"].
        """
        return list(self._config["queues"].keys())

    @property
    def queue_view(self) -> pandas.DataFrame:
        """
        Returns:
            pandas.DataFrame: one row per queue with its settings; the "script" and "template" columns are
                              removed when present.
        """
        # errors="ignore": when no queue defines a "script" (and therefore no "template"), the columns do not
        # exist and a plain drop() would raise a KeyError.
        return pandas.DataFrame(self._config["queues"]).T.drop(
            ["script", "template"], axis=1, errors="ignore"
        )

    @property
    def queues(self):
        """
        Returns:
            Queues: object exposing the queue names as attributes for auto completion.
        """
        return self._queues

    def submit_job(
        self,
        queue: Optional[str] = None,
        job_name: Optional[str] = None,
        working_directory: Optional[str] = None,
        cores: Optional[int] = None,
        memory_max: Optional[int] = None,
        run_time_max: Optional[int] = None,
        dependency_list: Optional[list[str]] = None,
        command: Optional[str] = None,
        **kwargs,
    ) -> Optional[int]:
        """
        Render the queue script for the job, write it to the working directory and submit it.

        Args:
            queue (str/None): queue name, defaults to config["queue_primary"]
            job_name (str/None):
            working_directory (str/None): defaults to "."; must not contain spaces
            cores (int/None):
            memory_max (int/None):
            run_time_max (int/None):
            dependency_list (list[str]/None): job IDs this job depends on, forwarded to the template
            command (str/None): command executed by the queue script, required

        Returns:
            int/None: job ID parsed from the submission output, or None if the command produced no output.

        Raises:
            ValueError: if working_directory contains a space, command is None or the queue is unknown.
        """
        if working_directory is not None and " " in working_directory:
            raise ValueError(
                "Whitespaces in the working_directory name are not supported!"
            )
        working_directory, queue_script_path = self._write_queue_script(
            queue=queue,
            job_name=job_name,
            working_directory=working_directory,
            cores=cores,
            memory_max=memory_max,
            run_time_max=run_time_max,
            # Previously dropped here, so templates never received the job dependencies.
            dependency_list=dependency_list,
            command=command,
            **kwargs,
        )
        out = self._execute_command(
            commands=self._list_command_to_be_executed(
                queue_script_path=queue_script_path
            ),
            working_directory=working_directory,
            split_output=False,
        )
        if out is not None:
            return self._commands.get_job_id_from_output(out)
        else:
            return None

    def _list_command_to_be_executed(self, queue_script_path: str) -> list:
        """
        Args:
            queue_script_path (str): path of the rendered queue script

        Returns:
            list: the wrapper's submit command followed by the script path.
        """
        return self._commands.submit_job_command + [queue_script_path]

    def enable_reservation(self, process_id: int) -> Optional[str]:
        """
        Run the wrapper's enable_reservation_command for the given job.

        Args:
            process_id (int):

        Returns:
            str/None: first element of the command output, or None if there was no output.
        """
        out = self._execute_command(
            commands=self._commands.enable_reservation_command + [str(process_id)],
            split_output=True,
        )
        if out is not None:
            return out[0]
        else:
            return None

    def delete_job(self, process_id: int) -> Optional[str]:
        """
        Run the wrapper's delete_job_command for the given job.

        Args:
            process_id (int):

        Returns:
            str/None: first element of the command output, or None if there was no output.
        """
        out = self._execute_command(
            commands=self._commands.delete_job_command + [str(process_id)],
            split_output=True,
        )
        if out is not None:
            return out[0]
        else:
            return None

    def get_queue_status(self, user: Optional[str] = None) -> pandas.DataFrame:
        """
        Args:
            user (str/None): if given, only rows whose "user" column equals this value are returned

        Returns:
            pandas.DataFrame: queue status as converted by the command wrapper.
        """
        out = self._execute_command(
            commands=self._commands.get_queue_status_command, split_output=False
        )
        df = self._commands.convert_queue_status(queue_status_output=out)
        if user is None:
            return df
        else:
            return df[df["user"] == user]

    def get_status_of_my_jobs(self) -> pandas.DataFrame:
        """
        Returns:
           pandas.DataFrame: queue status restricted to the current user (getpass.getuser()).
        """
        return self.get_queue_status(user=self._get_user())

    def get_status_of_job(self, process_id: int) -> Optional[str]:
        """
        Args:
            process_id (int):

        Returns:
             str/None: value of the "status" column for the job, e.g. ['running', 'pending', 'error'], or None
                       if the job is not listed in the queue status.
        """
        df = self.get_queue_status()
        df_selected = df[df["jobid"] == process_id]["status"]
        if len(df_selected) != 0:
            return df_selected.values[0]
        else:
            return None

    def get_status_of_jobs(self, process_id_lst: list[int]) -> list[str]:
        """
        Args:
            process_id_lst (list[int]):

        Returns:
             list[str]: one status per requested job, e.g. ['running', 'pending', 'error', ...]; jobs which are
                        not listed in the queue status are reported as "finished".
        """
        # Query the queuing system once and look up every job in the same snapshot.
        df = self.get_queue_status()
        results_lst = []
        for process_id in process_id_lst:
            df_selected = df[df["jobid"] == process_id]["status"]
            if len(df_selected) != 0:
                results_lst.append(df_selected.values[0])
            else:
                results_lst.append("finished")
        return results_lst

    def get_job_from_remote(self, working_directory: str):
        """
        Get the results of the calculation - this is necessary when the calculation was executed on a remote host.

        Raises:
            NotImplementedError: always; to be implemented by remote adapters.
        """
        raise NotImplementedError

    def convert_path_to_remote(self, path: str):
        """
        Raises:
            NotImplementedError: always; to be implemented by remote adapters.
        """
        raise NotImplementedError

    def transfer_file(
        self,
        file: str,
        transfer_back: bool = False,
        delete_file_on_remote: bool = False,
    ):
        """
        Raises:
            NotImplementedError: always; to be implemented by remote adapters.
        """
        raise NotImplementedError

    def check_queue_parameters(
        self,
        queue: str,
        cores: int = 1,
        run_time_max: Optional[int] = None,
        memory_max: Optional[int] = None,
        active_queue: Optional[dict] = None,
    ) -> tuple:
        """
        Clamp the requested resources to the limits of the queue.

        Args:
            queue (str/None): queue name, only used when active_queue is None
            cores (int):
            run_time_max (int/None):
            memory_max (int/None):
            active_queue (dict/None): queue settings, defaults to config["queues"][queue]

        Returns:
            tuple: (cores, run_time_max, memory_max)
        """
        if active_queue is None:
            active_queue = self._config["queues"][queue]
        cores = self._value_in_range(
            value=cores,
            value_min=active_queue["cores_min"],
            value_max=active_queue["cores_max"],
        )
        run_time_max = self._value_in_range(
            value=run_time_max, value_max=active_queue["run_time_max"]
        )
        memory_max = self._value_in_range(
            value=memory_max, value_max=active_queue["memory_max"]
        )
        return cores, run_time_max, memory_max

    def _write_queue_script(
        self,
        queue: Optional[str] = None,
        job_name: Optional[str] = None,
        working_directory: Optional[str] = None,
        cores: Optional[int] = None,
        memory_max: Optional[int] = None,
        run_time_max: Optional[int] = None,
        dependency_list: Optional[List[int]] = None,
        command: Optional[str] = None,
        **kwargs,
    ):
        """
        Render the queue script and write it to <working_directory>/run_queue.sh.

        Args:
            queue (str/None):
            job_name (str/None):
            working_directory (str/None): defaults to "."; created if it does not exist
            cores (int/None):
            memory_max (int/None):
            run_time_max (int/None):
            dependency_list (list/None):
            command (str/list/None):

        Returns:
            tuple: (working_directory, queue_script_path)
        """
        if isinstance(command, list):
            # NOTE(review): list entries are concatenated without a separator - presumably callers pass
            # fragments which already contain their separators; confirm.
            command = "".join(command)
        if working_directory is None:
            working_directory = "."
        queue_script = self._job_submission_template(
            queue=queue,
            job_name=job_name,
            working_directory=working_directory,
            cores=cores,
            memory_max=memory_max,
            run_time_max=run_time_max,
            dependency_list=dependency_list,
            command=command,
            **kwargs,
        )
        # exist_ok guards against the directory being created concurrently between the check and makedirs.
        if not os.path.exists(working_directory):
            os.makedirs(working_directory, exist_ok=True)
        queue_script_path = os.path.join(working_directory, "run_queue.sh")
        with open(queue_script_path, "w") as f:
            f.write(queue_script)
        return working_directory, queue_script_path

    def _job_submission_template(
        self,
        queue: Optional[str] = None,
        job_name: str = "job.py",
        working_directory: str = ".",
        cores: Optional[int] = None,
        memory_max: Optional[int] = None,
        run_time_max: Optional[int] = None,
        dependency_list: Optional[List[int]] = None,
        command: Optional[str] = None,
        **kwargs,
    ) -> str:
        """
        Render the jinja2 template of the selected queue.

        Args:
            queue (str/None): queue name, defaults to config["queue_primary"]
            job_name (str):
            working_directory (str):
            cores (int/None):
            memory_max (int/None):
            run_time_max (int/None):
            dependency_list (list/None):
            command (str/None): required

        Returns:
            str: the rendered queue script.

        Raises:
            ValueError: if command is None or the queue is not defined in the configuration.
            TypeError: if command is not a string.
        """
        if queue is None:
            queue = self._config["queue_primary"]
        self._value_error_if_none(value=command)
        if queue not in self.queue_list:
            raise ValueError(
                f"The queue {queue} was not found in the list of queues: {self.queue_list}"
            )
        active_queue = self._config["queues"][queue]
        cores, run_time_max, memory_max = self.check_queue_parameters(
            queue=None,
            cores=cores,
            run_time_max=run_time_max,
            memory_max=memory_max,
            active_queue=active_queue,
        )
        template = active_queue["template"]
        return template.render(
            job_name=job_name,
            working_directory=working_directory,
            cores=cores,
            memory_max=memory_max,
            run_time_max=run_time_max,
            command=command,
            dependency_list=dependency_list,
            **kwargs,
        )

    def _execute_command(
        self,
        commands: str,
        working_directory: Optional[str] = None,
        split_output: bool = True,
        shell: bool = False,
        error_filename: str = "pysqa.err",
    ) -> str:
        """
        Delegate to the execute_command function passed to the constructor.

        Args:
            commands (list/str):
            working_directory (str/None):
            split_output (bool):
            shell (bool):
            error_filename (str):

        Returns:
            whatever the execute_command function returns (callers in this class index it when split_output is
            True and treat None as "no output").
        """
        return self._execute_command_function(
            commands=commands,
            working_directory=working_directory,
            split_output=split_output,
            shell=shell,
            error_filename=error_filename,
        )

    @staticmethod
    def _get_user() -> str:
        """
        Returns:
            str: login name of the current user (getpass.getuser()).
        """
        return getpass.getuser()

    @staticmethod
    def _fill_queue_dict(queue_lst_dict: dict):
        """
        Add the resource keys missing from each queue definition with the value None (in place).

        Args:
            queue_lst_dict (dict): mapping of queue name to queue settings dictionary
        """
        queue_keys = ["cores_min", "cores_max", "run_time_max", "memory_max"]
        for queue_dict in queue_lst_dict.values():
            # setdefault keeps existing values and inserts missing keys in a deterministic order
            # (iterating a set made the order depend on string hash randomisation).
            for key in queue_keys:
                queue_dict.setdefault(key, None)

    @staticmethod
    def _load_templates(queue_lst_dict: dict, directory: str = "."):
        """
        Load the jinja2 template referenced by the "script" key of each queue into its "template" key (in place).

        Args:
            queue_lst_dict (dict): mapping of queue name to queue settings dictionary
            directory (str): directory the script paths are relative to; a leading "~" is expanded

        Raises:
            TemplateSyntaxError: if a template cannot be parsed; the message names the offending script.
        """
        # open() does not expand "~", but the constructor's default directory is "~/.queues".
        directory = os.path.expanduser(directory)
        for queue_dict in queue_lst_dict.values():
            if "script" in queue_dict.keys():
                with open(os.path.join(directory, queue_dict["script"]), "r") as f:
                    try:
                        queue_dict["template"] = Template(f.read())
                    except TemplateSyntaxError as error:
                        raise TemplateSyntaxError(
                            message=f"File: {queue_dict['script']} - {error.message}",
                            lineno=error.lineno,
                        ) from error

    @staticmethod
    def _value_error_if_none(value: str):
        """
        Args:
            value (str/None): value which has to be a string

        Raises:
            ValueError: if value is None.
            TypeError: if value is not a string.
        """
        if value is None:
            raise ValueError("Expected a string value, but got None.")
        if not isinstance(value, str):
            raise TypeError(
                f"Expected a string value, but got {type(value).__name__}."
            )

    @classmethod
    def _value_in_range(cls, value, value_min=None, value_max=None):
        """
        Clamp value to [value_min, value_max]; memory strings such as "20G" are compared by their size in bytes.

        Args:
            value (int/float/str/None):
            value_min (int/float/str/None):
            value_max (int/float/str/None):

        Returns:
            int/float/str/None: value, or the violated bound in its original form; if value is None, value_min
                                if set, otherwise value_max.
        """

        if value is not None:
            value_, value_min_, value_max_ = [
                (
                    cls._memory_spec_string_to_value(v)
                    if v is not None and isinstance(v, str)
                    else v
                )
                for v in (value, value_min, value_max)
            ]
            # ATTENTION: '60000' is interpreted as '60000M' since default magnitude is 'M'
            # ATTENTION: int('60000') is interpreted as '60000B' since _memory_spec_string_to_value returns the size
            # ATTENTION: in bytes, as target_magnitude = 'b'
            # We want to compare the actual (k,m,g)byte value if there is any
            if value_min_ is not None and value_ < value_min_:
                return value_min
            if value_max_ is not None and value_ > value_max_:
                return value_max
            return value
        else:
            if value_min is not None:
                return value_min
            if value_max is not None:
                return value_max
            return value

    @staticmethod
    def _is_memory_string(value: str) -> bool:
        """
        Tests a string if it specifies a certain amount of memory e.g.: '20G', '60b'. Pure integer strings are
        also valid.

        Args:
            value (str): the string to test

        Returns:
            (bool): True if the whole string matches a memory specification
        """
        # fullmatch instead of findall(...)[0]: the latter raised IndexError for strings without any digit.
        return re.fullmatch(r"[0-9]+[bBkKmMgGtT]?", value) is not None

    @classmethod
    def _memory_spec_string_to_value(
        cls, value: str, default_magnitude: str = "m", target_magnitude: str = "b"
    ):
        """
        Converts a valid memory string (tested by _is_memory_string) into a value in units of `target_magnitude`.
        If it is a plain integer string (e.g.: '50000') it will be interpreted with the magnitude passed in by
        `default_magnitude`.

        Args:
            value (str): the string
            default_magnitude (str): magnitude for interpreting plain integer strings [b, B, k, K, m, M, g, G, t, T]
            target_magnitude (str): to which the output value should be converted [b, B, k, K, m, M, g, G, t, T]

        Returns:
            (float/str): the value of the string in `target_magnitude` units (binary prefixes, factor 1024), or
                         the input unchanged if it is not a memory string
        """
        magnitude_mapping = {"b": 0, "k": 1, "m": 2, "g": 3, "t": 4}
        if not cls._is_memory_string(value):
            return value
        integer_value = int(re.findall(r"[0-9]+", value)[0])
        magnitude = re.findall(r"[bBkKmMgGtT]+", value)
        if len(magnitude) > 0:
            magnitude = magnitude[0].lower()
        else:
            magnitude = default_magnitude.lower()
        # Rescale from the parsed magnitude to target_magnitude; lower() so that upper-case units are accepted
        # as documented.
        return (integer_value * 1024 ** magnitude_mapping[magnitude]) / (
            1024 ** magnitude_mapping[target_magnitude.lower()]
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc