• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / pysqa / 5113499123

pending completion
5113499123

Pull #193

github-actions

web-flow
Merge e418d0686 into 287649f84
Pull Request #193: Bump paramiko from 3.1.0 to 3.2.0

713 of 849 relevant lines covered (83.98%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.39
/pysqa/utils/basic.py
1
# coding: utf-8
2
# Copyright (c) Jan Janssen
3

4
import getpass
1✔
5
import importlib
1✔
6
import os
1✔
7
import re
1✔
8

9
import pandas
1✔
10
from jinja2 import Template
1✔
11

12
from pysqa.utils.execute import execute_command
1✔
13
from pysqa.utils.queues import Queues
1✔
14

15

16
class BasisQueueAdapter(object):
    """
    The goal of the QueueAdapter class is to make submitting to a queue system as easy as starting another sub process
    locally.

    Args:
        config (dict): queue configuration (as read from queue.yaml); the keys "queues" (mapping of queue name to the
                       settings of that queue) and "queue_type" are read on construction, "queue_primary" is read when
                       no queue is given on submission.
        directory (str): directory containing the queue.yaml files as well as corresponding jinja2 templates for the
                         individual queues. A leading "~" is expanded to the user's home directory.
        execute_command (funct): callable used to run commands; it is called with the keyword arguments commands,
                                 working_directory, split_output, shell and error_filename.

    Attributes:

        .. attribute:: config

            QueueAdapter configuration read from the queue.yaml file.

        .. attribute:: queue_list

            List of available queues

        .. attribute:: queue_view

            Pandas DataFrame representation of the available queues, read from queue.yaml.

        .. attribute:: queues

            Queues available for auto completion QueueAdapter().queues.<queue name> returns the queue name.
    """

    def __init__(self, config, directory="~/.queues", execute_command=execute_command):
        self._config = config
        self._fill_queue_dict(queue_lst_dict=self._config["queues"])
        self._load_templates(queue_lst_dict=self._config["queues"], directory=directory)
        # Map every supported queue_type to the (module, class) that implements its commands.
        # "REMOTE" has no local command wrapper, so no commands object is created for it here.
        queue_type_mapping = {
            "SGE": ("pysqa.wrapper.sge", "SunGridEngineCommands"),
            "TORQUE": ("pysqa.wrapper.torque", "TorqueCommands"),
            "SLURM": ("pysqa.wrapper.slurm", "SlurmCommands"),
            "LSF": ("pysqa.wrapper.lsf", "LsfCommands"),
            "MOAB": ("pysqa.wrapper.moab", "MoabCommands"),
            "GENT": ("pysqa.wrapper.gent", "GentCommands"),
            "REMOTE": (None, None),
            "FLUX": ("pysqa.wrapper.flux", "FluxCommands"),
        }
        queue_type = self._config["queue_type"]
        if queue_type not in queue_type_mapping:
            raise ValueError(
                "The queue_type "
                + queue_type
                + " is not found in the list of supported queue types "
                + str(
                    ["SGE", "TORQUE", "SLURM", "LSF", "MOAB", "FLUX", "GENT", "REMOTE"]
                )
            )
        module_name, class_name = queue_type_mapping[queue_type]
        if queue_type != "REMOTE":
            # Import the wrapper lazily so only the selected queue system's module is loaded.
            # NOTE(review): for "REMOTE" self._commands is not set here; presumably a remote subclass provides it.
            self._commands = getattr(importlib.import_module(module_name), class_name)()
        self._queues = Queues(self.queue_list)
        self._remote_flag = False
        self._ssh_delete_file_on_remote = True
        self._execute_command_function = execute_command

    @property
    def ssh_delete_file_on_remote(self):
        """
        Returns:
            bool: flag initialised to True by this base class; presumably read by remote adapters to decide whether
                  transferred files are deleted on the remote host (TODO confirm).
        """
        return self._ssh_delete_file_on_remote

    @property
    def remote_flag(self):
        """
        Returns:
            bool: False for this base class; presumably True for adapters that submit to a remote host.
        """
        return self._remote_flag

    @property
    def config(self):
        """
        Returns:
            dict: the configuration dictionary passed to the constructor (after queue defaults and templates were
                  filled in).
        """
        return self._config

    @property
    def queue_list(self):
        """
        Returns:
            list: names of all queues defined in the configuration.
        """
        return list(self._config["queues"].keys())

    @property
    def queue_view(self):
        """
        Returns:
            pandas.DataFrame: one row per queue with its settings; the "script" and "template" columns are omitted
                              (and silently skipped if no queue defines a script).
        """
        return pandas.DataFrame(self._config["queues"]).T.drop(
            ["script", "template"], axis=1, errors="ignore"
        )

    @property
    def queues(self):
        """
        Returns:
            Queues: auto-completion helper built from queue_list.
        """
        return self._queues

    def submit_job(
        self,
        queue=None,
        job_name=None,
        working_directory=None,
        cores=None,
        memory_max=None,
        run_time_max=None,
        dependency_list=None,
        command=None,
        **kwargs
    ):
        """
        Render the queue script for the given settings, write it to working_directory and submit it.

        Args:
            queue (str/None): queue name; defaults to the configured "queue_primary".
            job_name (str/None):
            working_directory (str/None): defaults to "." when None; must not contain spaces.
            cores (int/None):
            memory_max (int/None):
            run_time_max (int/None):
            dependency_list (list[str]/None):
            command (str/None):

        Returns:
            int: the job id parsed from the submission output, or None if the command produced no output.

        Raises:
            ValueError: if working_directory contains a space.
        """
        # working_directory may be None here; _write_queue_script replaces None with ".".
        if working_directory is not None and " " in working_directory:
            raise ValueError(
                "Whitespaces in the working_directory name are not supported!"
            )
        working_directory, queue_script_path = self._write_queue_script(
            queue=queue,
            job_name=job_name,
            working_directory=working_directory,
            cores=cores,
            memory_max=memory_max,
            run_time_max=run_time_max,
            command=command,
            **kwargs
        )
        out = self._execute_command(
            commands=self._list_command_to_be_executed(
                dependency_list, queue_script_path
            ),
            working_directory=working_directory,
            split_output=False,
        )
        if out is not None:
            return self._commands.get_job_id_from_output(out)
        else:
            return None

    def _list_command_to_be_executed(self, dependency_list, queue_script_path) -> list:
        """
        Build the submission command: the queue system's submit command, its dependency flags, then the script path.

        Args:
            dependency_list (list[str]/None):
            queue_script_path (str):

        Returns:
            list: command tokens to execute.
        """
        return (
            self._commands.submit_job_command
            + self._commands.dependencies(dependency_list)
            + [queue_script_path]
        )

    def enable_reservation(self, process_id):
        """
        Run the queue system's reservation command for a job.

        Args:
            process_id (int):

        Returns:
            str: first element of the split command output, or None if there was no output.
        """
        out = self._execute_command(
            commands=self._commands.enable_reservation_command + [str(process_id)],
            split_output=True,
        )
        if out is not None:
            return out[0]
        else:
            return None

    def delete_job(self, process_id):
        """
        Run the queue system's delete command for a job.

        Args:
            process_id (int):

        Returns:
            str: first element of the split command output, or None if there was no output.
        """
        out = self._execute_command(
            commands=self._commands.delete_job_command + [str(process_id)],
            split_output=True,
        )
        if out is not None:
            return out[0]
        else:
            return None

    def get_queue_status(self, user=None):
        """
        Query the queue system and convert its output into a DataFrame.

        Args:
            user (str/None): if given, keep only rows whose "user" column equals this value.

        Returns:
            pandas.DataFrame:
        """
        out = self._execute_command(
            commands=self._commands.get_queue_status_command, split_output=False
        )
        df = self._commands.convert_queue_status(queue_status_output=out)
        if user is None:
            return df
        else:
            return df[df["user"] == user]

    def get_status_of_my_jobs(self):
        """
        Returns:
           pandas.DataFrame: queue status filtered to the jobs of the current user (getpass.getuser()).
        """
        return self.get_queue_status(user=self._get_user())

    def get_status_of_job(self, process_id):
        """
        Args:
            process_id: job id, compared against the "jobid" column of the queue status.

        Returns:
             str: ['running', 'pending', 'error'], or None if the job is not listed in the queue.
        """
        df = self.get_queue_status()
        df_selected = df[df["jobid"] == process_id]["status"]
        if len(df_selected) != 0:
            return df_selected.values[0]
        else:
            return None

    def get_status_of_jobs(self, process_id_lst):
        """
        Args:
            process_id_lst: job ids, compared against the "jobid" column of the queue status.

        Returns:
             list: ['running', 'pending', 'error', ...]; jobs no longer listed in the queue are reported as "finished"
                   (unlike get_status_of_job, which returns None for them).
        """
        df = self.get_queue_status()
        results_lst = []
        for process_id in process_id_lst:
            df_selected = df[df["jobid"] == process_id]["status"]
            if len(df_selected) != 0:
                results_lst.append(df_selected.values[0])
            else:
                results_lst.append("finished")
        return results_lst

    def get_job_from_remote(self, working_directory):
        """
        Get the results of the calculation - this is necessary when the calculation was executed on a remote host.
        Not implemented in this base class.
        """
        raise NotImplementedError

    def convert_path_to_remote(self, path):
        """Not implemented in this base class."""
        raise NotImplementedError

    def transfer_file(self, file, transfer_back=False):
        """Not implemented in this base class."""
        raise NotImplementedError

    def check_queue_parameters(
        self, queue, cores=1, run_time_max=None, memory_max=None, active_queue=None
    ):
        """
        Clamp the requested resources into the limits of a queue.

        Args:
            queue (str/None): queue name; only used when active_queue is None.
            cores (int):
            run_time_max (int/None):
            memory_max (int/None):
            active_queue (dict): settings of the queue; looked up from the config via queue when None.

        Returns:
            tuple: (cores, run_time_max, memory_max)
        """
        if active_queue is None:
            active_queue = self._config["queues"][queue]
        cores = self._value_in_range(
            value=cores,
            value_min=active_queue["cores_min"],
            value_max=active_queue["cores_max"],
        )
        run_time_max = self._value_in_range(
            value=run_time_max, value_max=active_queue["run_time_max"]
        )
        memory_max = self._value_in_range(
            value=memory_max, value_max=active_queue["memory_max"]
        )
        return cores, run_time_max, memory_max

    def _write_queue_script(
        self,
        queue=None,
        job_name=None,
        working_directory=None,
        cores=None,
        memory_max=None,
        run_time_max=None,
        command=None,
        **kwargs
    ):
        """
        Render the queue script and write it to <working_directory>/run_queue.sh, creating the directory if needed.

        Args:
            queue (str/None):
            job_name (str/None):
            working_directory (str/None): defaults to ".".
            cores (int/None):
            memory_max (int/None):
            run_time_max (int/None):
            command (str/list/None): a list is concatenated into a single string.

        Returns:
            tuple: (working_directory, queue_script_path)
        """
        if isinstance(command, list):
            # NOTE(review): elements are joined without a separator; assumes each element already carries its own
            # separator/newline - confirm against callers.
            command = "".join(command)
        if working_directory is None:
            working_directory = "."
        queue_script = self._job_submission_template(
            queue=queue,
            job_name=job_name,
            working_directory=working_directory,
            cores=cores,
            memory_max=memory_max,
            run_time_max=run_time_max,
            command=command,
            **kwargs
        )
        if not os.path.exists(working_directory):
            os.makedirs(working_directory)
        queue_script_path = os.path.join(working_directory, "run_queue.sh")
        with open(queue_script_path, "w") as f:
            f.writelines(queue_script)
        return working_directory, queue_script_path

    def _job_submission_template(
        self,
        queue=None,
        job_name="job.py",
        working_directory=".",
        cores=None,
        memory_max=None,
        run_time_max=None,
        command=None,
        **kwargs
    ):
        """
        Render the jinja2 template of a queue with the (range-checked) job settings.

        Args:
            queue (str/None): defaults to the configured "queue_primary".
            job_name (str):
            working_directory (str):
            cores (int/None):
            memory_max (int/None):
            run_time_max (int/None):
            command (str/None): required.

        Returns:
            str: the rendered queue script.

        Raises:
            ValueError: if command is None or the queue is unknown.
            TypeError: if command is not a str.
        """
        if queue is None:
            queue = self._config["queue_primary"]
        self._value_error_if_none(value=command)
        if queue not in self.queue_list:
            raise ValueError(
                "The queue "
                + queue
                + " was not found in the list of queues: "
                + str(self.queue_list)
            )
        active_queue = self._config["queues"][queue]
        cores, run_time_max, memory_max = self.check_queue_parameters(
            queue=None,
            cores=cores,
            run_time_max=run_time_max,
            memory_max=memory_max,
            active_queue=active_queue,
        )
        template = active_queue["template"]
        return template.render(
            job_name=job_name,
            working_directory=working_directory,
            cores=cores,
            memory_max=memory_max,
            run_time_max=run_time_max,
            command=command,
            **kwargs
        )

    def _execute_command(
        self,
        commands,
        working_directory=None,
        split_output=True,
        shell=False,
        error_filename="pysqa.err",
    ):
        """
        Forward to the execute_command function given to the constructor.

        Args:
            commands (list/str):
            working_directory (str):
            split_output (bool):
            shell (bool):
            error_filename (str):

        Returns:
            str: the output of the execute_command function (callers index it when split_output is True, so it is
                 presumably a list of lines in that case).
        """
        return self._execute_command_function(
            commands=commands,
            working_directory=working_directory,
            split_output=split_output,
            shell=shell,
            error_filename=error_filename,
        )

    @staticmethod
    def _get_user():
        """
        Returns:
            str: login name of the current user (getpass.getuser()).
        """
        return getpass.getuser()

    @staticmethod
    def _fill_queue_dict(queue_lst_dict):
        """
        Ensure every queue defines the resource-limit keys, setting missing ones to None (in place).

        Args:
            queue_lst_dict (dict): mapping of queue name to queue settings dict.
        """
        queue_keys = ["cores_min", "cores_max", "run_time_max", "memory_max"]
        for queue_dict in queue_lst_dict.values():
            # Iterate in list order so missing keys are added in a deterministic order.
            for key in queue_keys:
                queue_dict.setdefault(key, None)

    @staticmethod
    def _load_templates(queue_lst_dict, directory="."):
        """
        Load the jinja2 template of every queue that defines a "script" and store it under "template" (in place).

        Args:
            queue_lst_dict (dict): mapping of queue name to queue settings dict.
            directory (str): directory the script paths are relative to; a leading "~" is expanded.
        """
        # The public default directory is "~/.queues"; open() does not expand "~" by itself.
        directory = os.path.expanduser(directory)
        for queue_dict in queue_lst_dict.values():
            if "script" in queue_dict:
                with open(os.path.join(directory, queue_dict["script"]), "r") as f:
                    queue_dict["template"] = Template(f.read())

    @staticmethod
    def _value_error_if_none(value):
        """
        Args:
            value (str/None):

        Raises:
            ValueError: if value is None.
            TypeError: if value is not a str.
        """
        if value is None:
            raise ValueError("Expected a str, got None.")
        if not isinstance(value, str):
            raise TypeError("Expected a str, got " + type(value).__name__ + ".")

    @classmethod
    def _value_in_range(cls, value, value_min=None, value_max=None):
        """
        Clamp value into [value_min, value_max]; if value is None, fall back to value_min, then value_max.

        String arguments are interpreted as memory specifications (see _memory_spec_string_to_value) only for the
        comparison; the returned object is always one of the original arguments, unconverted.

        Args:
            value (int/float/str/None):
            value_min (int/float/str/None):
            value_max (int/float/str/None):

        Returns:
            int/float/str/None:
        """
        if value is not None:
            value_, value_min_, value_max_ = [
                cls._memory_spec_string_to_value(v) if isinstance(v, str) else v
                for v in (value, value_min, value_max)
            ]
            # Only strings are converted: a plain numeric string such as '60000' is read in the default magnitude
            # 'm' (i.e. '60000M') and converted to bytes (target magnitude 'b'), while an int such as 60000 is
            # compared as-is. Mixing ints and strings therefore compares the int against a byte count.
            if value_min_ is not None and value_ < value_min_:
                return value_min
            if value_max_ is not None and value_ > value_max_:
                return value_max
            return value
        else:
            if value_min is not None:
                return value_min
            if value_max is not None:
                return value_max
            return value

    @staticmethod
    def _is_memory_string(value):
        """
        Tests a string if it specifies a certain amount of memory e.g.: '20G', '60b'. Pure integer strings are
        also valid.

        Args:
            value (str): the string to test

        Returns:
            (bool): True if the whole string matches a memory specification, False otherwise (including for strings
                    without any digit, such as '').
        """
        memory_spec_pattern = r"[0-9]+[bBkKmMgGtT]?"
        return re.fullmatch(memory_spec_pattern, value) is not None

    @classmethod
    def _memory_spec_string_to_value(
        cls, value, default_magnitude="m", target_magnitude="b"
    ):
        """
        Converts a valid memory string (tested by _is_memory_string) into a value in `target_magnitude` units. If it
        is a plain integer string (e.g.: '50000') it will be interpreted with the magnitude passed in by
        `default_magnitude`. Strings that are not memory specifications are returned unchanged.

        Args:
            value (str): the string
            default_magnitude (str): magnitude for interpreting plain integer strings [b, B, k, K, m, M, g, G, t, T]
            target_magnitude (str): to which the output value should be converted [b, B, k, K, m, M, g, G, t, T]

        Returns:
            (float/str): the value of the string in `target_magnitude` units (true division always yields a float),
                         or the input string if it is not a memory specification.
        """
        magnitude_mapping = {"b": 0, "k": 1, "m": 2, "g": 3, "t": 4}
        if not cls._is_memory_string(value):
            return value
        integer_value = int(re.findall(r"[0-9]+", value)[0])
        magnitude = re.findall(r"[bBkKmMgGtT]+", value)
        if len(magnitude) > 0:
            magnitude = magnitude[0].lower()
        else:
            magnitude = default_magnitude.lower()
        # Scale from the parsed magnitude to the target magnitude (powers of 1024).
        return (integer_value * 1024 ** magnitude_mapping[magnitude]) / (
            1024 ** magnitude_mapping[target_magnitude.lower()]
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc