• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / pysqa / 4895581062

pending completion
4895581062

push

github-actions

GitHub
Merge pull request #186 from pyiron/update_flux_tests

713 of 849 relevant lines covered (83.98%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.39
/pysqa/utils/basic.py
1
# coding: utf-8
2
# Copyright (c) Jan Janssen
3

4
import getpass
1✔
5
import importlib
1✔
6
from jinja2 import Template
1✔
7
import os
1✔
8
import re
1✔
9
import pandas
1✔
10
from pysqa.utils.queues import Queues
1✔
11
from pysqa.utils.execute import execute_command
1✔
12

13

14
class BasisQueueAdapter(object):
    """
    The goal of the QueueAdapter class is to make submitting to a queue system as easy as starting another sub process
    locally.

    Args:
        config (dict): queue configuration. The keys "queues" and "queue_type" are read on construction,
                       "queue_primary" is read when a job is submitted without naming a queue.
        directory (str): directory containing the queue.yaml files as well as corresponding jinja2 templates for the
                         individual queues. The path is used as given, no "~" expansion is performed here.
        execute_command(funct): callable used to run the queuing system commands. It is invoked with the keyword
                                arguments commands, working_directory, split_output, shell and error_filename.

    Attributes:

        .. attribute:: config

            QueueAdapter configuration read from the queue.yaml file.

        .. attribute:: queue_list

            List of available queues

        .. attribute:: queue_view

            Pandas DataFrame representation of the available queues, read from queue.yaml.

        .. attribute:: queues

            Queues available for auto completion QueueAdapter().queues.<queue name> returns the queue name.
    """

    # Maps the "queue_type" entry of the configuration to the (module name, class name) of the command wrapper.
    # "REMOTE" is deliberately absent: no command wrapper is instantiated for it in __init__.
    _COMMAND_WRAPPERS = {
        "SGE": ("pysqa.wrapper.sge", "SunGridEngineCommands"),
        "TORQUE": ("pysqa.wrapper.torque", "TorqueCommands"),
        "SLURM": ("pysqa.wrapper.slurm", "SlurmCommands"),
        "LSF": ("pysqa.wrapper.lsf", "LsfCommands"),
        "MOAB": ("pysqa.wrapper.moab", "MoabCommands"),
        "GENT": ("pysqa.wrapper.gent", "GentCommands"),
        "FLUX": ("pysqa.wrapper.flux", "FluxCommands"),
    }

    def __init__(self, config, directory="~/.queues", execute_command=execute_command):
        self._config = config
        self._fill_queue_dict(queue_lst_dict=self._config["queues"])
        self._load_templates(queue_lst_dict=self._config["queues"], directory=directory)
        queue_type = self._config["queue_type"]
        if queue_type != "REMOTE":
            try:
                module_name, class_name = self._COMMAND_WRAPPERS[queue_type]
            except (KeyError, TypeError):
                # TypeError covers unhashable values, so any invalid queue_type ends up as a ValueError.
                raise ValueError(
                    "Unknown queue_type {!r}, supported types are: {}".format(
                        queue_type,
                        ", ".join(sorted(list(self._COMMAND_WRAPPERS) + ["REMOTE"])),
                    )
                ) from None
            # The wrapper module is imported lazily so only the selected backend has to be importable.
            self._commands = getattr(importlib.import_module(module_name), class_name)()
        self._queues = Queues(self.queue_list)
        self._remote_flag = False
        self._ssh_delete_file_on_remote = True
        self._execute_command_function = execute_command

    @property
    def ssh_delete_file_on_remote(self):
        """
        Returns:
            bool: value of the internal flag, initialised to True in the constructor.
        """
        return self._ssh_delete_file_on_remote

    @property
    def remote_flag(self):
        """
        Returns:
            bool: value of the internal flag, initialised to False in the constructor.
        """
        return self._remote_flag

    @property
    def config(self):
        """
        Returns:
            dict: the configuration object passed to the constructor.
        """
        return self._config

    @property
    def queue_list(self):
        """
        Returns:
            list: names of the queues, i.e. the keys of config["queues"].
        """
        return list(self._config["queues"].keys())

    @property
    def queue_view(self):
        """
        Returns:
            pandas.DataFrame: one row per queue, without the "script" and "template" columns.
        """
        return pandas.DataFrame(self._config["queues"]).T.drop(
            ["script", "template"], axis=1
        )

    @property
    def queues(self):
        """
        Returns:
            Queues: helper object created from queue_list in the constructor.
        """
        return self._queues

    def submit_job(
        self,
        queue=None,
        job_name=None,
        working_directory=None,
        cores=None,
        memory_max=None,
        run_time_max=None,
        dependency_list=None,
        command=None,
        **kwargs
    ):
        """
        Write the submission script for the job and hand it to the queuing system.

        Args:
            queue (str/None): name of the queue, the primary queue of the configuration is used when None.
            job_name (str/None):
            working_directory (str/None): directory the script is written to, "." is used when None.
            cores (int/None):
            memory_max (int/None):
            run_time_max (int/None):
            dependency_list (list[str]/None): forwarded to the dependencies() method of the command wrapper.
            command (str/None): command to be executed by the job, required.
            **kwargs: additional values forwarded to the submission script template.

        Returns:
            int: job id extracted from the output of the submit command, None if the command returned None.

        Raises:
            ValueError: if the working_directory contains a whitespace.
        """
        # working_directory defaults to None, which is replaced by "." further down, so only check real strings.
        if working_directory is not None and " " in working_directory:
            raise ValueError(
                "Whitespaces in the working_directory name are not supported!"
            )
        working_directory, queue_script_path = self._write_queue_script(
            queue=queue,
            job_name=job_name,
            working_directory=working_directory,
            cores=cores,
            memory_max=memory_max,
            run_time_max=run_time_max,
            command=command,
            **kwargs
        )
        out = self._execute_command(
            commands=self._list_command_to_be_executed(
                dependency_list, queue_script_path
            ),
            working_directory=working_directory,
            split_output=False,
        )
        if out is None:
            return None
        return self._commands.get_job_id_from_output(out)

    def _list_command_to_be_executed(self, dependency_list, queue_script_path) -> list:
        """
        Assemble the submit command: submit command + dependency arguments + path of the submission script.

        Args:
            dependency_list (list[str]/None):
            queue_script_path (str):

        Returns:
            list: command line handed to the execute function.
        """
        return (
            self._commands.submit_job_command
            + self._commands.dependencies(dependency_list)
            + [queue_script_path]
        )

    def enable_reservation(self, process_id):
        """
        Run the enable-reservation command of the command wrapper for the given job.

        Args:
            process_id (int):

        Returns:
            str: first element of the split command output, None if the command returned None.
        """
        out = self._execute_command(
            commands=self._commands.enable_reservation_command + [str(process_id)],
            split_output=True,
        )
        if out is None:
            return None
        return out[0]

    def delete_job(self, process_id):
        """
        Run the delete command of the command wrapper for the given job.

        Args:
            process_id (int):

        Returns:
            str: first element of the split command output, None if the command returned None.
        """
        out = self._execute_command(
            commands=self._commands.delete_job_command + [str(process_id)],
            split_output=True,
        )
        if out is None:
            return None
        return out[0]

    def get_queue_status(self, user=None):
        """
        Query the queuing system and convert the output with the command wrapper.

        Args:
            user (str/None): when given, only the rows whose "user" column equals this value are returned.

        Returns:
            pandas.DataFrame:
        """
        out = self._execute_command(
            commands=self._commands.get_queue_status_command, split_output=False
        )
        df = self._commands.convert_queue_status(queue_status_output=out)
        if user is None:
            return df
        return df[df["user"] == user]

    def get_status_of_my_jobs(self):
        """
        Queue status restricted to the user returned by getpass.getuser().

        Returns:
           pandas.DataFrame:
        """
        return self.get_queue_status(user=self._get_user())

    def get_status_of_job(self, process_id):
        """
        Look up the "status" entry of the row whose "jobid" equals process_id.

        Args:
            process_id:

        Returns:
             str: ['running', 'pending', 'error'], None if the job is not listed in the queue status.
        """
        df = self.get_queue_status()
        df_selected = df[df["jobid"] == process_id]["status"]
        if len(df_selected) == 0:
            return None
        return df_selected.values[0]

    def get_status_of_jobs(self, process_id_lst):
        """
        Look up the status of several jobs with a single query of the queuing system.

        Args:
            process_id_lst:

        Returns:
             list: ['running', 'pending', 'error', ...], jobs which are not listed are reported as 'finished'.
                   Note that get_status_of_job() returns None in that case.
        """
        df = self.get_queue_status()
        results_lst = []
        for process_id in process_id_lst:
            df_selected = df[df["jobid"] == process_id]["status"]
            if len(df_selected) != 0:
                results_lst.append(df_selected.values[0])
            else:
                results_lst.append("finished")
        return results_lst

    def get_job_from_remote(self, working_directory):
        """
        Get the results of the calculation - this is necessary when the calculation was executed on a remote host.

        Raises:
            NotImplementedError: always, this class provides no implementation.
        """
        raise NotImplementedError

    def convert_path_to_remote(self, path):
        """
        Raises:
            NotImplementedError: always, this class provides no implementation.
        """
        raise NotImplementedError

    def transfer_file(self, file, transfer_back=False):
        """
        Raises:
            NotImplementedError: always, this class provides no implementation.
        """
        raise NotImplementedError

    def check_queue_parameters(
        self, queue, cores=1, run_time_max=None, memory_max=None, active_queue=None
    ):
        """
        Clamp the requested resources to the limits of a queue.

        Args:
            queue (str/None): name of the queue, only used when active_queue is None.
            cores (int):
            run_time_max (int/None):
            memory_max (int/None):
            active_queue (dict): queue definition, looked up in the configuration by name when None.

        Returns:
            tuple: (cores, run_time_max, memory_max)
        """
        if active_queue is None:
            active_queue = self._config["queues"][queue]
        cores = self._value_in_range(
            value=cores,
            value_min=active_queue["cores_min"],
            value_max=active_queue["cores_max"],
        )
        run_time_max = self._value_in_range(
            value=run_time_max, value_max=active_queue["run_time_max"]
        )
        memory_max = self._value_in_range(
            value=memory_max, value_max=active_queue["memory_max"]
        )
        return cores, run_time_max, memory_max

    def _write_queue_script(
        self,
        queue=None,
        job_name=None,
        working_directory=None,
        cores=None,
        memory_max=None,
        run_time_max=None,
        command=None,
        **kwargs
    ):
        """
        Render the submission script and write it to <working_directory>/run_queue.sh.

        Args:
            queue (str/None):
            job_name (str/None):
            working_directory (str/None): "." is used when None, the directory is created if it does not exist.
            cores (int/None):
            memory_max (int/None):
            run_time_max (int/None):
            command (str/list/None): a list is concatenated without any separator.

        Returns:
            tuple: (working_directory, queue_script_path)
        """
        if isinstance(command, list):
            # NOTE(review): the elements are joined without a separator - presumably they are expected to carry
            # their own whitespace or newlines, confirm against the callers.
            command = "".join(command)
        if working_directory is None:
            working_directory = "."
        # Render first, so an invalid request raises before anything is created on disk.
        queue_script = self._job_submission_template(
            queue=queue,
            job_name=job_name,
            working_directory=working_directory,
            cores=cores,
            memory_max=memory_max,
            run_time_max=run_time_max,
            command=command,
            **kwargs
        )
        if not os.path.exists(working_directory):
            os.makedirs(working_directory)
        queue_script_path = os.path.join(working_directory, "run_queue.sh")
        with open(queue_script_path, "w") as f:
            f.write(queue_script)
        return working_directory, queue_script_path

    def _job_submission_template(
        self,
        queue=None,
        job_name="job.py",
        working_directory=".",
        cores=None,
        memory_max=None,
        run_time_max=None,
        command=None,
        **kwargs
    ):
        """
        Render the jinja2 template of the selected queue with the (clamped) job parameters.

        Args:
            queue (str/None): name of the queue, config["queue_primary"] is used when None.
            job_name (str):
            working_directory (str):
            cores (int/None):
            memory_max (int/None):
            run_time_max (int/None):
            command (str/None): required, has to be a string.

        Returns:
            str: rendered submission script.

        Raises:
            ValueError: if command is None or the queue is not part of the configuration.
            TypeError: if command is not a string.
        """
        if queue is None:
            queue = self._config["queue_primary"]
        self._value_error_if_none(value=command)
        if queue not in self.queue_list:
            raise ValueError(
                "The queue {!r} is not defined, available queues are: {}".format(
                    queue, self.queue_list
                )
            )
        active_queue = self._config["queues"][queue]
        cores, run_time_max, memory_max = self.check_queue_parameters(
            queue=None,
            cores=cores,
            run_time_max=run_time_max,
            memory_max=memory_max,
            active_queue=active_queue,
        )
        template = active_queue["template"]
        return template.render(
            job_name=job_name,
            working_directory=working_directory,
            cores=cores,
            memory_max=memory_max,
            run_time_max=run_time_max,
            command=command,
            **kwargs
        )

    def _execute_command(
        self,
        commands,
        working_directory=None,
        split_output=True,
        shell=False,
        error_filename="pysqa.err",
    ):
        """
        Forward all arguments to the execute function handed to the constructor.

        Args:
            commands (list/str):
            working_directory (str):
            split_output (bool):
            shell (bool):
            error_filename (str):

        Returns:
            str: whatever the execute function returns. Callers in this class treat None as "no output" and index
                 into the result when split_output is True.
        """
        return self._execute_command_function(
            commands=commands,
            working_directory=working_directory,
            split_output=split_output,
            shell=shell,
            error_filename=error_filename,
        )

    @staticmethod
    def _get_user():
        """
        Returns:
            str: name of the current user as reported by getpass.getuser().
        """
        return getpass.getuser()

    @staticmethod
    def _fill_queue_dict(queue_lst_dict):
        """
        Add the resource limits missing in a queue definition with the value None. The dictionaries are modified
        in place.

        Args:
            queue_lst_dict (dict): queue name -> queue definition (dict).
        """
        queue_keys = ["cores_min", "cores_max", "run_time_max", "memory_max"]
        for queue_dict in queue_lst_dict.values():
            # Iterating the list (rather than a set difference) keeps the insertion order deterministic.
            for key in queue_keys:
                queue_dict.setdefault(key, None)

    @staticmethod
    def _load_templates(queue_lst_dict, directory="."):
        """
        Read the submission script of every queue which defines a "script" entry and store it as jinja2 template
        under the key "template". The dictionaries are modified in place.

        Args:
            queue_lst_dict (dict): queue name -> queue definition (dict).
            directory (str): directory the "script" entries are relative to.
        """
        for queue_dict in queue_lst_dict.values():
            if "script" in queue_dict:
                with open(os.path.join(directory, queue_dict["script"]), "r") as f:
                    queue_dict["template"] = Template(f.read())

    @staticmethod
    def _value_error_if_none(value):
        """
        Validate that a required string argument was provided.

        Args:
            value (str/None):

        Raises:
            ValueError: if value is None.
            TypeError: if value is not a string.
        """
        if value is None:
            raise ValueError("The command to be executed is not set, a string is required.")
        if not isinstance(value, str):
            raise TypeError(
                "The command has to be a string, not {}.".format(type(value).__name__)
            )

    @classmethod
    def _value_in_range(cls, value, value_min=None, value_max=None):
        """
        Clamp a value to the interval [value_min, value_max], limits which are None are ignored.

        Strings are converted to bytes with _memory_spec_string_to_value() for the comparison only, the object
        returned is always one of the unmodified inputs.

        Args:
            value (int/float/str/None): when None the lower limit is returned if defined, otherwise the upper limit.
            value_min (int/float/str/None):
            value_max (int/float/str/None):

        Returns:
            int/float/str/None:
        """
        if value is None:
            if value_min is not None:
                return value_min
            if value_max is not None:
                return value_max
            return value

        value_, value_min_, value_max_ = [
            cls._memory_spec_string_to_value(v) if isinstance(v, str) else v
            for v in (value, value_min, value_max)
        ]
        # ATTENTION: the string '60000' is interpreted as '60000M' since the default magnitude is 'M', while the
        # ATTENTION: integer 60000 is not converted at all and therefore compared as 60000 bytes, because
        # ATTENTION: _memory_spec_string_to_value() returns the size in bytes (target_magnitude = 'b').
        # We want to compare the actual (k,m,g)byte value if there is any.
        if value_min_ is not None and value_ < value_min_:
            return value_min
        if value_max_ is not None and value_ > value_max_:
            return value_max
        return value

    @staticmethod
    def _is_memory_string(value):
        """
        Tests a string if it specifies a certain amount of memory e.g.: '20G', '60b'. Pure integer strings are
        also valid.

        Args:
            value (str): the string to test

        Returns:
            (bool): True if the whole string matches a memory specification, False otherwise - including strings
                    without any digit like '' or 'abc'.
        """
        memory_spec_pattern = r"[0-9]+[bBkKmMgGtT]?"
        return re.fullmatch(memory_spec_pattern, value) is not None

    @classmethod
    def _memory_spec_string_to_value(
        cls, value, default_magnitude="m", target_magnitude="b"
    ):
        """
        Converts a valid memory string (tested by _is_memory_string) into a value of the desired magnitude
        `target_magnitude`. If it is a plain integer string (e.g.: '50000') it will be interpreted with
        the magnitude passed in by the `default_magnitude`. Magnitudes are powers of 1024.

        Args:
            value (str): the string
            default_magnitude (str): magnitude for interpreting plain integer strings [b, B, k, K, m, M, g, G, t, T]
            target_magnitude (str): to which the output value should be converted [b, B, k, K, m, M, g, G, t, T]

        Returns:
            (float): the value of the string in `target_magnitude` units. Strings which are not a valid memory
                     specification are returned unchanged.
        """
        if not cls._is_memory_string(value):
            return value

        magnitude_mapping = {"b": 0, "k": 1, "m": 2, "g": 3, "t": 4}
        integer_value = int(re.findall(r"[0-9]+", value)[0])
        suffix = re.findall(r"[bBkKmMgGtT]+", value)
        magnitude = suffix[0].lower() if suffix else default_magnitude.lower()
        # Scale to bytes first, then down to the target magnitude. Both magnitudes are case-insensitive.
        return (integer_value * 1024 ** magnitude_mapping[magnitude]) / (
            1024 ** magnitude_mapping[target_magnitude.lower()]
        )
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc