• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / pysqa / 4877114177

pending completion
4877114177

Pull #177

github-actions

GitHub
Merge 6fb76db91 into aea219873
Pull Request #177: Add test for squeue command

3 of 3 new or added lines in 1 file covered. (100.0%)

668 of 821 relevant lines covered (81.36%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.34
/pysqa/utils/basic.py
1
# coding: utf-8
2
# Copyright (c) Jan Janssen
3

4
import getpass
1✔
5
import importlib
1✔
6
from jinja2 import Template
1✔
7
import os
1✔
8
import re
1✔
9
import pandas
1✔
10
from pysqa.utils.queues import Queues
1✔
11
from pysqa.utils.execute import execute_command
1✔
12

13

14
class BasisQueueAdapter(object):
1✔
15
    """
16
    The goal of the QueueAdapter class is to make submitting to a queue system as easy as starting another sub process
17
    locally.
18

19
    Args:
20
        config (dict):
21
        directory (str): directory containing the queue.yaml files as well as corresponding jinja2 templates for the
22
                         individual queues.
23
        execute_command(funct):
24

25
    Attributes:
26

27
        .. attribute:: config
28

29
            QueueAdapter configuration read from the queue.yaml file.
30

31
        .. attribute:: queue_list
32

33
            List of available queues
34

35
        .. attribute:: queue_view
36

37
            Pandas DataFrame representation of the available queues, read from queue.yaml.
38

39
        .. attribute:: queues
40

41
            Queues available for auto completion QueueAdapter().queues.<queue name> returns the queue name.
42
    """
43

44
    def __init__(self, config, directory="~/.queues", execute_command=execute_command):
1✔
45
        self._config = config
1✔
46
        self._fill_queue_dict(queue_lst_dict=self._config["queues"])
1✔
47
        self._load_templates(queue_lst_dict=self._config["queues"], directory=directory)
1✔
48
        if self._config["queue_type"] == "SGE":
1✔
49
            class_name = "SunGridEngineCommands"
1✔
50
            module_name = "pysqa.wrapper.sge"
1✔
51
        elif self._config["queue_type"] == "TORQUE":
1✔
52
            class_name = "TorqueCommands"
1✔
53
            module_name = "pysqa.wrapper.torque"
1✔
54
        elif self._config["queue_type"] == "SLURM":
1✔
55
            class_name = "SlurmCommands"
1✔
56
            module_name = "pysqa.wrapper.slurm"
1✔
57
        elif self._config["queue_type"] == "LSF":
1✔
58
            class_name = "LsfCommands"
1✔
59
            module_name = "pysqa.wrapper.lsf"
1✔
60
        elif self._config["queue_type"] == "MOAB":
1✔
61
            class_name = "MoabCommands"
1✔
62
            module_name = "pysqa.wrapper.moab"
1✔
63
        elif self._config["queue_type"] == "GENT":
1✔
64
            class_name = "GentCommands"
1✔
65
            module_name = "pysqa.wrapper.gent"
1✔
66
        elif self._config["queue_type"] == "REMOTE":
1✔
67
            class_name = None
1✔
68
            module_name = None
1✔
69
        else:
70
            raise ValueError()
1✔
71
        if self._config["queue_type"] != "REMOTE":
1✔
72
            self._commands = getattr(importlib.import_module(module_name), class_name)()
1✔
73
        self._queues = Queues(self.queue_list)
1✔
74
        self._remote_flag = False
1✔
75
        self._ssh_delete_file_on_remote = True
1✔
76
        self._execute_command_function = execute_command
1✔
77

78
    @property
1✔
79
    def ssh_delete_file_on_remote(self):
1✔
80
        return self._ssh_delete_file_on_remote
1✔
81

82
    @property
1✔
83
    def remote_flag(self):
1✔
84
        return self._remote_flag
1✔
85

86
    @property
1✔
87
    def config(self):
1✔
88
        """
89

90
        Returns:
91
            dict:
92
        """
93
        return self._config
1✔
94

95
    @property
1✔
96
    def queue_list(self):
1✔
97
        """
98

99
        Returns:
100
            list:
101
        """
102
        return list(self._config["queues"].keys())
1✔
103

104
    @property
1✔
105
    def queue_view(self):
1✔
106
        """
107

108
        Returns:
109
            pandas.DataFrame:
110
        """
111
        return pandas.DataFrame(self._config["queues"]).T.drop(
1✔
112
            ["script", "template"], axis=1
113
        )
114

115
    @property
1✔
116
    def queues(self):
1✔
117
        return self._queues
1✔
118

119
    def submit_job(
1✔
120
        self,
121
        queue=None,
122
        job_name=None,
123
        working_directory=None,
124
        cores=None,
125
        memory_max=None,
126
        run_time_max=None,
127
        dependency_list=None,
128
        command=None,
129
        **kwargs
130
    ):
131
        """
132

133
        Args:
134
            queue (str/None):
135
            job_name (str/None):
136
            working_directory (str/None):
137
            cores (int/None):
138
            memory_max (int/None):
139
            run_time_max (int/None):
140
            dependency_list (list[str]/None:
141
            command (str/None):
142

143
        Returns:
144
            int:
145
        """
146
        if " " in working_directory:
1✔
147
            raise ValueError(
1✔
148
                "Whitespaces in the working_directory name are not supported!"
149
            )
150
        working_directory, queue_script_path = self._write_queue_script(
1✔
151
            queue=queue,
152
            job_name=job_name,
153
            working_directory=working_directory,
154
            cores=cores,
155
            memory_max=memory_max,
156
            run_time_max=run_time_max,
157
            command=command,
158
            **kwargs
159
        )
160
        out = self._execute_command(
1✔
161
            commands=self._list_command_to_be_executed(
162
                dependency_list, queue_script_path
163
            ),
164
            working_directory=working_directory,
165
            split_output=False,
166
        )
167
        if out is not None:
1✔
168
            return self._commands.get_job_id_from_output(out)
1✔
169
        else:
170
            return None
1✔
171

172
    def _list_command_to_be_executed(self, dependency_list, queue_script_path) -> list:
1✔
173
        return (
1✔
174
            self._commands.submit_job_command
175
            + self._commands.dependencies(dependency_list)
176
            + [queue_script_path]
177
        )
178

179
    def enable_reservation(self, process_id):
1✔
180
        """
181

182
        Args:
183
            process_id (int):
184

185
        Returns:
186
            str:
187
        """
188
        out = self._execute_command(
×
189
            commands=self._commands.enable_reservation_command + [str(process_id)],
190
            split_output=True,
191
        )
192
        if out is not None:
×
193
            return out[0]
×
194
        else:
195
            return None
×
196

197
    def delete_job(self, process_id):
1✔
198
        """
199

200
        Args:
201
            process_id (int):
202

203
        Returns:
204
            str:
205
        """
206
        out = self._execute_command(
1✔
207
            commands=self._commands.delete_job_command + [str(process_id)],
208
            split_output=True,
209
        )
210
        if out is not None:
1✔
211
            return out[0]
1✔
212
        else:
213
            return None
1✔
214

215
    def get_queue_status(self, user=None):
1✔
216
        """
217

218
        Args:
219
            user (str):
220

221
        Returns:
222
            pandas.DataFrame:
223
        """
224
        out = self._execute_command(
1✔
225
            commands=self._commands.get_queue_status_command, split_output=False
226
        )
227
        df = self._commands.convert_queue_status(queue_status_output=out)
1✔
228
        if user is None:
1✔
229
            return df
1✔
230
        else:
231
            return df[df["user"] == user]
1✔
232

233
    def get_status_of_my_jobs(self):
1✔
234
        """
235

236
        Returns:
237
           pandas.DataFrame:
238
        """
239
        return self.get_queue_status(user=self._get_user())
×
240

241
    def get_status_of_job(self, process_id):
1✔
242
        """
243

244
        Args:
245
            process_id:
246

247
        Returns:
248
             str: ['running', 'pending', 'error']
249
        """
250
        df = self.get_queue_status()
1✔
251
        df_selected = df[df["jobid"] == process_id]["status"]
1✔
252
        if len(df_selected) != 0:
1✔
253
            return df_selected.values[0]
1✔
254
        else:
255
            return None
1✔
256

257
    def get_status_of_jobs(self, process_id_lst):
1✔
258
        """
259

260
        Args:
261
            process_id_lst:
262

263
        Returns:
264
             list: ['running', 'pending', 'error', ...]
265
        """
266
        df = self.get_queue_status()
1✔
267
        results_lst = []
1✔
268
        for process_id in process_id_lst:
1✔
269
            df_selected = df[df["jobid"] == process_id]["status"]
1✔
270
            if len(df_selected) != 0:
1✔
271
                results_lst.append(df_selected.values[0])
1✔
272
            else:
273
                results_lst.append("finished")
1✔
274
        return results_lst
1✔
275

276
    def get_job_from_remote(self, working_directory):
1✔
277
        """
278
        Get the results of the calculation - this is necessary when the calculation was executed on a remote host.
279
        """
280
        raise NotImplementedError
1✔
281

282
    def convert_path_to_remote(self, path):
1✔
283
        raise NotImplementedError
1✔
284

285
    def transfer_file(self, file, transfer_back=False):
1✔
286
        raise NotImplementedError
1✔
287

288
    def check_queue_parameters(
1✔
289
        self, queue, cores=1, run_time_max=None, memory_max=None, active_queue=None
290
    ):
291
        """
292

293
        Args:
294
            queue (str/None):
295
            cores (int):
296
            run_time_max (int/None):
297
            memory_max (int/None):
298
            active_queue (dict):
299

300
        Returns:
301
            list: [cores, run_time_max, memory_max]
302
        """
303
        if active_queue is None:
1✔
304
            active_queue = self._config["queues"][queue]
1✔
305
        cores = self._value_in_range(
1✔
306
            value=cores,
307
            value_min=active_queue["cores_min"],
308
            value_max=active_queue["cores_max"],
309
        )
310
        run_time_max = self._value_in_range(
1✔
311
            value=run_time_max, value_max=active_queue["run_time_max"]
312
        )
313
        memory_max = self._value_in_range(
1✔
314
            value=memory_max, value_max=active_queue["memory_max"]
315
        )
316
        return cores, run_time_max, memory_max
1✔
317

318
    def _write_queue_script(
1✔
319
        self,
320
        queue=None,
321
        job_name=None,
322
        working_directory=None,
323
        cores=None,
324
        memory_max=None,
325
        run_time_max=None,
326
        command=None,
327
        **kwargs
328
    ):
329
        """
330

331
        Args:
332
            queue (str/None):
333
            job_name (str/None):
334
            working_directory (str/None):
335
            cores (int/None):
336
            memory_max (int/None):
337
            run_time_max (int/None):
338
            command (str/None):
339

340
        """
341
        if isinstance(command, list):
1✔
342
            command = "".join(command)
×
343
        if working_directory is None:
1✔
344
            working_directory = "."
1✔
345
        queue_script = self._job_submission_template(
1✔
346
            queue=queue,
347
            job_name=job_name,
348
            working_directory=working_directory,
349
            cores=cores,
350
            memory_max=memory_max,
351
            run_time_max=run_time_max,
352
            command=command,
353
            **kwargs
354
        )
355
        if not os.path.exists(working_directory):
1✔
356
            os.makedirs(working_directory)
×
357
        queue_script_path = os.path.join(working_directory, "run_queue.sh")
1✔
358
        with open(queue_script_path, "w") as f:
1✔
359
            f.writelines(queue_script)
1✔
360
        return working_directory, queue_script_path
1✔
361

362
    def _job_submission_template(
1✔
363
        self,
364
        queue=None,
365
        job_name="job.py",
366
        working_directory=".",
367
        cores=None,
368
        memory_max=None,
369
        run_time_max=None,
370
        command=None,
371
        **kwargs
372
    ):
373
        """
374

375
        Args:
376
            queue (str/None):
377
            job_name (str):
378
            working_directory (str):
379
            cores (int/None):
380
            memory_max (int/None):
381
            run_time_max (int/None):
382
            command (str/None):
383

384
        Returns:
385
            str:
386
        """
387
        if queue is None:
1✔
388
            queue = self._config["queue_primary"]
1✔
389
        self._value_error_if_none(value=command)
1✔
390
        if queue not in self.queue_list:
1✔
391
            raise ValueError()
1✔
392
        active_queue = self._config["queues"][queue]
1✔
393
        cores, run_time_max, memory_max = self.check_queue_parameters(
1✔
394
            queue=None,
395
            cores=cores,
396
            run_time_max=run_time_max,
397
            memory_max=memory_max,
398
            active_queue=active_queue,
399
        )
400
        template = active_queue["template"]
1✔
401
        return template.render(
1✔
402
            job_name=job_name,
403
            working_directory=working_directory,
404
            cores=cores,
405
            memory_max=memory_max,
406
            run_time_max=run_time_max,
407
            command=command,
408
            **kwargs
409
        )
410

411
    def _execute_command(
1✔
412
        self,
413
        commands,
414
        working_directory=None,
415
        split_output=True,
416
        shell=False,
417
        error_filename="pysqa.err",
418
    ):
419
        """
420

421
        Args:
422
            commands (list/str):
423
            working_directory (str):
424
            split_output (bool):
425
            shell (bool):
426
            error_filename (str):
427

428
        Returns:
429
            str:
430
        """
431
        return self._execute_command_function(
1✔
432
            commands=commands,
433
            working_directory=working_directory,
434
            split_output=split_output,
435
            shell=shell,
436
            error_filename=error_filename,
437
        )
438

439
    @staticmethod
1✔
440
    def _get_user():
1✔
441
        """
442

443
        Returns:
444
            str:
445
        """
446
        return getpass.getuser()
1✔
447

448
    @staticmethod
1✔
449
    def _fill_queue_dict(queue_lst_dict):
1✔
450
        """
451

452
        Args:
453
            queue_lst_dict (dict):
454
        """
455
        queue_keys = ["cores_min", "cores_max", "run_time_max", "memory_max"]
1✔
456
        for queue_dict in queue_lst_dict.values():
1✔
457
            for key in set(queue_keys) - set(queue_dict.keys()):
1✔
458
                queue_dict[key] = None
1✔
459

460
    @staticmethod
1✔
461
    def _load_templates(queue_lst_dict, directory="."):
1✔
462
        """
463

464
        Args:
465
            queue_lst_dict (dict):
466
            directory (str):
467
        """
468
        for queue_dict in queue_lst_dict.values():
1✔
469
            if "script" in queue_dict.keys():
1✔
470
                with open(os.path.join(directory, queue_dict["script"]), "r") as f:
1✔
471
                    queue_dict["template"] = Template(f.read())
1✔
472

473
    @staticmethod
1✔
474
    def _value_error_if_none(value):
1✔
475
        """
476

477
        Args:
478
            value (str/None):
479
        """
480
        if value is None:
1✔
481
            raise ValueError()
1✔
482
        if not isinstance(value, str):
1✔
483
            raise TypeError()
1✔
484

485
    @classmethod
1✔
486
    def _value_in_range(cls, value, value_min=None, value_max=None):
1✔
487
        """
488

489
        Args:
490
            value (int/float/None):
491
            value_min (int/float/None):
492
            value_max (int/float/None):
493

494
        Returns:
495
            int/float/None:
496
        """
497

498
        if value is not None:
1✔
499
            value_, value_min_, value_max_ = [
1✔
500
                cls._memory_spec_string_to_value(v)
501
                if v is not None and isinstance(v, str)
502
                else v
503
                for v in (value, value_min, value_max)
504
            ]
505
            # ATTENTION: '60000' is interpreted as '60000M' since default magnitude is 'M'
506
            # ATTENTION: int('60000') is interpreted as '60000B' since _memory_spec_string_to_value return the size in
507
            # ATTENTION: bytes, as target_magnitude = 'b'
508
            # We want to compare the the actual (k,m,g)byte value if there is any
509
            if value_min_ is not None and value_ < value_min_:
1✔
510
                return value_min
1✔
511
            if value_max_ is not None and value_ > value_max_:
1✔
512
                return value_max
1✔
513
            return value
1✔
514
        else:
515
            if value_min is not None:
1✔
516
                return value_min
1✔
517
            if value_max is not None:
1✔
518
                return value_max
1✔
519
            return value
1✔
520

521
    @staticmethod
1✔
522
    def _is_memory_string(value):
1✔
523
        """
524
        Tests a string if it specifies a certain amount of memory e.g.: '20G', '60b'. Also pure integer strings are
525
        also valid.
526

527
        Args:
528
            value (str): the string to test
529

530
        Returns:
531
            (bool): A boolean value if the string matches a memory specification
532
        """
533
        memory_spec_pattern = r"[0-9]+[bBkKmMgGtT]?"
1✔
534
        return re.findall(memory_spec_pattern, value)[0] == value
1✔
535

536
    @classmethod
1✔
537
    def _memory_spec_string_to_value(
1✔
538
        cls, value, default_magnitude="m", target_magnitude="b"
539
    ):
540
        """
541
        Converts a valid memory string (tested by _is_memory_string) into an integer/float value of desired
542
        magnitude `default_magnitude`. If it is a plain integer string (e.g.: '50000') it will be interpreted with
543
        the magnitude passed in by the `default_magnitude`. The output will rescaled to `target_magnitude`
544

545
        Args:
546
            value (str): the string
547
            default_magnitude (str): magnitude for interpreting plain integer strings [b, B, k, K, m, M, g, G, t, T]
548
            target_magnitude (str): to which the output value should be converted [b, B, k, K, m, M, g, G, t, T]
549

550
        Returns:
551
            (float/int): the value of the string in `target_magnitude` units
552
        """
553
        magnitude_mapping = {"b": 0, "k": 1, "m": 2, "g": 3, "t": 4}
1✔
554
        if cls._is_memory_string(value):
1✔
555
            integer_pattern = r"[0-9]+"
1✔
556
            magnitude_pattern = r"[bBkKmMgGtT]+"
1✔
557
            integer_value = int(re.findall(integer_pattern, value)[0])
1✔
558

559
            magnitude = re.findall(magnitude_pattern, value)
1✔
560
            if len(magnitude) > 0:
1✔
561
                magnitude = magnitude[0].lower()
1✔
562
            else:
563
                magnitude = default_magnitude.lower()
1✔
564
            # Convert it to default magnitude = megabytes
565
            return (integer_value * 1024 ** magnitude_mapping[magnitude]) / (
1✔
566
                1024 ** magnitude_mapping[target_magnitude]
567
            )
568
        else:
569
            return value
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc