desihub / desispec · build 8885824765 (push, via github)

29 Apr 2024 09:43PM UTC · coverage: 28.042% · first build

akremin: Reduce sleeps in submissions and introduce opt-in qid status caching

6 of 21 new or added lines in 1 file covered (28.57%)
13254 of 47265 relevant lines covered (28.04%)
0.28 hits per line

Source file: /py/desispec/workflow/queue.py · 44.74% covered
"""
desispec.workflow.queue
=======================

"""
import os
import numpy as np
from astropy.table import Table
import subprocess
from desiutil.log import get_logger
import time, datetime

global _cached_slurm_states
_cached_slurm_states = dict()

def get_resubmission_states():
    """
    Defines which Slurm job failure modes should be resubmitted in the hope that the job succeeds the next time.

    Possible values that Slurm returns are::

        CA or ca or CANCELLED for cancelled jobs will only show currently running jobs in queue unless times are explicitly given
        BF BOOT_FAIL   Job terminated due to launch failure
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RQ REQUEUED Job was requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Returns:
        list. A list of strings outlining the job states that should be resubmitted.
    """
    return ['UNSUBMITTED', 'BOOT_FAIL', 'DEADLINE', 'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED', 'TIMEOUT', 'CANCELLED']


def get_termination_states():
    """
    Defines which Slurm job states are final and are not in question about needing resubmission.

    Possible values that Slurm returns are::

        CA or ca or CANCELLED for cancelled jobs will only show currently running jobs in queue unless times are explicitly given
        BF BOOT_FAIL   Job terminated due to launch failure
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RQ REQUEUED Job was requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Returns:
        list. A list of strings outlining the job states that are considered final (without human investigation/intervention).
    """
    return ['COMPLETED', 'CANCELLED', 'FAILED']


def get_failed_states():
    """
    Defines which Slurm job states should be considered failed or problematic.

    All possible values that Slurm returns are::

        BF BOOT_FAIL Job terminated due to launch failure, typically due to a hardware failure (e.g. unable to boot the node or block and the job can not be requeued).
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        CF CONFIGURING Job has been allocated resources, but are waiting for them to become ready for use (e.g. booting).
        CG COMPLETING Job is in the process of completing. Some processes on some nodes may still be active.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RD RESV_DEL_HOLD Job is being held after requested reservation was deleted.
        RF REQUEUE_FED Job is being requeued by a federation.
        RH REQUEUE_HOLD Held job is being requeued.
        RQ REQUEUED Completing job is being requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        SI SIGNALING Job is being signaled.
        SE SPECIAL_EXIT The job was requeued in a special state. This state can be set by users, typically in EpilogSlurmctld, if the job has terminated with a particular exit value.
        SO STAGE_OUT Job is staging out files.
        ST STOPPED Job has an allocation, but execution has been stopped with SIGSTOP signal. CPUS have been retained by this job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Returns:
        list. A list of strings outlining the job states that are considered to be
            failed or problematic.
    """
    return ['BOOT_FAIL', 'CANCELLED', 'DEADLINE', 'FAILED', 'NODE_FAIL',
            'OUT_OF_MEMORY', 'PREEMPTED', 'REVOKED', 'SUSPENDED', 'TIMEOUT']

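## Usage sketch (illustrative, not called anywhere in this module): classify a
## state string returned by Slurm against these lists. The example state is
## hypothetical.
##
##     state = 'NODE_FAIL'
##     if state in get_resubmission_states():
##         print(f"{state}: worth resubmitting")
##     elif state in get_termination_states():
##         print(f"{state}: final, no action needed")
##     if state in get_failed_states():
##         print(f"{state}: flag as failed/problematic")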

def queue_info_from_time_window(start_time=None, end_time=None, user=None, \
                             columns='jobid,jobname,partition,submit,eligible,'+
                                     'start,end,elapsed,state,exitcode',
                             dry_run=0):
    """
    Queries the NERSC Slurm database using sacct with appropriate flags to get information within a specified time
    window of all jobs submitted or executed during that time.

    Parameters
    ----------
    start_time : str
        String of the form YYYY-mm-ddTHH:MM:SS. Based on the given night and the earliest hour you
        want to see queue information about.
    end_time : str
        String of the form YYYY-mm-ddTHH:MM:SS. Based on the given night and the latest hour you
        want to see queue information about.
    user : str
        The username at NERSC that you want job information about. The default is the USER
        environment variable if it exists, otherwise 'desi'.
    columns : str
        Comma separated string of valid sacct column names, in lower case. To be useful for the workflow,
        it MUST have columns "JOBID" and "STATE". The default columns are:
        jobid,jobname,partition,submit,eligible,start,end,elapsed,state,exitcode.
        Other options include: suspended,derivedexitcode,reason,priority,jobname.
    dry_run : int
        Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default
        table without querying the Slurm scheduler.

    Returns
    -------
    Table
        Table with the columns defined by the input variable 'columns' and information relating
        to all jobs submitted by the specified user in the specified time frame.
    """
    # global queue_info_table
    if dry_run:
        string = 'JobID,JobName,Partition,Submit,Eligible,Start,End,State,ExitCode\n'
        string += '49482394,arc-20211102-00107062-a0123456789,realtime,2021-11-02'\
                  +'T18:31:14,2021-11-02T18:36:33,2021-11-02T18:36:33,2021-11-02T'\
                  +'18:48:32,COMPLETED,0:0' + '\n'
        string += '49482395,arc-20211102-00107063-a0123456789,realtime,2021-11-02'\
                  +'T18:31:16,2021-11-02T18:36:33,2021-11-02T18:48:34,2021-11-02T'\
                  +'18:57:02,COMPLETED,0:0' + '\n'
        string += '49482397,arc-20211102-00107064-a0123456789,realtime,2021-11-02'\
                  +'T18:31:19,2021-11-02T18:36:33,2021-11-02T18:57:05,2021-11-02T'\
                  +'19:06:17,COMPLETED,0:0' + '\n'
        string += '49482398,arc-20211102-00107065-a0123456789,realtime,2021-11-02'\
                  +'T18:31:24,2021-11-02T18:36:33,2021-11-02T19:06:18,2021-11-02T'\
                  +'19:13:59,COMPLETED,0:0' + '\n'
        string += '49482399,arc-20211102-00107066-a0123456789,realtime,2021-11-02'\
                  +'T18:31:27,2021-11-02T18:36:33,2021-11-02T19:14:00,2021-11-02T'\
                  +'19:24:49,COMPLETED,0:0'
        cmd_as_list = ['echo', string]
    else:
        if user is None:
            if 'USER' in os.environ:
                user = os.environ['USER']
            else:
                user = 'desi'
        if start_time is None:
            start_time = '2020-04-26T00:00'
        if end_time is None:
            end_time = '2020-05-01T00:00'
        cmd_as_list = ['sacct', '-X', '--parsable2', '--delimiter=,', \
                       '-S', start_time, \
                       '-E', end_time, \
                       '-u', user, \
                       f'--format={columns}']

    table_as_string = subprocess.check_output(cmd_as_list, text=True,
                                              stderr=subprocess.STDOUT)
    queue_info_table = Table.read(table_as_string, format='ascii.csv')

    for col in queue_info_table.colnames:
        queue_info_table.rename_column(col, col.upper())

    ## Update the cached states of these jobids if we have that info to update
    if 'JOBID' in queue_info_table.colnames and 'STATE' in queue_info_table.colnames:
        for row in queue_info_table:
            _cached_slurm_states[int(row['JOBID'])] = row['STATE']

    return queue_info_table

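## Usage sketch: query all of a user's jobs in a time window and inspect their
## states. The time window and username below are hypothetical placeholders.
##
##     qinfo = queue_info_from_time_window(start_time='2024-04-28T00:00:00',
##                                         end_time='2024-04-29T00:00:00',
##                                         user='desi')
##     print(qinfo['JOBID', 'STATE'])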

def queue_info_from_qids(qids, columns='jobid,jobname,partition,submit,'+
                         'eligible,start,end,elapsed,state,exitcode', dry_run=0):
    """
    Queries the NERSC Slurm database using sacct with appropriate flags to get
    information about specific jobs based on their jobids.

    Parameters
    ----------
    qids : list or array of ints
        Slurm QID's at NERSC that you want to return information about.
    columns : str
        Comma separated string of valid sacct column names, in lower case. To be useful for the workflow,
        it MUST have columns "JOBID" and "STATE". The default columns are:
        jobid,jobname,partition,submit,eligible,start,end,elapsed,state,exitcode.
        Other options include: suspended,derivedexitcode,reason,priority,jobname.
    dry_run : int
        Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default
        table without querying the Slurm scheduler.

    Returns
    -------
    Table
        Table with the columns defined by the input variable 'columns' and information relating
        to the requested jobs.
    """
    qids = np.atleast_1d(qids).astype(int)
    log = get_logger()

    ## Turn the queue id's into a list
    ## this should work with str or int type also, though not officially supported
    qid_str = ','.join(np.atleast_1d(qids).astype(str)).replace(' ','')

    cmd_as_list = ['sacct', '-X', '--parsable2', '--delimiter=,',
                   f'--format={columns}', '-j', qid_str]
    if dry_run:
        log.info("Dry run, would have otherwise queried Slurm with the"
                 +f" following: {' '.join(cmd_as_list)}")
        string = 'JobID,JobName,Partition,Submit,Eligible,Start,End,State,ExitCode\n'
        string += '49482394,arc-20211102-00107062-a0123456789,realtime,2021-11-02'\
                  +'T18:31:14,2021-11-02T18:36:33,2021-11-02T18:36:33,2021-11-02T'\
                  +'18:48:32,COMPLETED,0:0' + '\n'
        string += '49482395,arc-20211102-00107063-a0123456789,realtime,2021-11-02'\
                  +'T18:31:16,2021-11-02T18:36:33,2021-11-02T18:48:34,2021-11-02T'\
                  +'18:57:02,COMPLETED,0:0' + '\n'
        string += '49482397,arc-20211102-00107064-a0123456789,realtime,2021-11-02'\
                  +'T18:31:19,2021-11-02T18:36:33,2021-11-02T18:57:05,2021-11-02T'\
                  +'19:06:17,COMPLETED,0:0' + '\n'
        string += '49482398,arc-20211102-00107065-a0123456789,realtime,2021-11-02'\
                  +'T18:31:24,2021-11-02T18:36:33,2021-11-02T19:06:18,2021-11-02T'\
                  +'19:13:59,COMPLETED,0:0' + '\n'
        string += '49482399,arc-20211102-00107066-a0123456789,realtime,2021-11-02'\
                  +'T18:31:27,2021-11-02T18:36:33,2021-11-02T19:14:00,2021-11-02T'\
                  +'19:24:49,COMPLETED,0:0'
        cmd_as_list = ['echo', string]
        table_as_string = subprocess.check_output(cmd_as_list, text=True,
                                      stderr=subprocess.STDOUT)
    else:
        log.info(f"Querying Slurm with the following: {' '.join(cmd_as_list)}")

        #- sacct sometimes fails; try several times before giving up
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                table_as_string = subprocess.check_output(cmd_as_list, text=True,
                                              stderr=subprocess.STDOUT)
                break
            except subprocess.CalledProcessError as err:
                log.error(f'{qid_str} job query via sacct failure at {datetime.datetime.now()}')
                log.error(f'{qid_str} {cmd_as_list}')
                log.error(f'{qid_str} {err.output=}')
        else:  #- for/else happens if loop doesn't succeed
            msg = f'{qid_str} job query via sacct failed {max_attempts} times; exiting'
            log.critical(msg)
            raise RuntimeError(msg)

    queue_info_table = Table.read(table_as_string, format='ascii.csv')
    for col in queue_info_table.colnames:
        queue_info_table.rename_column(col, col.upper())

    ## Update the cached states of these jobids if we have that info to update
    if 'JOBID' in queue_info_table.colnames and 'STATE' in queue_info_table.colnames:
        for row in queue_info_table:
            _cached_slurm_states[int(row['JOBID'])] = row['STATE']

    return queue_info_table

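## Usage sketch: look up sacct information for specific Slurm jobids. The
## jobids are hypothetical; with dry_run=1 the canned example table above is
## returned instead of calling sacct.
##
##     qinfo = queue_info_from_qids([49482394, 49482395], dry_run=1)
##     print(qinfo['JOBID', 'STATE', 'EXITCODE'])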

def get_queue_states_from_qids(qids, dry_run=0, use_cache=False):
    """
    Queries the NERSC Slurm database using sacct with appropriate flags to get
    information on the job STATE. If use_cache is set and all qids have cached
    values from a previous query, those cached states will be returned instead.

    Parameters
    ----------
    qids : list or array of ints
        Slurm QID's at NERSC that you want to return information about.
    dry_run : int
        Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default
        table without querying the Slurm scheduler.
    use_cache : bool
        If True the code first looks for a cached status for each qid. If unavailable,
        then it queries Slurm. Default is False.

    Returns
    -------
    Dict
        Dictionary with the keys as jobids and values as the Slurm state of the job.
    """
    qids = np.atleast_1d(qids).astype(int)
    log = get_logger()

    ## Only use cached values if all are cached, since the time is dominated
    ## by the call itself rather than the number of jobids, so we may as well
    ## get updated information from all of them if we're submitting a query anyway
    outdict = dict()
    if use_cache and np.all(np.isin(qids, list(_cached_slurm_states.keys()))):
        log.info(f"All Slurm {qids=} are cached. Using cached values.")
        for qid in qids:
            outdict[qid] = _cached_slurm_states[qid]
    else:
        outtable = queue_info_from_qids(qids, columns='jobid,state', dry_run=dry_run)
        for row in outtable:
            outdict[int(row['JOBID'])] = row['STATE']
    return outdict

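## Usage sketch: the first call fills the module-level _cached_slurm_states via
## queue_info_from_qids(); a later call with use_cache=True reuses those states
## as long as every requested qid has been seen before. The qids are hypothetical.
##
##     states = get_queue_states_from_qids([49482394, 49482395], dry_run=1)
##     states = get_queue_states_from_qids([49482394], dry_run=1, use_cache=True)
##     # e.g. {49482394: 'COMPLETED'}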

def update_from_queue(ptable, qtable=None, dry_run=0, ignore_scriptnames=False):
    """
    Given an input processing table (ptable) and query table from the Slurm queue (qtable) it cross matches the
    Slurm job ID's and updates the 'STATUS' column in the table using the current state in the Slurm scheduler system.

    Args:
        ptable, Table. Processing table that contains the jobs you want updated with the most recent queue table. Must
                       have at least columns 'LATEST_QID' and 'STATUS'.
        qtable, Table. Queue information table, e.g. as returned by queue_info_from_qids(). Must have at least
                       columns 'JOBID' and 'STATE'.
        ignore_scriptnames, bool. Default is False. Set to True if you do not
                        want to check whether the scriptname matches the jobname
                        returned by the Slurm scheduler.
        The following are only used if qtable is not provided:
            dry_run, int. Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default
                           table that doesn't query the Slurm scheduler.

    Returns:
        ptable, Table. The same processing table as the input except that the "STATUS" column in ptable for all jobs is
                       updated based on the 'STATE' in the qtable (as matched by "LATEST_QID" in the ptable
                       and "JOBID" in the qtable).
    """
    log = get_logger()
    if qtable is None:
        log.info("qtable not provided, querying Slurm using ptable's LATEST_QID set")
        qids = np.array(ptable['LATEST_QID'])
        ## Avoid null valued QID's (set to -99)
        qids = qids[qids > 0]
        qtable = queue_info_from_qids(qids, dry_run=dry_run)

    log.info(f"Slurm returned information on {len(qtable)} jobs out of "
             +f"{len(ptable)} jobs in the ptable. Updating those now.")

    check_scriptname = ('JOBNAME' in qtable.colnames
                        and 'SCRIPTNAME' in ptable.colnames
                        and not ignore_scriptnames)
    if check_scriptname:
        log.info("Will be verifying that the file names are consistent")

    for row in qtable:
        match = (int(row['JOBID']) == ptable['LATEST_QID'])
        if np.any(match):
            ind = np.where(match)[0][0]
            if check_scriptname and ptable['SCRIPTNAME'][ind] not in row['JOBNAME']:
                log.warning(f"For job with expids:{ptable['EXPID'][ind]}"
                            + f" the scriptname is {ptable['SCRIPTNAME'][ind]}"
                            + f" but the jobname in the queue was "
                            + f"{row['JOBNAME']}.")
            state = str(row['STATE']).split(' ')[0]
            ptable['STATUS'][ind] = state

    return ptable

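## Usage sketch: refresh the STATUS column of a minimal processing table from
## the queue. This toy table is a stand-in for a real DESI processing table;
## dry_run=1 matches against the canned sacct output rather than querying Slurm.
##
##     ptable = Table({'LATEST_QID': [49482394, 49482395],
##                     'STATUS': ['SUBMITTED', 'SUBMITTED']})
##     ptable = update_from_queue(ptable, dry_run=1)
##     print(list(ptable['STATUS']))  # -> ['COMPLETED', 'COMPLETED']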

def any_jobs_not_complete(statuses, termination_states=None):
    """
    Returns True if any of the job statuses in the input column of the processing table, statuses, are not complete
    (as based on the list of acceptable final states, termination_states, given as an argument). These should be states
    that are viewed as final, as opposed to job states that require resubmission.

    Args:
        statuses, Table.Column or list or np.array. The statuses in the processing table "STATUS". Each element should
                                                    be a string.
        termination_states, list or np.array. Each element should be a string signifying a state that is returned
                                              by the Slurm scheduler that should be deemed a terminal state.

    Returns:
        bool. True if any of the statuses of the jobs given in statuses are NOT a member of the termination states.
              Otherwise returns False.
    """
    if termination_states is None:
        termination_states = get_termination_states()
    return np.any([status not in termination_states for status in statuses])

def any_jobs_failed(statuses, failed_states=None):
    """
    Returns True if any of the job statuses in the input column of the
    processing table, statuses, are in a failed or problematic state (as based
    on the list of failed states, failed_states, given as an argument).

    Args:
        statuses, Table.Column or list or np.array. The statuses in the
            processing table "STATUS". Each element should be a string.
        failed_states, list or np.array. Each element should be a string
            signifying a state that is returned by the Slurm scheduler that
            should be considered failing or problematic.

    Returns:
        bool. True if any of the statuses of the jobs given in statuses are
            a member of the failed_states.
    """
    if failed_states is None:
        failed_states = get_failed_states()
    return np.any([status in failed_states for status in statuses])

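## Usage sketch: decide whether a set of jobs needs further attention. The
## status list is hypothetical.
##
##     statuses = ['COMPLETED', 'RUNNING', 'NODE_FAIL']
##     any_jobs_not_complete(statuses)   # True: 'RUNNING' and 'NODE_FAIL' are not terminal
##     any_jobs_failed(statuses)         # True: 'NODE_FAIL' is a failed state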