
pyiron / pysqa, build 11125805398 (github push)

01 Oct 2024 12:56PM UTC, coverage: 80.879%. Remained the same.

Han Lin Mai: Revert "queues" (this reverts commit 2f5d3aafa)

791 of 978 relevant lines covered (80.88%), 0.81 hits per line

Source file: /pysqa/queueadapter.py (84.0% of lines covered)
import os
from typing import List, Optional, Tuple, Union

import pandas
from jinja2 import Template

from pysqa.base.abstract import QueueAdapterAbstractClass
from pysqa.base.config import QueueAdapterWithConfig, Queues, read_config
from pysqa.base.core import QueueAdapterCore, execute_command
from pysqa.base.modular import ModularQueueAdapter


class QueueAdapter(QueueAdapterAbstractClass):
    """
    The goal of the QueueAdapter class is to make submitting to a queue system as easy as starting another subprocess
    locally.

    Args:
        directory (str): directory containing the queue.yaml files as well as corresponding jinja2 templates for the
                         individual queues.

    Attributes:

        .. attribute:: config

            QueueAdapter configuration read from the queue.yaml file.

        .. attribute:: queue_list

            List of available queues.

        .. attribute:: queue_view

            Pandas DataFrame representation of the available queues, read from queue.yaml.

        .. attribute:: queues

            Queues available for auto completion; QueueAdapter().queues.<queue name> returns the queue name.
    """

    def __init__(
        self,
        directory: Optional[str] = None,
        queue_type: Optional[str] = None,
        execute_command: callable = execute_command,
    ):
        """
        Initialize the QueueAdapter.

        Args:
            directory (str): Directory containing the queue.yaml files and corresponding templates.
            queue_type (str): Type of the queuing system (e.g. "SLURM"), used when no configuration directory is given.
            execute_command (callable): Function to execute commands.
        """
        if directory is not None:
            queue_yaml = os.path.join(directory, "queue.yaml")
            clusters_yaml = os.path.join(directory, "clusters.yaml")
            self._adapter = None
            if os.path.exists(queue_yaml):
                self._queue_dict = {
                    "default": set_queue_adapter(
                        config=read_config(file_name=queue_yaml),
                        directory=directory,
                        execute_command=execute_command,
                    )
                }
                primary_queue = "default"
            elif os.path.exists(clusters_yaml):
                config = read_config(file_name=clusters_yaml)
                self._queue_dict = {
                    k: set_queue_adapter(
                        config=read_config(file_name=os.path.join(directory, v)),
                        directory=directory,
                        execute_command=execute_command,
                    )
                    for k, v in config["cluster"].items()
                }
                primary_queue = config["cluster_primary"]
            else:
                raise ValueError(
                    "Neither a queue.yaml file nor a clusters.yaml file was found in "
                    + directory
                )
            self._adapter = self._queue_dict[primary_queue]
        elif queue_type is not None:
            self._queue_dict = {}
            self._adapter = QueueAdapterCore(queue_type=queue_type.upper())
        else:
            raise ValueError("Either a directory or a queue_type has to be provided.")

    def list_clusters(self) -> List[str]:
        """
        List available computing clusters for remote submission

        Returns:
            List of computing clusters
        """
        return list(self._queue_dict.keys())

    def switch_cluster(self, cluster_name: str):
        """
        Switch to a different computing cluster

        Args:
            cluster_name (str): name of the computing cluster
        """
        self._adapter = self._queue_dict[cluster_name]

    @property
    def config(self) -> Union[dict, None]:
        """
        Get the QueueAdapter configuration.

        Returns:
            dict: The QueueAdapter configuration.
        """
        if isinstance(self._adapter, QueueAdapterWithConfig):
            return self._adapter.config
        else:
            return None

    @property
    def ssh_delete_file_on_remote(self) -> bool:
        """
        Get the value of ssh_delete_file_on_remote property.

        Returns:
            bool: The value of ssh_delete_file_on_remote property.
        """
        if isinstance(self._adapter, QueueAdapterWithConfig):
            return self._adapter.ssh_delete_file_on_remote
        else:
            return False

    @property
    def remote_flag(self) -> bool:
        """
        Get the value of remote_flag property.

        Returns:
            bool: The value of remote_flag property.
        """
        if isinstance(self._adapter, QueueAdapterWithConfig):
            return self._adapter.remote_flag
        else:
            return False

    @property
    def queue_list(self) -> Union[List[str], None]:
        """
        Get the list of available queues.

        Returns:
            List[str]: The list of available queues.
        """
        if isinstance(self._adapter, QueueAdapterWithConfig):
            return self._adapter.queue_list
        else:
            return None

    @property
    def queue_view(self) -> Union[pandas.DataFrame, None]:
        """
        Get the Pandas DataFrame representation of the available queues.

        Returns:
            pandas.DataFrame: The Pandas DataFrame representation of the available queues.
        """
        if isinstance(self._adapter, QueueAdapterWithConfig):
            return self._adapter.queue_view
        else:
            return None

    @property
    def queues(self) -> Union[Queues, None]:
        """
        Get the queues available for auto completion.

        Returns:
            Queues: The available queues; QueueAdapter().queues.<queue name> returns the queue name.
        """
        if isinstance(self._adapter, QueueAdapterWithConfig):
            return self._adapter.queues
        else:
            return None

    def submit_job(
        self,
        queue: Optional[str] = None,
        job_name: Optional[str] = None,
        working_directory: Optional[str] = None,
        cores: Optional[int] = None,
        memory_max: Optional[int] = None,
        run_time_max: Optional[int] = None,
        dependency_list: Optional[List[str]] = None,
        command: Optional[str] = None,
        submission_template: Optional[Union[str, Template]] = None,
        **kwargs,
    ) -> int:
        """
        Submits command to the given queue.

        Args:
            queue (str/None): Name of the queue to submit to, must be one of the names configured for this adapter
                              (optional)
            job_name (str/None): Name of the job for the underlying queuing system (optional)
            working_directory (str/None): Directory to run the job in (optional)
            cores (int/None): Number of hardware threads requested (optional)
            memory_max (int/None): Amount of memory requested per node in GB (optional)
            run_time_max (int/None): Maximum runtime in seconds (optional)
            dependency_list (list[str]/None): Job ids of jobs to be completed before starting (optional)
            command (str/None): Shell command to run in the job
            submission_template (str/jinja2.Template/None): Jinja2 template for the job submission script (optional)
            **kwargs: allows writing additional parameters to the job submission script if they are available in the
                      corresponding template.

        Returns:
            int: Job id received from the queuing system for the job which was submitted
        """
        return self._adapter.submit_job(
            queue=queue,
            job_name=job_name,
            working_directory=working_directory,
            cores=cores,
            memory_max=memory_max,
            run_time_max=run_time_max,
            dependency_list=dependency_list,
            command=command,
            submission_template=submission_template,
            **kwargs,
        )

    def enable_reservation(self, process_id: int) -> str:
        """
        Enable reservation for a process.

        Args:
            process_id (int): The process id.

        Returns:
            str: The result of enabling reservation.
        """
        return self._adapter.enable_reservation(process_id=process_id)

    def get_job_from_remote(self, working_directory: str):
        """
        Get the results of the calculation - this is necessary when the calculation was executed on a remote host.

        Args:
            working_directory (str): The working directory.
        """
        if isinstance(self._adapter, QueueAdapterWithConfig):
            self._adapter.get_job_from_remote(working_directory=working_directory)
        else:
            raise TypeError()

    def transfer_file_to_remote(
        self,
        file: str,
        transfer_back: bool = False,
        delete_file_on_remote: bool = False,
    ):
        """
        Transfer a file from the local host to the remote host.

        Args:
            file (str): The file to transfer.
            transfer_back (bool): Whether to transfer the file back.
            delete_file_on_remote (bool): Whether to delete the file on the remote host.
        """
        if isinstance(self._adapter, QueueAdapterWithConfig):
            self._adapter.transfer_file(
                file=file,
                transfer_back=transfer_back,
                delete_file_on_remote=delete_file_on_remote,
            )
        else:
            raise TypeError()

    def convert_path_to_remote(self, path: str) -> str:
        """
        Convert a local path to a remote path.

        Args:
            path (str): The local path.

        Returns:
            str: The remote path.
        """
        if isinstance(self._adapter, QueueAdapterWithConfig):
            return self._adapter.convert_path_to_remote(path=path)
        else:
            raise TypeError()

    def delete_job(self, process_id: int) -> str:
        """
        Delete a job.

        Args:
            process_id (int): The process id.

        Returns:
            str: The result of deleting the job.
        """
        return self._adapter.delete_job(process_id=process_id)

    def get_queue_status(self, user: Optional[str] = None) -> pandas.DataFrame:
        """
        Get the status of the queue.

        Args:
            user (str/None): The user.

        Returns:
            pandas.DataFrame: The status of the queue.
        """
        return self._adapter.get_queue_status(user=user)

    def get_status_of_my_jobs(self) -> pandas.DataFrame:
        """
        Get the status of the user's jobs.

        Returns:
            pandas.DataFrame: The status of the user's jobs.
        """
        return self._adapter.get_status_of_my_jobs()

    def get_status_of_job(self, process_id: int) -> str:
        """
        Get the status of a job.

        Args:
            process_id: The process id.

        Returns:
            str: The status of the job. Possible values are ['running', 'pending', 'error'].
        """
        return self._adapter.get_status_of_job(process_id=process_id)

    def get_status_of_jobs(self, process_id_lst: List[int]) -> List[str]:
        """
        Get the status of multiple jobs.

        Args:
            process_id_lst: The list of process ids.

        Returns:
            List[str]: The status of the jobs. Possible values are ['running', 'pending', 'error', ...].
        """
        return self._adapter.get_status_of_jobs(process_id_lst=process_id_lst)

    def check_queue_parameters(
        self,
        queue: str,
        cores: int = 1,
        run_time_max: Optional[int] = None,
        memory_max: Optional[int] = None,
        active_queue: Optional[dict] = None,
    ) -> Tuple[
        Union[float, int, None], Union[float, int, None], Union[float, int, None]
    ]:
        """
        Check the parameters of a queue.

        Args:
            queue (str/None): The queue name.
            cores (int): The number of cores.
            run_time_max (int/None): The maximum runtime.
            memory_max (int/None): The maximum memory.
            active_queue (dict/None): The active queue.

        Returns:
            Tuple: The checked parameters (cores, run_time_max, memory_max).
        """
        if isinstance(self._adapter, QueueAdapterWithConfig):
            return self._adapter.check_queue_parameters(
                queue=queue,
                cores=cores,
                run_time_max=run_time_max,
                memory_max=memory_max,
                active_queue=active_queue,
            )
        else:
            return cores, run_time_max, memory_max


def set_queue_adapter(
    config: dict, directory: str, execute_command: callable = execute_command
):
    """
    Initialize the queue adapter

    Args:
        config (dict): configuration for one cluster
        directory (str): directory which contains the queue configurations
        execute_command (callable): function to execute commands

    Returns:
        The queue adapter matching the queue_type defined in the configuration.
    """
    if config["queue_type"] in ["SGE", "TORQUE", "SLURM", "LSF", "MOAB", "FLUX"]:
        return QueueAdapterWithConfig(
            config=config, directory=directory, execute_command=execute_command
        )
    elif config["queue_type"] in ["GENT"]:
        return ModularQueueAdapter(
            config=config, directory=directory, execute_command=execute_command
        )
    elif config["queue_type"] in ["REMOTE"]:
        # The RemoteQueueAdapter has additional dependencies, namely paramiko and tqdm.
        # By moving the import to this line it only fails when the user specifies the
        # RemoteQueueAdapter in their pysqa configuration.
        from pysqa.base.remote import RemoteQueueAdapter

        return RemoteQueueAdapter(
            config=config, directory=directory, execute_command=execute_command
        )
    else:
        raise ValueError(
            "The queue_type "
            + config["queue_type"]
            + " is not found in the list of supported queue types "
            + str(["SGE", "TORQUE", "SLURM", "LSF", "MOAB", "FLUX", "GENT", "REMOTE"])
        )
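
Below is a minimal usage sketch for the QueueAdapter defined in this file. It is illustrative only: the configuration directory, queue name, and job parameters are hypothetical, and a valid queue.yaml (or clusters.yaml) plus the matching jinja2 templates are assumed to exist at that path, as required by __init__.

# Minimal usage sketch for the QueueAdapter defined above.
# Assumptions (not part of the file shown): "/home/user/pysqa_config" contains a
# valid queue.yaml plus the matching jinja2 templates, and "slurm_queue" is one of
# the queue names configured there.
from pysqa.queueadapter import QueueAdapter

qa = QueueAdapter(directory="/home/user/pysqa_config")

# Inspect the configured queues (None for adapters without a configuration).
print(qa.queue_list)
print(qa.queue_view)

# Submit a shell command; the returned process id comes from the queuing system.
process_id = qa.submit_job(
    queue="slurm_queue",  # hypothetical queue name from queue.yaml
    job_name="example_job",
    working_directory=".",
    cores=1,
    command="echo 'hello from the queue'",
)

# Poll the job and delete it again.
print(qa.get_status_of_job(process_id=process_id))  # 'running', 'pending' or 'error'
qa.delete_job(process_id=process_id)

# Without a configuration directory the adapter talks to the queuing system directly.
qa_slurm = QueueAdapter(queue_type="SLURM")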
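
The clusters.yaml branch of __init__ can be exercised in the same way. The sketch below assumes a hypothetical clusters.yaml that maps cluster names to per-cluster queue configuration files, mirroring the config["cluster"] and config["cluster_primary"] keys read above; the file names and cluster names are placeholders.

# Hypothetical multi-cluster setup, mirroring the clusters.yaml branch of __init__.
# Assumed content of /home/user/pysqa_clusters/clusters.yaml (not shown in this file):
#   cluster_primary: local_slurm
#   cluster:
#     local_slurm: queue_slurm.yaml
#     remote_flux: queue_flux.yaml
from pysqa.queueadapter import QueueAdapter

qa = QueueAdapter(directory="/home/user/pysqa_clusters")

print(qa.list_clusters())  # ['local_slurm', 'remote_flux']
qa.switch_cluster(cluster_name="remote_flux")  # switch away from cluster_primary
print(qa.remote_flag)  # whether the active cluster is configured for remote submission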