• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / pysqa / 4878120914

pending completion
4878120914

Pull #178

github-actions

GitHub
Merge c719d5640 into 43cb5ffa3
Pull Request #178: Move command line interface from python -m pysqa.cmd to python -m pysqa

4 of 4 new or added lines in 2 files covered. (100.0%)

680 of 815 relevant lines covered (83.44%)

0.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.14
/pysqa/queueadapter.py
1
# coding: utf-8
2
# Copyright (c) Jan Janssen
3

4
import os
1✔
5
from pysqa.utils.basic import BasisQueueAdapter
1✔
6
from pysqa.ext.modular import ModularQueueAdapter
1✔
7
from pysqa.ext.remote import RemoteQueueAdapter
1✔
8
from pysqa.utils.config import read_config
1✔
9
from pysqa.utils.execute import execute_command
1✔
10

11
__author__ = "Jan Janssen"
1✔
12
__copyright__ = "Copyright 2019, Jan Janssen"
1✔
13
__version__ = "0.0.3"
1✔
14
__maintainer__ = "Jan Janssen"
1✔
15
__email__ = "janssen@mpie.de"
1✔
16
__status__ = "production"
1✔
17
__date__ = "Feb 9, 2019"
1✔
18

19

20
class QueueAdapter(object):
1✔
21
    """
22
    The goal of the QueueAdapter class is to make submitting to a queue system as easy as starting another sub process
23
    locally.
24

25
    Args:
26
        directory (str): directory containing the queue.yaml files as well as corresponding jinja2 templates for the
27
                         individual queues.
28

29
    Attributes:
30

31
        .. attribute:: config
32

33
            QueueAdapter configuration read from the queue.yaml file.
34

35
        .. attribute:: queue_list
36

37
            List of available queues
38

39
        .. attribute:: queue_view
40

41
            Pandas DataFrame representation of the available queues, read from queue.yaml.
42

43
        .. attribute:: queues
44

45
            Queues available for auto completion QueueAdapter().queues.<queue name> returns the queue name.
46
    """
47

48
    def __init__(self, directory="~/.queues", execute_command=execute_command):
1✔
49
        queue_yaml = os.path.join(directory, "queue.yaml")
1✔
50
        clusters_yaml = os.path.join(directory, "clusters.yaml")
1✔
51
        self._adapter = None
1✔
52
        if os.path.exists(queue_yaml):
1✔
53
            self._queue_dict = {
1✔
54
                "default": set_queue_adapter(
55
                    config=read_config(file_name=queue_yaml),
56
                    directory=directory,
57
                    execute_command=execute_command,
58
                )
59
            }
60
            primary_queue = "default"
1✔
61
        elif os.path.exists(clusters_yaml):
1✔
62
            config = read_config(file_name=clusters_yaml)
1✔
63
            self._queue_dict = {
1✔
64
                k: set_queue_adapter(
65
                    config=read_config(file_name=os.path.join(directory, v)),
66
                    directory=directory,
67
                    execute_command=execute_command,
68
                )
69
                for k, v in config["cluster"].items()
70
            }
71
            primary_queue = config["cluster_primary"]
1✔
72
        else:
73
            raise ValueError
×
74
        self._adapter = self._queue_dict[primary_queue]
1✔
75

76
    def list_clusters(self):
1✔
77
        """
78
        List available computing clusters for remote submission
79

80
        Returns:
81
            list: List of computing clusters
82
        """
83
        return list(self._queue_dict.keys())
1✔
84

85
    def switch_cluster(self, cluster_name):
1✔
86
        """
87
        Switch to a different computing cluster
88

89
        Args:
90
            cluster_name (str): name of the computing cluster
91
        """
92
        self._adapter = self._queue_dict[cluster_name]
×
93

94
    @property
1✔
95
    def config(self):
1✔
96
        """
97

98
        Returns:
99
            dict:
100
        """
101
        return self._adapter.config
1✔
102

103
    @property
1✔
104
    def ssh_delete_file_on_remote(self):
1✔
105
        return self._adapter.ssh_delete_file_on_remote
1✔
106

107
    @property
1✔
108
    def remote_flag(self):
1✔
109
        """
110

111
        Returns:
112
            bool:
113
        """
114
        return self._adapter.remote_flag
×
115

116
    @property
1✔
117
    def queue_list(self):
1✔
118
        """
119

120
        Returns:
121
            list:
122
        """
123
        return self._adapter.queue_list
1✔
124

125
    @property
1✔
126
    def queue_view(self):
1✔
127
        """
128

129
        Returns:
130
            pandas.DataFrame:
131
        """
132
        return self._adapter.queue_view
1✔
133

134
    @property
1✔
135
    def queues(self):
1✔
136
        return self._adapter.queues
1✔
137

138
    def submit_job(
1✔
139
        self,
140
        queue=None,
141
        job_name=None,
142
        working_directory=None,
143
        cores=None,
144
        memory_max=None,
145
        run_time_max=None,
146
        dependency_list=None,
147
        command=None,
148
        **kwargs
149
    ):
150
        """
151
        Submits command to the given queue.
152

153
        Args:
154
            queue (str/None):  Name of the queue to submit to, must be one of the names configured for this adapter
155
            job_name (str/None):  Name of the job for the underlying queuing system
156
            working_directory (str/None):  Directory to run the job in
157
            cores (int/None):  Number of hardware threads requested
158
            memory_max (int/None):  Amount of memory requested per node in GB
159
            run_time_max (int/None):  Maximum runtime in seconds
160
            dependency_list(list[str]/None: Job ids of jobs to be completed before starting
161
            command (str/None):  shell command to run in the job
162

163
        Returns:
164
            int:
165
        """
166
        return self._adapter.submit_job(
1✔
167
            queue=queue,
168
            job_name=job_name,
169
            working_directory=working_directory,
170
            cores=cores,
171
            memory_max=memory_max,
172
            run_time_max=run_time_max,
173
            dependency_list=dependency_list,
174
            command=command,
175
            **kwargs
176
        )
177

178
    def enable_reservation(self, process_id):
1✔
179
        """
180

181
        Args:
182
            process_id (int):
183

184
        Returns:
185
            str:
186
        """
187
        return self._adapter.enable_reservation(process_id=process_id)
×
188

189
    def get_job_from_remote(self, working_directory, delete_remote=False):
1✔
190
        """
191
        Get the results of the calculation - this is necessary when the calculation was executed on a remote host.
192

193
        Args:
194
            working_directory (str):
195
            delete_remote (bool):
196
        """
197
        self._adapter.get_job_from_remote(
×
198
            working_directory=working_directory, delete_remote=delete_remote
199
        )
200

201
    def transfer_file_to_remote(self, file, transfer_back=False, delete_remote=False):
1✔
202
        """
203

204
        Args:
205
            file (str):
206
            transfer_back (bool):
207
            delete_remote (bool):
208

209
        Returns:
210
            str:
211
        """
212
        self._adapter.transfer_file(
×
213
            file=file, transfer_back=transfer_back, delete_remote=delete_remote
214
        )
215

216
    def convert_path_to_remote(self, path):
1✔
217
        """
218

219
        Args:
220
            path (str):
221

222
        Returns:
223
            str:
224
        """
225
        return self._adapter.convert_path_to_remote(path=path)
1✔
226

227
    def delete_job(self, process_id):
1✔
228
        """
229

230
        Args:
231
            process_id (int):
232

233
        Returns:
234
            str:
235
        """
236
        return self._adapter.delete_job(process_id=process_id)
1✔
237

238
    def get_queue_status(self, user=None):
1✔
239
        """
240

241
        Args:
242
            user (str):
243

244
        Returns:
245
            pandas.DataFrame:
246
        """
247
        return self._adapter.get_queue_status(user=user)
1✔
248

249
    def get_status_of_my_jobs(self):
1✔
250
        """
251

252
        Returns:
253
           pandas.DataFrame:
254
        """
255
        return self._adapter.get_status_of_my_jobs()
×
256

257
    def get_status_of_job(self, process_id):
1✔
258
        """
259

260
        Args:
261
            process_id:
262

263
        Returns:
264
             str: ['running', 'pending', 'error']
265
        """
266
        return self._adapter.get_status_of_job(process_id=process_id)
1✔
267

268
    def get_status_of_jobs(self, process_id_lst):
1✔
269
        """
270

271
        Args:
272
            process_id_lst:
273

274
        Returns:
275
             list: ['running', 'pending', 'error', ...]
276
        """
277
        return self._adapter.get_status_of_jobs(process_id_lst=process_id_lst)
1✔
278

279
    def check_queue_parameters(
1✔
280
        self, queue, cores=1, run_time_max=None, memory_max=None, active_queue=None
281
    ):
282
        """
283

284
        Args:
285
            queue (str/None):
286
            cores (int):
287
            run_time_max (int/None):
288
            memory_max (int/None):
289
            active_queue (dict):
290

291
        Returns:
292
            list: [cores, run_time_max, memory_max]
293
        """
294
        return self._adapter.check_queue_parameters(
1✔
295
            queue=queue,
296
            cores=cores,
297
            run_time_max=run_time_max,
298
            memory_max=memory_max,
299
            active_queue=active_queue,
300
        )
301

302

303
def set_queue_adapter(config, directory, execute_command=execute_command):
1✔
304
    """
305
    Initialize the queue adapter
306

307
    Args:
308
        config (dict): configuration for one cluster
309
        directory (str): directory which contains the queue configurations
310
    """
311
    if config["queue_type"] in ["SGE", "TORQUE", "SLURM", "LSF", "MOAB"]:
1✔
312
        return BasisQueueAdapter(
1✔
313
            config=config, directory=directory, execute_command=execute_command
314
        )
315
    elif config["queue_type"] in ["GENT"]:
1✔
316
        return ModularQueueAdapter(
1✔
317
            config=config, directory=directory, execute_command=execute_command
318
        )
319
    elif config["queue_type"] in ["REMOTE"]:
1✔
320
        return RemoteQueueAdapter(
1✔
321
            config=config, directory=directory, execute_command=execute_command
322
        )
323
    else:
324
        raise ValueError
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc