pyiron / pysqa, build 7755602394

02 Feb 2024 12:01PM UTC coverage: 96.647% (-0.6%) from 97.284%

Pull Request #257: Add support for submitting to systems behind two factor authentication
Merge 507bc6274 into 159fcd033

8 of 16 new or added lines in 1 file covered. (50.0%)

2 existing lines in 1 file now uncovered.

1153 of 1193 relevant lines covered (96.65%)

0.97 hits per line
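
The percentages in this summary are plain ratios of the line counts reported with them. A short sanity check in Python, using only the numbers shown above (how Coveralls rounds internally is an assumption; rounding to the displayed precision reproduces every value here):

covered, relevant = 1153, 1193
new_covered, new_total = 8, 16
previous = 97.284

overall = 100 * covered / relevant          # 96.647, the headline coverage
new_lines = 100 * new_covered / new_total   # 50.0, "8 of 16 ... covered"
delta = overall - previous                  # -0.637, displayed as -0.6%

print(f"{overall:.3f}% overall, {new_lines:.1f}% of new lines, {delta:+.1f}% change")
# -> 96.647% overall, 50.0% of new lines, -0.6% change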

Source File: /pysqa/queueadapter.py (97.37% covered)
# coding: utf-8
# Copyright (c) Jan Janssen

import os

from pysqa.ext.modular import ModularQueueAdapter
from pysqa.utils.basic import BasisQueueAdapter
from pysqa.utils.config import read_config
from pysqa.utils.execute import execute_command

__author__ = "Jan Janssen"
__copyright__ = "Copyright 2019, Jan Janssen"
__version__ = "0.0.3"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "production"
__date__ = "Feb 9, 2019"


class QueueAdapter(object):
    """
    The goal of the QueueAdapter class is to make submitting to a queue system as easy as starting another subprocess
    locally.

    Args:
        directory (str): directory containing the queue.yaml files as well as corresponding jinja2 templates for the
                         individual queues.

    Attributes:

        .. attribute:: config

            QueueAdapter configuration read from the queue.yaml file.

        .. attribute:: queue_list

            List of available queues

        .. attribute:: queue_view

            Pandas DataFrame representation of the available queues, read from queue.yaml.

        .. attribute:: queues

            Queues available for auto completion; QueueAdapter().queues.<queue name> returns the queue name.
    """

    def __init__(self, directory="~/.queues", execute_command=execute_command):
        queue_yaml = os.path.join(directory, "queue.yaml")
        clusters_yaml = os.path.join(directory, "clusters.yaml")
        self._adapter = None
        # A queue.yaml file describes a single cluster; a clusters.yaml file maps
        # cluster names to individual configuration files and marks one of them
        # as the primary cluster.
        if os.path.exists(queue_yaml):
            self._queue_dict = {
                "default": set_queue_adapter(
                    config=read_config(file_name=queue_yaml),
                    directory=directory,
                    execute_command=execute_command,
                )
            }
            primary_queue = "default"
        elif os.path.exists(clusters_yaml):
            config = read_config(file_name=clusters_yaml)
            self._queue_dict = {
                k: set_queue_adapter(
                    config=read_config(file_name=os.path.join(directory, v)),
                    directory=directory,
                    execute_command=execute_command,
                )
                for k, v in config["cluster"].items()
            }
            primary_queue = config["cluster_primary"]
        else:  # branch not covered in this build
            raise ValueError(
                "Neither a queue.yaml file nor a clusters.yaml file was found in "
                + directory
            )
        self._adapter = self._queue_dict[primary_queue]

    def list_clusters(self):
        """
        List available computing clusters for remote submission

        Returns:
            list: List of computing clusters
        """
        return list(self._queue_dict.keys())

    def switch_cluster(self, cluster_name):
        """
        Switch to a different computing cluster

        Args:
            cluster_name (str): name of the computing cluster
        """
        self._adapter = self._queue_dict[cluster_name]  # not covered in this build

    @property
    def config(self):
        """

        Returns:
            dict:
        """
        return self._adapter.config

    @property
    def ssh_delete_file_on_remote(self):
        return self._adapter.ssh_delete_file_on_remote

    @property
    def remote_flag(self):
        """

        Returns:
            bool:
        """
        return self._adapter.remote_flag  # not covered in this build

    @property
    def queue_list(self):
        """

        Returns:
            list:
        """
        return self._adapter.queue_list

    @property
    def queue_view(self):
        """

        Returns:
            pandas.DataFrame:
        """
        return self._adapter.queue_view

    @property
    def queues(self):
        return self._adapter.queues

    def submit_job(
        self,
        queue=None,
        job_name=None,
        working_directory=None,
        cores=None,
        memory_max=None,
        run_time_max=None,
        dependency_list=None,
        command=None,
        **kwargs,
    ):
        """
        Submits command to the given queue.

        Args:
            queue (str/None): Name of the queue to submit to, must be one of the names configured for this adapter
                              (optional)
            job_name (str/None): Name of the job for the underlying queuing system (optional)
            working_directory (str/None): Directory to run the job in (optional)
            cores (int/None): Number of hardware threads requested (optional)
            memory_max (int/None): Amount of memory requested per node in GB (optional)
            run_time_max (int/None): Maximum runtime in seconds (optional)
            dependency_list (list[str]/None): Job ids of jobs to be completed before starting (optional)
            command (str/None): shell command to run in the job
            **kwargs: allows writing additional parameters to the job submission script if they are available in the
                      corresponding template.

        Returns:
            int: Job id received from the queuing system for the job which was submitted
        """
        return self._adapter.submit_job(
            queue=queue,
            job_name=job_name,
            working_directory=working_directory,
            cores=cores,
            memory_max=memory_max,
            run_time_max=run_time_max,
            dependency_list=dependency_list,
            command=command,
            **kwargs,
        )

    def enable_reservation(self, process_id):
        """

        Args:
            process_id (int):

        Returns:
            str:
        """
        return self._adapter.enable_reservation(process_id=process_id)

    def get_job_from_remote(self, working_directory):
        """
        Get the results of the calculation - this is necessary when the calculation was executed on a remote host.

        Args:
            working_directory (str):
        """
        self._adapter.get_job_from_remote(working_directory=working_directory)

    def transfer_file_to_remote(self, file, transfer_back=False):
        """

        Args:
            file (str):
            transfer_back (bool):

        Returns:
            str:
        """
        self._adapter.transfer_file(file=file, transfer_back=transfer_back)

    def convert_path_to_remote(self, path):
        """

        Args:
            path (str):

        Returns:
            str:
        """
        return self._adapter.convert_path_to_remote(path=path)

    def delete_job(self, process_id):
        """

        Args:
            process_id (int):

        Returns:
            str:
        """
        return self._adapter.delete_job(process_id=process_id)

    def get_queue_status(self, user=None):
        """

        Args:
            user (str):

        Returns:
            pandas.DataFrame:
        """
        return self._adapter.get_queue_status(user=user)

    def get_status_of_my_jobs(self):
        """

        Returns:
            pandas.DataFrame:
        """
        return self._adapter.get_status_of_my_jobs()

    def get_status_of_job(self, process_id):
        """

        Args:
            process_id:

        Returns:
            str: ['running', 'pending', 'error']
        """
        return self._adapter.get_status_of_job(process_id=process_id)

    def get_status_of_jobs(self, process_id_lst):
        """

        Args:
            process_id_lst:

        Returns:
            list: ['running', 'pending', 'error', ...]
        """
        return self._adapter.get_status_of_jobs(process_id_lst=process_id_lst)

    def check_queue_parameters(
        self, queue, cores=1, run_time_max=None, memory_max=None, active_queue=None
    ):
        """

        Args:
            queue (str/None):
            cores (int):
            run_time_max (int/None):
            memory_max (int/None):
            active_queue (dict):

        Returns:
            list: [cores, run_time_max, memory_max]
        """
        return self._adapter.check_queue_parameters(
            queue=queue,
            cores=cores,
            run_time_max=run_time_max,
            memory_max=memory_max,
            active_queue=active_queue,
        )


def set_queue_adapter(config, directory, execute_command=execute_command):
    """
    Initialize the queue adapter

    Args:
        config (dict): configuration for one cluster
        directory (str): directory which contains the queue configurations
    """
    if config["queue_type"] in ["SGE", "TORQUE", "SLURM", "LSF", "MOAB", "FLUX"]:
        return BasisQueueAdapter(
            config=config, directory=directory, execute_command=execute_command
        )
    elif config["queue_type"] in ["GENT"]:
        return ModularQueueAdapter(
            config=config, directory=directory, execute_command=execute_command
        )
    elif config["queue_type"] in ["REMOTE"]:
        # The RemoteQueueAdapter has additional dependencies, namely paramiko and tqdm.
        # By moving the import to this line it only fails when the user specifies the
        # RemoteQueueAdapter in their pysqa configuration.
        from pysqa.ext.remote import RemoteQueueAdapter

        return RemoteQueueAdapter(
            config=config, directory=directory, execute_command=execute_command
        )
    else:
        raise ValueError(
            "The queue_type "
            + config["queue_type"]
            + " is not found in the list of supported queue types "
            + str(["SGE", "TORQUE", "SLURM", "LSF", "MOAB", "FLUX", "GENT", "REMOTE"])
        )
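
The listing above is the public API of pysqa's QueueAdapter. A minimal usage sketch based only on the signatures shown here follows; the configuration directory is the default from __init__, while the queue name "slurm_default", the resource values and the submitted command are placeholders that must match an actual queue.yaml or clusters.yaml setup with its jinja2 templates.

from pysqa.queueadapter import QueueAdapter

# Reads queue.yaml (single cluster) or clusters.yaml (multiple clusters) from the directory.
qa = QueueAdapter(directory="~/.queues")

print(qa.queue_list)        # queues defined in the configuration
print(qa.list_clusters())   # ["default"] for a single queue.yaml setup

job_id = qa.submit_job(
    queue="slurm_default",          # placeholder, must be one of qa.queue_list
    job_name="example_job",
    working_directory=".",
    cores=2,
    memory_max=4,                   # GB per node
    run_time_max=3600,              # seconds
    command="echo 'hello from the queue'",
)

print(qa.get_status_of_job(process_id=job_id))   # 'running', 'pending' or 'error'
qa.delete_job(process_id=job_id)

For a multi-cluster configuration, clusters.yaml maps cluster names to per-cluster configuration files (the "cluster" and "cluster_primary" keys read in __init__), and switch_cluster() selects which of the resulting adapters receives the calls above; the REMOTE queue_type additionally requires paramiko and tqdm, as noted in set_queue_adapter().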