• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / pyiron_base / 9757327958

02 Jul 2024 08:11AM UTC coverage: 71.455% (+0.02%) from 71.438%
9757327958

Pull #1505

github

web-flow
Merge 8c7bf806e into 51ba6c28a
Pull Request #1505: PythonFunctionContainerJob: Use calculate() function

36 of 39 new or added lines in 2 files covered. (92.31%)

2 existing lines in 1 file now uncovered.

7232 of 10121 relevant lines covered (71.46%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.71
/pyiron_base/jobs/job/extension/server/queuestatus.py
1
# coding: utf-8
2
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
3
# Distributed under the terms of "New BSD License", see the LICENSE file.
4
"""
5
Set of functions to interact with the queuing system directly from within pyiron - optimized for the Sun grid engine.
6
"""
7

8
import time
from concurrent.futures import Future
from concurrent.futures import TimeoutError as FuturesTimeoutError

import numpy as np
import pandas

from pyiron_base.jobs.job.extension.jobstatus import job_status_finished_lst
from pyiron_base.state import state
from pyiron_base.utils.instance import static_isinstance
15

16
__author__ = "Jan Janssen"
1✔
17
__copyright__ = (
1✔
18
    "Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
19
    "Computational Materials Design (CM) Department"
20
)
21
__version__ = "1.0"
1✔
22
__maintainer__ = "Jan Janssen"
1✔
23
__email__ = "janssen@mpie.de"
1✔
24
__status__ = "production"
1✔
25
__date__ = "Sep 1, 2017"
1✔
26

27
QUEUE_SCRIPT_PREFIX = "pi_"
1✔
28

29

30
def queue_table(
    job_ids=None, working_directory_lst=None, project_only=True, full_table=False
):
    """
    Display the queuing system table as pandas.DataFrame

    Args:
        job_ids (list): check for a specific list of job IDs - empty list by default
        working_directory_lst (list): list of working directories to include - empty list by default
        project_only (bool): Query only for jobs within the current project - True by default
        full_table (bool): Return all entries from the queuing system without filtering - False by default

    Returns:
        pandas.DataFrame: Output from the queuing system - optimized for the Sun grid engine.
            Returns an empty list when project_only is set but neither job IDs nor
            working directories are given, and None when no queue adapter is configured.
    """
    job_ids = [] if job_ids is None else job_ids
    working_directory_lst = (
        [] if working_directory_lst is None else working_directory_lst
    )
    # Nothing to query for within the current project - keep the historic
    # empty-list return value for backwards compatibility.
    if project_only and not job_ids and not working_directory_lst:
        return []
    if state.queue_adapter is None:
        return None
    if full_table:
        # Disable pandas row/column truncation so the whole table is shown.
        pandas.set_option("display.max_rows", None)
        pandas.set_option("display.max_columns", None)
    else:
        pandas.reset_option("display.max_rows")
        pandas.reset_option("display.max_columns")
    df = state.queue_adapter.get_status_of_my_jobs()
    if not project_only:
        # All pyiron-submitted jobs, identified by the queue-name prefix.
        return df[[QUEUE_SCRIPT_PREFIX in job_name for job_name in df.jobname]]
    if len(job_ids) > len(working_directory_lst):
        # Filter by job ID: queue job names are QUEUE_SCRIPT_PREFIX + job_id.
        job_name_lst = [QUEUE_SCRIPT_PREFIX + str(job_id) for job_id in job_ids]
        return df[[job_name in job_name_lst for job_name in df.jobname]]
    if len(df) > 0 and "working_directory" in df.columns:
        # Filter by working-directory prefix match.
        return df[
            [
                any(working_dir.startswith(p) for p in working_directory_lst)
                for working_dir in df.working_directory
            ]
        ]
    return df
92

93

94
def queue_check_job_is_waiting_or_running(item):
    """
    Check whether a job is still listed in the queuing system as waiting or running.

    Args:
        item (int, GenericJob): Provide either the job_ID or the full hamiltonian

    Returns:
        bool/None: True if the queue reports the job as pending or running,
            False otherwise; None when no queue adapter is configured.
    """
    queue_id = validate_que_request(item)
    if state.queue_adapter is None:
        return None
    status = state.queue_adapter.get_status_of_job(process_id=queue_id)
    return status in ("pending", "running")
112

113

114
def queue_info_by_job_id(job_id):
    """
    Query the queuing system for the status information of a single job.

    Args:
        job_id (int): query for a specific job_id

    Returns:
        dict/None: status information from the queuing system - optimized for the
            Sun grid engine; None when no queue adapter is configured.
    """
    if state.queue_adapter is None:
        return None
    return state.queue_adapter.get_status_of_job(process_id=job_id)
129

130

131
def queue_is_empty():
    """
    Check if the queue table is currently empty - no more jobs to wait for.

    Returns:
        bool: True if the table is empty (or no queue adapter is configured),
            else False - optimized for the Sun grid engine.
    """
    if state.queue_adapter is None:
        return True
    return len(state.queue_adapter.get_status_of_my_jobs()) == 0
142

143

144
def queue_delete_job(item):
    """
    Delete a job from the queuing system.

    Args:
        item (int, pyiron_base.jobs.job.generic.GenericJob): Provide either the job_ID or the full hamiltonian

    Returns:
        str/None: Output from the queuing system as string - optimized for the
            Sun grid engine; None when no queue adapter is configured.
    """
    queue_id = validate_que_request(item)
    if state.queue_adapter is None:
        return None
    return state.queue_adapter.delete_job(process_id=queue_id)
159

160

161
def queue_enable_reservation(item):
    """
    Enable a reservation for a particular job within the queuing system.

    Args:
        item (int, pyiron_base.jobs.job.generic.GenericJob): Provide either the job_ID or the full hamiltonian

    Returns:
        str/list/None: Output from the queuing system as string - optimized for
            the Sun grid engine; a list of outputs when the item resolves to
            several queue IDs; None when no queue adapter is configured.
    """
    queue_id = validate_que_request(item)
    if state.queue_adapter is None:
        return None
    if isinstance(queue_id, list):
        # Master jobs resolve to the queue IDs of all their children.
        return [
            state.queue_adapter.enable_reservation(process_id=q) for q in queue_id
        ]
    return state.queue_adapter.enable_reservation(process_id=queue_id)
181

182

183
def wait_for_job(job, interval_in_s=5, max_iterations=100):
    """
    Sleep until the job is finished but maximum interval_in_s * max_iterations seconds.

    Args:
        job (pyiron_base.job.utils.GenericJob): Job to wait for
        interval_in_s (int): interval when the job status is queried from the database - default 5 sec.
        max_iterations (int): maximum number of iterations - default 100

    Raises:
        ValueError: max_iterations reached, job still running
    """
    if job.status.string in job_status_finished_lst:
        return
    if (
        state.queue_adapter is not None
        and state.queue_adapter.remote_flag
        and job.server.queue is not None
    ):
        finished = _wait_for_remote_job(
            job=job, interval_in_s=interval_in_s, max_iterations=max_iterations
        )
    else:
        finished = _wait_for_local_job(
            job=job, interval_in_s=interval_in_s, max_iterations=max_iterations
        )
    if not finished:
        raise ValueError("Maximum iterations reached, but the job was not finished.")


def _wait_for_remote_job(job, interval_in_s, max_iterations):
    """Poll a job submitted to a remote queuing system; return True once finished."""
    for _ in range(max_iterations):
        if not queue_check_job_is_waiting_or_running(item=job):
            # The queue no longer lists the job - fetch the HDF5 file from the
            # remote host to read the final status written there.
            state.queue_adapter.transfer_file_to_remote(
                file=job.project_hdf5.file_name,
                transfer_back=True,
                delete_file_on_remote=False,
            )
            status_hdf5 = job.project_hdf5["status"]
            job.status.string = status_hdf5
        else:
            status_hdf5 = job.status.string
        if status_hdf5 in job_status_finished_lst:
            job.transfer_from_remote()
            return True
        time.sleep(interval_in_s)
    return False


def _wait_for_local_job(job, interval_in_s, max_iterations):
    """Poll a locally running job (or its Future); return True once finished."""
    for _ in range(max_iterations):
        if state.database.database_is_disabled:
            # Without a running database service, the file-based job table has
            # to be refreshed explicitly before checking the status.
            job.project.db.update()
        job.refresh_job_status()
        if job.status.string in job_status_finished_lst:
            return True
        elif isinstance(job.server.future, Future):
            try:
                job.server.future.result(timeout=interval_in_s)
            except (TimeoutError, FuturesTimeoutError):
                # concurrent.futures.TimeoutError only became an alias of the
                # builtin TimeoutError in Python 3.11 - catch both so the poll
                # loop keeps running on older interpreters instead of raising.
                pass
            else:
                return job.server.future.done()
        else:
            time.sleep(interval_in_s)
    return False
245

246

247
def wait_for_jobs(
    project,
    interval_in_s=5,
    max_iterations=100,
    recursive=True,
    ignore_exceptions=False,
    try_collecting=False,
):
    """
    Wait for the calculations in the project to be finished.

    Args:
        project: Project instance the jobs are located in
        interval_in_s (int): interval when the job status is queried from the database - default 5 sec.
        max_iterations (int): maximum number of iterations - default 100
        recursive (bool): search subprojects [True/False] - default=True
        ignore_exceptions (bool): ignore eventual exceptions when retrieving jobs - default=False
        try_collecting (bool): try to run collect for fetched jobs that don't have a status counting as finished - default=False

    Raises:
        ValueError: max_iterations reached, but jobs still running
    """
    finished = False
    for _ in range(max_iterations):
        # Fetch jobs that finished on the remote side and refresh the local
        # status before inspecting the job table. Previously `recursive` was
        # hard-coded to True here and `try_collecting` was silently dropped -
        # forward both so the arguments documented above actually take effect.
        project.update_from_remote(
            recursive=recursive,
            ignore_exceptions=ignore_exceptions,
            try_collecting=try_collecting,
        )
        project.refresh_job_status()
        df = project.job_table(recursive=recursive)
        if all(df.status.isin(job_status_finished_lst)):
            finished = True
            break
        time.sleep(interval_in_s)
    if not finished:
        raise ValueError("Maximum iterations reached, but the job was not finished.")
280

281

282
def update_from_remote(
    project, recursive=True, ignore_exceptions=False, try_collecting=False
):
    """
    Update jobs from the remote server.

    Args:
        project: Project instance the jobs are located in
        recursive (bool): search subprojects [True/False] - default=True
        ignore_exceptions (bool): ignore eventual exceptions when retrieving jobs - default=False
        try_collecting (bool): try to collect jobs that don't have a status counting as finished - default=False

    Returns:
        returns None if ignore_exceptions is False or when no error occurred.
        returns a list with job ids when errors occurred, but were ignored
    """
    # Only meaningful when jobs are submitted through a remote queue adapter;
    # otherwise there is nothing to synchronize and the function returns None.
    if state.queue_adapter is not None and state.queue_adapter.remote_flag:
        df_project = project.job_table(recursive=recursive)
        # Jobs handed to the queue but not yet observed running.
        df_submitted = df_project[df_project.status == "submitted"]
        # All jobs that may still change state on the remote side.
        df_combined = df_project[df_project.status.isin(["running", "submitted"])]
        df_queue = state.queue_adapter.get_status_of_my_jobs()
        if (
            len(df_queue) > 0
            and len(df_queue[df_queue.jobname.str.contains(QUEUE_SCRIPT_PREFIX)]) > 0
        ):
            # Restrict the queue table to pyiron-submitted jobs and recover the
            # pyiron job ID encoded in the queue name (QUEUE_SCRIPT_PREFIX + id).
            df_queue = df_queue[df_queue.jobname.str.contains(QUEUE_SCRIPT_PREFIX)]
            df_queue["pyiron_id"] = df_queue.apply(
                lambda x: int(x["jobname"].split(QUEUE_SCRIPT_PREFIX)[-1]), axis=1
            )
            # Submitted jobs that the queue now reports as running: promote
            # their database status to "running".
            queue_running = df_queue[df_queue.status == "running"].pyiron_id.values
            jobs_now_running_lst = df_submitted.id.values[
                np.isin(df_submitted.id.values, queue_running)
            ]
            project.db.set_job_status(status="running", job_id=jobs_now_running_lst)

            # Jobs no longer listed in the queue have left the remote system -
            # those are the ones to fetch back.
            fetch_ids = df_combined.id.values[
                np.isin(df_combined.id.values, df_queue.pyiron_id.values, invert=True)
            ]
        else:  # handle empty pyiron queue case for fetching
            fetch_ids = df_combined.id.values

        failed_jobs = []
        for job_id in fetch_ids:
            try:
                job = project.load(job_id)
                retrieve_job(job, try_collecting=try_collecting)
            except Exception as e:
                if ignore_exceptions:
                    # Best-effort mode: log the failure and keep retrieving the
                    # remaining jobs instead of aborting the whole update.
                    state.logger.warning(
                        f"An error occurred while trying to retrieve job {job_id}\n"
                        f"Error message: \n{e}"
                    )
                    failed_jobs.append(job_id)
                else:
                    raise e

        if len(failed_jobs) > 0:
            return failed_jobs
340

341

342
def retrieve_job(job, try_collecting=False):
    """
    Retrieve a job from the remote server and check whether it already reports
    a finished-type status; optionally try to collect its output if not.

    Args:
        job: pyiron job
        try_collecting (bool): whether to run collect if not finished - default=False

    Returns:
        returns None
    """
    job.transfer_from_remote()
    already_finished = job.status in job_status_finished_lst
    if not already_finished and try_collecting:
        # Mark the collect stage and re-run so the output gets parsed locally.
        job.status.collect = True
        job.run()
361

362

363
def validate_que_request(item):
    """
    Internal helper to resolve a job ID or job object to its queuing system ID.

    Args:
        item (int, pyiron_base.jobs.job.generic.GenericJob): Provide either the job_ID or the full hamiltonian

    Returns:
        int: queuing system ID (a list of IDs for a master job whose children
            carry the queue IDs)

    Raises:
        ValueError: the job has no queue ID assigned
        TypeError: item is neither an integer nor a pyiron job object
    """
    # Plain integers are already queue IDs.
    if isinstance(item, int):
        return item
    if static_isinstance(
        item.__class__, "pyiron_base.jobs.master.generic.GenericMaster"
    ):
        if item.server.queue_id:
            return item.server.queue_id
        # Master without its own queue ID: collect the children's queue IDs.
        child_queue_ids = [
            item.project.load(child_id).server.queue_id
            for child_id in item.child_ids
        ]
        valid_ids = [queue_id for queue_id in child_queue_ids if queue_id is not None]
        if len(valid_ids) == 0:
            raise ValueError("This job does not have a queue ID.")
        return valid_ids
    if static_isinstance(item.__class__, "pyiron_base.jobs.job.generic.GenericJob"):
        if item.server.queue_id:
            return item.server.queue_id
        raise ValueError("This job does not have a queue ID.")
    if static_isinstance(item.__class__, "pyiron_base.jobs.job.core.JobCore"):
        # Inspect-only job objects: read the queue ID from the HDF5 file.
        if "server" in item.project_hdf5.list_nodes():
            server_hdf_dict = item.project_hdf5["server"]
            if "qid" in server_hdf_dict.keys():
                return server_hdf_dict["qid"]
        raise ValueError("This job does not have a queue ID.")
    raise TypeError(
        "The queue can either query for IDs or for pyiron GenericJobObjects."
    )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc