• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / pysqa / 7788017394

05 Feb 2024 04:56PM UTC coverage: 96.116% (-0.2%) from 96.342%
7788017394

Pull #263

github

web-flow
Merge 31be44312 into ef4b08a35
Pull Request #263: Ask for SSH password using getpass

4 of 7 new or added lines in 1 file covered. (57.14%)

1 existing line in 1 file now uncovered.

1163 of 1210 relevant lines covered (96.12%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.39
/pysqa/queueadapter.py
1
# coding: utf-8
2
# Copyright (c) Jan Janssen
3

4
import os
1✔
5

6
from pysqa.ext.modular import ModularQueueAdapter
1✔
7
from pysqa.utils.basic import BasisQueueAdapter
1✔
8
from pysqa.utils.config import read_config
1✔
9
from pysqa.utils.execute import execute_command
1✔
10

11
# Standard pysqa/pyiron package metadata.
__author__ = "Jan Janssen"
__copyright__ = "Copyright 2019, Jan Janssen"
__version__ = "0.0.3"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "production"
__date__ = "Feb 9, 2019"
18

19

20
class QueueAdapter(object):
    """
    The goal of the QueueAdapter class is to make submitting to a queue system as easy as starting another sub process
    locally.

    Args:
        directory (str): directory containing the queue.yaml files as well as corresponding jinja2 templates for the
                         individual queues. "~" is expanded to the user's home directory.
        execute_command (funct): function used to execute shell commands, primarily exposed for testing.

    Attributes:

        .. attribute:: config

            QueueAdapter configuration read from the queue.yaml file.

        .. attribute:: queue_list

            List of available queues

        .. attribute:: queue_view

            Pandas DataFrame representation of the available queues, read from queue.yaml.

        .. attribute:: queues

            Queues available for auto completion QueueAdapter().queues.<queue name> returns the queue name.
    """

    def __init__(self, directory="~/.queues", execute_command=execute_command):
        # os.path.exists() does not expand "~", so without expanduser() the
        # default directory "~/.queues" could never be found.
        directory = os.path.expanduser(directory)
        queue_yaml = os.path.join(directory, "queue.yaml")
        clusters_yaml = os.path.join(directory, "clusters.yaml")
        self._adapter = None
        if os.path.exists(queue_yaml):
            # Single-cluster setup: one queue.yaml describes all queues.
            self._queue_dict = {
                "default": set_queue_adapter(
                    config=read_config(file_name=queue_yaml),
                    directory=directory,
                    execute_command=execute_command,
                )
            }
            primary_queue = "default"
        elif os.path.exists(clusters_yaml):
            # Multi-cluster setup: clusters.yaml maps cluster names to
            # per-cluster configuration files in the same directory.
            config = read_config(file_name=clusters_yaml)
            self._queue_dict = {
                k: set_queue_adapter(
                    config=read_config(file_name=os.path.join(directory, v)),
                    directory=directory,
                    execute_command=execute_command,
                )
                for k, v in config["cluster"].items()
            }
            primary_queue = config["cluster_primary"]
        else:
            raise ValueError(
                "Neither a queue.yaml file nor a clusters.yaml file were found in "
                + directory
            )
        self._adapter = self._queue_dict[primary_queue]

    def list_clusters(self):
        """
        List available computing clusters for remote submission

        Returns:
            list: List of computing clusters
        """
        return list(self._queue_dict.keys())

    def switch_cluster(self, cluster_name):
        """
        Switch to a different computing cluster

        Args:
            cluster_name (str): name of the computing cluster

        Raises:
            KeyError: if cluster_name is not one of the configured clusters
        """
        self._adapter = self._queue_dict[cluster_name]

    @property
    def config(self):
        """
        Returns:
            dict: QueueAdapter configuration of the currently active cluster
        """
        return self._adapter.config

    @property
    def ssh_delete_file_on_remote(self):
        # Whether files transferred to a remote host are deleted there after
        # being copied back - forwarded from the active adapter.
        return self._adapter.ssh_delete_file_on_remote

    @property
    def remote_flag(self):
        """
        Returns:
            bool: True when the active adapter submits to a remote host
        """
        return self._adapter.remote_flag

    @property
    def queue_list(self):
        """
        Returns:
            list: names of the available queues
        """
        return self._adapter.queue_list

    @property
    def queue_view(self):
        """
        Returns:
            pandas.DataFrame: overview of the available queues
        """
        return self._adapter.queue_view

    @property
    def queues(self):
        # Auto-completion helper: QueueAdapter().queues.<queue name>
        # returns the queue name as a string.
        return self._adapter.queues

    def submit_job(
        self,
        queue=None,
        job_name=None,
        working_directory=None,
        cores=None,
        memory_max=None,
        run_time_max=None,
        dependency_list=None,
        command=None,
        **kwargs,
    ):
        """
        Submits command to the given queue.

        Args:
            queue (str/None):  Name of the queue to submit to, must be one of the names configured for this adapter
                               (optional)
            job_name (str/None):  Name of the job for the underlying queuing system (optional)
            working_directory (str/None):  Directory to run the job in (optional)
            cores (int/None):  Number of hardware threads requested (optional)
            memory_max (int/None):  Amount of memory requested per node in GB (optional)
            run_time_max (int/None):  Maximum runtime in seconds (optional)
            dependency_list (list[str]/None): Job ids of jobs to be completed before starting (optional)
            command (str/None): shell command to run in the job
            **kwargs: allows writing additional parameters to the job submission script if they are available in the
                      corresponding template.

        Returns:
            int: Job id received from the queuing system for the job which was submitted
        """
        return self._adapter.submit_job(
            queue=queue,
            job_name=job_name,
            working_directory=working_directory,
            cores=cores,
            memory_max=memory_max,
            run_time_max=run_time_max,
            dependency_list=dependency_list,
            command=command,
            **kwargs,
        )

    def enable_reservation(self, process_id):
        """
        Enable a reservation for an already submitted job.

        Args:
            process_id (int): job id from the queuing system

        Returns:
            str: output of the underlying queuing-system command
        """
        return self._adapter.enable_reservation(process_id=process_id)

    def get_job_from_remote(self, working_directory):
        """
        Get the results of the calculation - this is necessary when the calculation was executed on a remote host.

        Args:
            working_directory (str): directory of the job on the remote host
        """
        self._adapter.get_job_from_remote(working_directory=working_directory)

    def transfer_file_to_remote(
        self, file, transfer_back=False, delete_file_on_remote=False
    ):
        """
        Transfer file from the local host to the remote host, optionally
        transferring it back and/or deleting the remote copy afterwards.

        Args:
            file (str): path of the file to transfer
            transfer_back (bool): copy the file back from the remote host
            delete_file_on_remote (bool): delete the file on the remote host afterwards
        """
        self._adapter.transfer_file(
            file=file,
            transfer_back=transfer_back,
            delete_file_on_remote=delete_file_on_remote,
        )

    def convert_path_to_remote(self, path):
        """
        Translate a local path to the corresponding path on the remote host.

        Args:
            path (str): local path

        Returns:
            str: path on the remote host
        """
        return self._adapter.convert_path_to_remote(path=path)

    def delete_job(self, process_id):
        """
        Delete a job from the queuing system.

        Args:
            process_id (int): job id from the queuing system

        Returns:
            str: output of the underlying queuing-system command
        """
        return self._adapter.delete_job(process_id=process_id)

    def get_queue_status(self, user=None):
        """
        Get the status of all jobs currently in the queue, optionally
        restricted to a single user.

        Args:
            user (str): user name to filter for (optional)

        Returns:
            pandas.DataFrame: status of the jobs in the queue
        """
        return self._adapter.get_queue_status(user=user)

    def get_status_of_my_jobs(self):
        """
        Get the status of the current user's jobs.

        Returns:
           pandas.DataFrame: status of the jobs of the current user
        """
        return self._adapter.get_status_of_my_jobs()

    def get_status_of_job(self, process_id):
        """
        Get the status of a single job.

        Args:
            process_id (int): job id from the queuing system

        Returns:
             str: ['running', 'pending', 'error']
        """
        return self._adapter.get_status_of_job(process_id=process_id)

    def get_status_of_jobs(self, process_id_lst):
        """
        Get the status of multiple jobs at once.

        Args:
            process_id_lst (list[int]): job ids from the queuing system

        Returns:
             list: ['running', 'pending', 'error', ...]
        """
        return self._adapter.get_status_of_jobs(process_id_lst=process_id_lst)

    def check_queue_parameters(
        self, queue, cores=1, run_time_max=None, memory_max=None, active_queue=None
    ):
        """
        Check and clip the requested resources against the limits of the
        selected queue.

        Args:
            queue (str/None): name of the queue
            cores (int): requested number of cores
            run_time_max (int/None): requested maximum runtime
            memory_max (int/None): requested maximum memory
            active_queue (dict): queue configuration to check against (optional)

        Returns:
            list: [cores, run_time_max, memory_max]
        """
        return self._adapter.check_queue_parameters(
            queue=queue,
            cores=cores,
            run_time_max=run_time_max,
            memory_max=memory_max,
            active_queue=active_queue,
        )
306

307

308
def set_queue_adapter(config, directory, execute_command=execute_command):
    """
    Initialize the queue adapter matching the configured queue type.

    Args:
        config (dict): configuration for one cluster; config["queue_type"] selects the adapter
        directory (str): directory which contains the queue configurations
        execute_command (funct): function used to execute shell commands

    Returns:
        BasisQueueAdapter/ModularQueueAdapter/RemoteQueueAdapter: adapter for the configured queue type

    Raises:
        ValueError: if config["queue_type"] is not one of the supported queue types
    """
    queue_type = config["queue_type"]
    # Single source of truth for the supported types - the error message
    # below is derived from this list instead of repeating it.
    basic_types = ["SGE", "TORQUE", "SLURM", "LSF", "MOAB", "FLUX"]
    if queue_type in basic_types:
        return BasisQueueAdapter(
            config=config, directory=directory, execute_command=execute_command
        )
    elif queue_type == "GENT":
        return ModularQueueAdapter(
            config=config, directory=directory, execute_command=execute_command
        )
    elif queue_type == "REMOTE":
        # The RemoteQueueAdapter has additional dependencies, namely paramiko and tqdm.
        # By moving the import to this line it only fails when the user specifies the
        # RemoteQueueAdapter in their pysqa configuration.
        from pysqa.ext.remote import RemoteQueueAdapter

        return RemoteQueueAdapter(
            config=config, directory=directory, execute_command=execute_command
        )
    else:
        raise ValueError(
            "The queue_type "
            + queue_type
            + " is not found in the list of supported queue types "
            + str(basic_types + ["GENT", "REMOTE"])
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc