/py/desispec/workflow/queue.py
"""
desispec.workflow.queue
=======================

"""
import os
import re
import numpy as np
from astropy.table import Table, vstack
import subprocess

from desispec.workflow.proctable import get_default_qid
from desiutil.log import get_logger
import time, datetime

global _cached_slurm_states
_cached_slurm_states = dict()

def get_resubmission_states(no_resub_failed=False):
    """
    Defines which Slurm job failure modes should be resubmitted in the hopes of the job succeeding the next time.

    Possible values that Slurm returns are::

        Note: querying cancelled (CA/CANCELLED) jobs will only show currently running jobs in the queue unless times are explicitly given
        BF BOOT_FAIL   Job terminated due to launch failure
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RQ REQUEUED Job was requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Args:
        no_resub_failed: bool. Set to True if you do NOT want to resubmit
            jobs with Slurm status 'FAILED' by default. Default is False.

    Returns:
        list. A list of strings outlining the job states that should be resubmitted.
    """
    ## 'UNSUBMITTED' is default pipeline state for things not yet submitted
    ## 'DEP_NOT_SUBD' is set when resubmission can't proceed because a
    ## dependency has failed
    resub_states = ['UNSUBMITTED', 'DEP_NOT_SUBD', 'BOOT_FAIL', 'DEADLINE', 'NODE_FAIL',
                    'OUT_OF_MEMORY', 'PREEMPTED', 'TIMEOUT', 'CANCELLED']
    if not no_resub_failed:
        resub_states.append('FAILED')
    return resub_states


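# Example usage sketch: deciding whether a job should be resubmitted based on
# its Slurm state. `job_state`, `job`, and `resubmit()` below are hypothetical.
#
#     resub_states = get_resubmission_states(no_resub_failed=True)
#     if job_state in resub_states:
#         resubmit(job)
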
def get_termination_states():
    """
    Defines which Slurm job states are final and aren't in question about needing resubmission.

    Possible values that Slurm returns are::

        Note: querying cancelled (CA/CANCELLED) jobs will only show currently running jobs in the queue unless times are explicitly given
        BF BOOT_FAIL   Job terminated due to launch failure
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RQ REQUEUED Job was requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Returns:
        list. A list of strings outlining the job states that are considered final (without human investigation/intervention)
    """
    return ['COMPLETED', 'CANCELLED', 'FAILED']

def get_failed_states():
    """
    Defines what Slurm job states should be considered failed or problematic.

    All possible values that Slurm returns are:
        BF BOOT_FAIL Job terminated due to launch failure, typically due to a hardware failure (e.g. unable to boot the node or block and the job can not be requeued).
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        CF CONFIGURING Job has been allocated resources, but are waiting for them to become ready for use (e.g. booting).
        CG COMPLETING Job is in the process of completing. Some processes on some nodes may still be active.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RD RESV_DEL_HOLD Job is being held after requested reservation was deleted.
        RF REQUEUE_FED Job is being requeued by a federation.
        RH REQUEUE_HOLD Held job is being requeued.
        RQ REQUEUED Completing job is being requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        SI SIGNALING Job is being signaled.
        SE SPECIAL_EXIT The job was requeued in a special state. This state can be set by users, typically in EpilogSlurmctld, if the job has terminated with a particular exit value.
        SO STAGE_OUT Job is staging out files.
        ST STOPPED Job has an allocation, but execution has been stopped with SIGSTOP signal. CPUS have been retained by this job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Returns:
        list. A list of strings outlining the job states that are considered to be
            failed or problematic.
    """
    return ['BOOT_FAIL', 'CANCELLED', 'DEADLINE', 'FAILED', 'NODE_FAIL',
            'OUT_OF_MEMORY', 'PREEMPTED', 'REVOKED', 'SUSPENDED', 'TIMEOUT']


def get_non_final_states():
    """
    Defines which Slurm job states are not final and therefore indicate the
    job hasn't finished running.

    Possible values that Slurm returns are:

        Note: querying cancelled (CA/CANCELLED) jobs will only show currently running jobs in the queue unless times are explicitly given
        BF BOOT_FAIL   Job terminated due to launch failure
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RQ REQUEUED Job was requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Returns:
        list. A list of strings outlining the job states that are considered non-final, i.e. the job hasn't finished running.
    """
    return ['PENDING', 'RUNNING', 'REQUEUED', 'RESIZING']

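# Example usage sketch: classifying a (hypothetical) sacct STATE string with the
# helper lists above.
#
#     state = 'TIMEOUT'
#     is_done = state in get_termination_states()        # False
#     is_problem = state in get_failed_states()          # True
#     still_running = state in get_non_final_states()    # False
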
def get_mock_slurm_data():
    """
    Returns a string of output that mimics what Slurm would return from
    sacct -X --parsable2 --delimiter=,
       --format=JobID,JobName,Partition,Submit,Eligible,Start,End,Elapsed,State,ExitCode -j <qid_str>

    Returns
    -------
    str
        Mock Slurm data in csv format.
    """
    string = 'JobID,JobName,Partition,Submit,Eligible,Start,End,Elapsed,State,ExitCode\n'
    string += '49482394,arc-20211102-00107062-a0123456789,realtime,2021-11-02' \
              + 'T18:31:14,2021-11-02T18:36:33,2021-11-02T18:36:33,2021-11-02T' \
              + '18:48:32,00:11:59,COMPLETED,0:0' + '\n'
    string += '49482395,arc-20211102-00107063-a0123456789,realtime,2021-11-02' \
              + 'T18:31:16,2021-11-02T18:36:33,2021-11-02T18:48:34,2021-11-02T' \
              + '18:57:02,00:11:59,COMPLETED,0:0' + '\n'
    string += '49482397,arc-20211102-00107064-a0123456789,realtime,2021-11-02' \
              + 'T18:31:19,2021-11-02T18:36:33,2021-11-02T18:57:05,2021-11-02T' \
              + '19:06:17,00:11:59,COMPLETED,0:0' + '\n'
    string += '49482398,arc-20211102-00107065-a0123456789,realtime,2021-11-02' \
              + 'T18:31:24,2021-11-02T18:36:33,2021-11-02T19:06:18,2021-11-02T' \
              + '19:13:59,00:11:59,COMPLETED,0:0' + '\n'
    string += '49482399,arc-20211102-00107066-a0123456789,realtime,2021-11-02' \
              + 'T18:31:27,2021-11-02T18:36:33,2021-11-02T19:14:00,2021-11-02T' \
              + '19:24:49,00:11:59,COMPLETED,0:0'
    return string


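# Example usage sketch: the mock sacct output above is csv text, so it can be
# parsed the same way the real sacct output is parsed later in this module.
#
#     mock_table = Table.read(get_mock_slurm_data(), format='ascii.csv')
#     print(mock_table['JobID', 'State'])
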
def queue_info_from_time_window(start_time=None, end_time=None, user=None, \
                             columns='jobid,jobname,partition,submit,eligible,'+
                                     'start,end,elapsed,state,exitcode',
                             dry_run_level=0):
    """
    Queries the NERSC Slurm database using sacct with appropriate flags to get information within a specified time
    window of all jobs submitted or executed during that time.

    Parameters
    ----------
    start_time : str
        String of the form YYYY-mm-ddTHH:MM:SS. Based on the given night and the earliest hour you
        want to see queue information about.
    end_time : str
        String of the form YYYY-mm-ddTHH:MM:SS. Based on the given night and the latest hour you
        want to see queue information about.
    user : str
        The username at NERSC that you want job information about. The default is the USER
        environment variable if it exists, otherwise 'desi'.
    columns : str
        Comma separated string of valid sacct column names, in lower case. To be useful for the workflow,
        it MUST have the columns "JOBID" and "STATE". The default columns are: jobid,jobname,partition,
        submit,eligible,start,end,elapsed,state,exitcode.
        Other options include: suspended,derivedexitcode,reason,priority.
    dry_run_level : int
        If nonzero, this is a simulated run. Default is 0.
        0 runs the code normally.
        1 writes all files but doesn't submit any jobs to Slurm.
        2 writes tables but doesn't write scripts or submit anything.
        3 Doesn't write or submit anything but queries Slurm normally for job status.
        4 Doesn't write, submit jobs, or query Slurm.
        5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.

    Returns
    -------
    astropy.table.Table
        Table with the columns defined by the input variable 'columns' and information relating
        to all jobs submitted by the specified user in the specified time frame.
    """
    # global queue_info_table
    if dry_run_level > 4:
        string = get_mock_slurm_data()
        cmd_as_list = ['echo', string]
    elif dry_run_level > 3:
        cmd_as_list = ['echo', 'JobID,JobName,Partition,Submit,Eligible,Start,End,Elapsed,State,ExitCode']
    else:
        if user is None:
            if 'USER' in os.environ:
                user = os.environ['USER']
            else:
                user = 'desi'
        if start_time is None:
            start_time = '2020-04-26T00:00'
        if end_time is None:
            end_time = '2020-05-01T00:00'
        cmd_as_list = ['sacct', '-X', '--parsable2', '--delimiter=,', \
                       '-S', start_time, \
                       '-E', end_time, \
                       '-u', user, \
                       f'--format={columns}']

    table_as_string = subprocess.check_output(cmd_as_list, text=True,
                                              stderr=subprocess.STDOUT)
    queue_info_table = Table.read(table_as_string, format='ascii.csv')

    for col in queue_info_table.colnames:
        queue_info_table.rename_column(col, col.upper())

    ## Update the cached states of these jobids if we have that info to update
    update_queue_state_cache_from_table(queue_info_table)

    return queue_info_table

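# Example usage sketch (hypothetical dates and user):
#
#     qinfo = queue_info_from_time_window(start_time='2024-03-09T00:00:00',
#                                         end_time='2024-03-10T00:00:00',
#                                         user='desi')
#     print(qinfo['JOBID', 'STATE'])
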
def queue_info_from_qids(qids, columns='jobid,jobname,partition,submit,'+
                         'eligible,start,end,elapsed,state,exitcode', dry_run_level=0):
    """
    Queries the NERSC Slurm database using sacct with appropriate flags to get
    information about specific jobs based on their jobids.

    Parameters
    ----------
    qids : list or array of ints
        Slurm QID's at NERSC that you want to return information about.
    columns : str
        Comma separated string of valid sacct column names, in lower case. To be useful for the workflow,
        it MUST have the columns "JOBID" and "STATE". The default columns are: jobid,jobname,partition,
        submit,eligible,start,end,elapsed,state,exitcode.
        Other options include: suspended,derivedexitcode,reason,priority.
    dry_run_level : int
        If nonzero, this is a simulated run. Default is 0.
        0 runs the code normally.
        1 writes all files but doesn't submit any jobs to Slurm.
        2 writes tables but doesn't write scripts or submit anything.
        3 Doesn't write or submit anything but queries Slurm normally for job status.
        4 Doesn't write, submit jobs, or query Slurm.
        5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.

    Returns
    -------
    astropy.table.Table
        Table with the columns defined by the input variable 'columns' and information relating
        to the requested Slurm jobs.
    """
    qids = np.atleast_1d(qids).astype(int)
    log = get_logger()

    ## If qids is too long, recursively call self and stack tables; otherwise sacct hangs
    nmax = 100
    if len(qids) > nmax:
        results = list()
        for i in range(0, len(qids), nmax):
            results.append(queue_info_from_qids(qids[i:i+nmax], columns=columns,
                                                dry_run_level=dry_run_level))
        results = vstack(results)
        return results
    elif len(qids) == 0:
        return Table(names=columns.upper().split(','))

    ## Turn the queue id's into a list
    ## this should work with str or int type also, though not officially supported
    qid_str = ','.join(np.atleast_1d(qids).astype(str)).replace(' ','')

    cmd_as_list = ['sacct', '-X', '--parsable2', '--delimiter=,',
                   f'--format={columns}', '-j', qid_str]
    if dry_run_level > 4:
        log.info("Dry run, would have otherwise queried Slurm with the"
                 +f" following: {' '.join(cmd_as_list)}")
        ### Set a random 5% of jobs as TIMEOUT, set seed for reproducibility
        # np.random.seed(qids[0])
        states = np.array(['COMPLETED'] * len(qids))
        #states[np.random.random(len(qids)) < 0.05] = 'TIMEOUT'
        ## Try two different column configurations, otherwise give up trying to simulate
        string = 'JobID,JobName,Partition,Submit,Eligible,Start,End,Elapsed,State,ExitCode'
        if columns.lower() == string.lower():
            for jobid, expid, state in zip(qids, 100000+np.arange(len(qids)), states):
                string += f'\n{jobid},arc-20211102-{expid:08d}-a0123456789,realtime,2021-11-02'\
                      +'T18:31:14,2021-11-02T18:36:33,2021-11-02T18:36:33,2021-11-02T'\
                      +f'18:48:32,00:11:59,{state},0:0'
        elif columns.lower() == 'jobid,state':
            string = 'JobID,State'
            for jobid, state in zip(qids, states):
                string += f'\n{jobid},{state}'
        # create command to run to exercise subprocess -> stdout parsing
        cmd_as_list = ['echo', string]
    elif dry_run_level > 3:
        cmd_as_list = ['echo', columns.lower()]
    else:
        log.info(f"Querying Slurm with the following: {' '.join(cmd_as_list)}")

    #- sacct sometimes fails; try several times before giving up
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            table_as_string = subprocess.check_output(cmd_as_list, text=True,
                                          stderr=subprocess.STDOUT)
            break
        except subprocess.CalledProcessError as err:
            log.error(f'{qid_str} job query via sacct failure at {datetime.datetime.now()}')
            log.error(f'{qid_str} {cmd_as_list}')
            log.error(f'{qid_str} {err.output=}')
    else:  #- for/else happens if loop doesn't succeed
        msg = f'{qid_str} job query via sacct failed {max_attempts} times; exiting'
        log.critical(msg)
        raise RuntimeError(msg)

    queue_info_table = Table.read(table_as_string, format='ascii.csv')
    for col in queue_info_table.colnames:
        queue_info_table.rename_column(col, col.upper())

    ## Update the cached states of these jobids if we have that info to update
    update_queue_state_cache_from_table(queue_info_table)

    return queue_info_table

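# Example usage sketch: look up a few Slurm job IDs. With dry_run_level=5 the
# sacct call is mocked, so the QIDs below are arbitrary.
#
#     qinfo = queue_info_from_qids([49482394, 49482395], dry_run_level=5)
#     print(qinfo['JOBID', 'STATE'])
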
def get_queue_states_from_qids(qids, dry_run_level=0, use_cache=False):
    """
    Queries the NERSC Slurm database using sacct with appropriate flags to get
    information on the job STATE. If use_cache is set and all qids have cached
    values from a previous query, those cached states will be returned instead.

    Parameters
    ----------
    qids : list or array of ints
        Slurm QID's at NERSC that you want to return information about.
    dry_run_level : int
        If nonzero, this is a simulated run. Default is 0.
        0 runs the code normally.
        1 writes all files but doesn't submit any jobs to Slurm.
        2 writes tables but doesn't write scripts or submit anything.
        3 Doesn't write or submit anything but queries Slurm normally for job status.
        4 Doesn't write, submit jobs, or query Slurm.
        5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
    use_cache : bool
        If True the code first looks for a cached status
        for the qid. If unavailable, then it queries Slurm. Default is False.

    Returns
    -------
    dict
        Dictionary with the jobids as keys and the Slurm state of each job as values.
    """
    def_qid = get_default_qid()
    global _cached_slurm_states
    qids = np.atleast_1d(qids).astype(int)
    log = get_logger()

    ## Only use cached values if all are cached, since the time is dominated
    ## by the call itself rather than the number of jobids, so we may as well
    ## get updated information from all of them if we're submitting a query anyway
    outdict = dict()
    if use_cache and np.all(np.isin(qids, list(_cached_slurm_states.keys()))):
        log.info(f"All Slurm {qids=} are cached. Using cached values.")
        for qid in qids:
            outdict[qid] = _cached_slurm_states[qid]
    else:
        outtable = queue_info_from_qids(qids, columns='jobid,state',
                                        dry_run_level=dry_run_level)
        for row in outtable:
            if int(row['JOBID']) != def_qid:
                outdict[int(row['JOBID'])] = row['STATE']
    return outdict

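# Example usage sketch: fetch just the state of each job, then let a second call
# be served from the module-level cache.
#
#     states = get_queue_states_from_qids([49482394, 49482395], dry_run_level=5)
#     states = get_queue_states_from_qids([49482394, 49482395], dry_run_level=5,
#                                         use_cache=True)
#     # e.g. {49482394: 'COMPLETED', 49482395: 'COMPLETED'}
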
def update_queue_state_cache_from_table(queue_info_table):
    """
    Takes a table of Slurm queue information and updates the queue state cache
    with the supplied states.

    Parameters
    ----------
    queue_info_table : astropy.table.Table
        Table returned by an sacct query. Should contain at least JOBID and STATE
        columns.

    Returns
    -------
    Nothing

    """
    ## Update the cached states of these jobids if we have that info to update
    if 'JOBID' in queue_info_table.colnames and 'STATE' in queue_info_table.colnames:
        for row in queue_info_table:
            update_queue_state_cache(qid=row['JOBID'], state=row['STATE'])

def update_queue_state_cache(qid, state):
    """
    Takes a Slurm jobid and updates the queue state cache with the supplied state.

    Parameters
    ----------
    qid : int
        Slurm QID at NERSC
    state : str
        The current job status of the Slurm jobid

    Returns
    -------
    Nothing

    """
    global _cached_slurm_states
    if int(qid) != get_default_qid():
        _cached_slurm_states[int(qid)] = state

def clear_queue_state_cache():
    """
    Remove all entries from the queue state cache.
    """
    global _cached_slurm_states
    _cached_slurm_states.clear()


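# Example usage sketch of the cache round trip (hypothetical QID):
#
#     update_queue_state_cache(12345678, 'RUNNING')
#     get_queue_states_from_qids([12345678], use_cache=True)  # {12345678: 'RUNNING'}
#     clear_queue_state_cache()   # later calls will query Slurm again
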
def update_from_queue(ptable, qtable=None, dry_run_level=0, ignore_scriptnames=False,
                      check_complete_jobs=False):
    """
    Given an input processing table (ptable) and a query table from the Slurm queue (qtable), it cross matches the
    Slurm job ID's and updates the 'STATUS' in the table using the current state in the Slurm scheduler system.

    Parameters
    ----------
    ptable : astropy.table.Table
        Processing table that contains the jobs you want updated with the most recent queue table. Must
        have at least columns 'LATEST_QID' and 'STATUS'.
    qtable : astropy.table.Table
        Table of Slurm queue information (e.g. as returned by queue_info_from_qids)
        with at least 'JOBID' and 'STATE' columns for the jobs to be cross matched.
    ignore_scriptnames : bool
        Default is False. Set to True if you do not
        want to check whether the scriptname matches the jobname
        returned by the Slurm scheduler.
    check_complete_jobs : bool
        Default is False. Set to True if you want to
        also check QID's that currently have a STATUS of "COMPLETED"
        in the ptable.
    dry_run_level : int
        If nonzero, this is a simulated run. Default is 0.
        0 runs the code normally.
        1 writes all files but doesn't submit any jobs to Slurm.
        2 writes tables but doesn't write scripts or submit anything.
        3 Doesn't write or submit anything but queries Slurm normally for job status.
        4 Doesn't write, submit jobs, or query Slurm.
        5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.

    Returns
    -------
    ptab : astropy.table.Table
        A copy of the same processing table as the input except that the "STATUS" column in ptable for all jobs is
        updated based on the 'STATE' in the qtable (as matched by "LATEST_QID" in the ptable
        and "JOBID" in the qtable).
    """
    log = get_logger()
    ptab = ptable.copy()
    if qtable is None:
        log.info("qtable not provided, querying Slurm using ptab's LATEST_QID set")
        ## Avoid null valued QID's (set to 2)
        sel = ptab['LATEST_QID'] > 2
        ## Only submit incomplete jobs unless explicitly told to check them
        ## completed jobs shouldn't change status
        if not check_complete_jobs:
            sel &= (ptab['STATUS'] != 'COMPLETED')
        log.info(f"Querying Slurm for {np.sum(sel)} QIDs from table of length {len(ptab)}.")
        qids = np.array(ptab['LATEST_QID'][sel])
        ## If you provide empty jobids Slurm gives you the three most recent jobs,
        ## which we don't want here
        if len(qids) == 0:
            log.info(f"No QIDs left to query. Returning the original table.")
            return ptab
        qtable = queue_info_from_qids(qids, dry_run_level=dry_run_level)

    log.info(f"Slurm returned information on {len(qtable)} jobs out of "
             +f"{len(ptab)} jobs in the ptab. Updating those now.")

    check_scriptname = ('JOBNAME' in qtable.colnames
                        and 'SCRIPTNAME' in ptab.colnames
                        and not ignore_scriptnames)
    if check_scriptname:
        log.info("Will be verifying that the file names are consistent")

    for row in qtable:
        if int(row['JOBID']) == get_default_qid():
            continue
        match = (int(row['JOBID']) == ptab['LATEST_QID'])
        if np.any(match):
            ind = np.where(match)[0][0]
            if check_scriptname and ptab['SCRIPTNAME'][ind] not in row['JOBNAME']:
                log.warning(f"For job with expids:{ptab['EXPID'][ind]}"
                            + f" the scriptname is {ptab['SCRIPTNAME'][ind]}"
                            + f" but the jobname in the queue was "
                            + f"{row['JOBNAME']}.")
            state = str(row['STATE']).split(' ')[0]
            ## Since dry run 1 and 2 save proc tables, don't alter the
            ## states for these when simulating
            ptab['STATUS'][ind] = state

    return ptab

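# Example usage sketch: refresh the STATUS column of a minimal processing table.
# The QIDs and statuses are hypothetical; dry_run_level=5 keeps the Slurm query mocked.
#
#     ptable = Table({'LATEST_QID': [49482394, 49482395],
#                     'STATUS': ['SUBMITTED', 'SUBMITTED']})
#     ptable = update_from_queue(ptable, dry_run_level=5)
#     print(ptable['STATUS'])
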
def any_jobs_not_complete(statuses, termination_states=None):
    """
    Returns True if any of the job statuses in the input column of the processing table, statuses, are not complete
    (as based on the list of acceptable final states, termination_states, given as an argument). These should be states
    that are viewed as final, as opposed to job states that require resubmission.

    Parameters
    ----------
    statuses : Table.Column or list or np.array
        The statuses in the processing table "STATUS". Each element should
        be a string.
    termination_states : list or np.array
        Each element should be a string signifying a state that is returned
        by the Slurm scheduler that should be deemed a terminal state.

    Returns
    -------
    bool
        True if any of the statuses of the jobs given in statuses are NOT a member of the termination states.
        Otherwise returns False.
    """
    if termination_states is None:
        termination_states = get_termination_states()
    return np.any([status not in termination_states for status in statuses])

def any_jobs_failed(statuses, failed_states=None):
    """
    Returns True if any of the job statuses in the input column of the
    processing table, statuses, are in a failed or problematic state (as
    based on the list of failed states, failed_states, given as an argument).

    Parameters
    ----------
    statuses : Table.Column or list or np.array
        The statuses in the
        processing table "STATUS". Each element should be a string.
    failed_states : list or np.array
        Each element should be a string
        signifying a state that is returned by the Slurm scheduler that
        should be considered failing or problematic.

    Returns
    -------
    bool
        True if any of the statuses of the jobs given in statuses are
        a member of the failed_states.
    """
    if failed_states is None:
        failed_states = get_failed_states()
    return np.any([status in failed_states for status in statuses])

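# Example usage sketch (hypothetical STATUS values from a processing table):
#
#     statuses = ['COMPLETED', 'TIMEOUT', 'RUNNING']
#     any_jobs_not_complete(statuses)   # True: 'RUNNING' is not a termination state
#     any_jobs_failed(statuses)         # True: 'TIMEOUT' is a failed state
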
def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0):
    """
    Queries the NERSC Slurm queue using squeue to get information about the
    jobs currently in the queue for the given user.

    Parameters
    ----------
    user : str
        NERSC user to query the jobs for.
    include_scron : bool
        True if you want to include scron entries in the returned table.
        Default is False.
    dry_run_level : int
        If nonzero, this is a simulated run. Default is 0.
        0 runs the code normally.
        1 writes all files but doesn't submit any jobs to Slurm.
        2 writes tables but doesn't write scripts or submit anything.
        3 Doesn't write or submit anything but queries Slurm normally for job status.
        4 Doesn't write, submit jobs, or query Slurm.
        5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.

    Returns
    -------
    astropy.table.Table
        Table with the columns JOBID, PARTITION, RESERVATION, NAME, USER, ST, TIME, NODES,
        NODELIST(REASON) for the specified user.
    """
    log = get_logger()
    if user is None:
        if 'USER' in os.environ:
            user = os.environ['USER']
        else:
            user = 'desi'

    cmd = f'squeue -u {user} -o "%i,%P,%v,%j,%u,%t,%M,%D,%R"'
    cmd_as_list = cmd.split()

    header = 'JOBID,PARTITION,RESERVATION,NAME,USER,ST,TIME,NODES,NODELIST(REASON)'
    if dry_run_level > 4:
        log.info("Dry run, would have otherwise queried Slurm with the"
                 +f" following: {' '.join(cmd_as_list)}")
        ## newline-separated rows so that the csv reader sees one job per line
        string = header + '\n'
        string += f"27650097,cron,(null),scron_ar,{user},PD,0:00,1,(BeginTime)\n"
        string += f"27650100,cron,(null),scron_nh,{user},PD,0:00,1,(BeginTime)\n"
        string += f"27650098,cron,(null),scron_up,{user},PD,0:00,1,(BeginTime)\n"
        string += f"29078887,gpu_ss11,(null),tilenight-20230413-24315,{user},PD,0:00,1,(Priority)\n"
        string += f"29078892,gpu_ss11,(null),tilenight-20230413-21158,{user},PD,0:00,1,(Priority)\n"
        string += f"29079325,gpu_ss11,(null),tilenight-20240309-24526,{user},PD,0:00,1,(Dependency)\n"
        string += f"29079322,gpu_ss11,(null),ztile-22959-thru20240309,{user},PD,0:00,1,(Dependency)\n"
        string += f"29078883,gpu_ss11,(null),tilenight-20230413-21187,{user},R,10:18,1,nid003960\n"
        string += f"29079242,regular_milan_ss11,(null),arc-20240309-00229483-a0123456789,{user},PD,0:00,3,(Priority)\n"
        string += f"29079246,regular_milan_ss11,(null),arc-20240309-00229484-a0123456789,{user},PD,0:00,3,(Priority)"

        # create command to run to exercise subprocess -> stdout parsing
        cmd = 'echo ' + string
        cmd_as_list = ['echo', string]
    elif dry_run_level > 3:
        cmd = 'echo ' + header
        cmd_as_list = ['echo', header]
    else:
        log.info(f"Querying jobs in queue with: {' '.join(cmd_as_list)}")

    #- squeue sometimes fails; try several times before giving up
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            table_as_string = subprocess.check_output(cmd_as_list, text=True,
                                          stderr=subprocess.STDOUT)
            break
        except subprocess.CalledProcessError as err:
            log.error(f'{cmd} job query failure at {datetime.datetime.now()}')
            log.error(f'{cmd_as_list}')
            log.error(f'{err.output=}')
    else:  #- for/else happens if loop doesn't succeed
        msg = f'{cmd} query failed {max_attempts} times; exiting'
        log.critical(msg)
        raise RuntimeError(msg)

    ## remove extra quotes that the astropy table reader doesn't like
    table_as_string = table_as_string.replace('"','')

    ## parentheses are also not desirable; remove them
    table_as_string = table_as_string.replace('(', '').replace(')', '')


    ## remove node list with hyphen or comma otherwise it will break table reader
    table_as_string = re.sub(r"nid\[[0-9,-]*\]", "multiple nodes", table_as_string)

    try:
        queue_info_table = Table.read(table_as_string, format='ascii.csv')
    except:
        log.info("Table returned by squeue couldn't be parsed. The string was:")
        print(table_as_string)
        raise

    for col in queue_info_table.colnames:
        queue_info_table.rename_column(col, col.upper())

    ## If the table is empty, return it immediately, otherwise perform
    ## sanity check and cuts
    if len(queue_info_table) == 0:
        return queue_info_table

    if np.any(queue_info_table['USER']!=user):
        msg = f"Warning {np.sum(queue_info_table['USER']!=user)} " \
              + f"jobs returned were not {user=}\n" \
              + f"{queue_info_table['USER'][queue_info_table['USER']!=user]}"
        log.critical(msg)
        raise ValueError(msg)

    if not include_scron:
        queue_info_table = queue_info_table[queue_info_table['PARTITION'] != 'cron']

    return queue_info_table


def check_queue_count(user=None, include_scron=False, dry_run_level=0):
    """
    Counts the number of jobs the given user currently has in the NERSC Slurm
    queue, using get_jobs_in_queue().

    Parameters
    ----------
    user : str
        NERSC user to query the jobs for.
    include_scron : bool
        True if you want to include scron entries in the count.
        Default is False.
    dry_run_level : int
        If nonzero, this is a simulated run. Default is 0.
        0 runs the code normally.
        1 writes all files but doesn't submit any jobs to Slurm.
        2 writes tables but doesn't write scripts or submit anything.
        3 Doesn't write or submit anything but queries Slurm normally for job status.
        4 Doesn't write, submit jobs, or query Slurm.
        5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.

    Returns
    -------
    int
        The number of jobs for that user in the queue (including or excluding
        scron entries depending on include_scron).
    """
    return len(get_jobs_in_queue(user=user, include_scron=include_scron,
                                 dry_run_level=dry_run_level))
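
# Example usage sketch: inspect the current queue for a user. With dry_run_level=5
# the squeue call is mocked using the rows defined in get_jobs_in_queue().
#
#     jobs = get_jobs_in_queue(user='desi', dry_run_level=5)
#     njobs = check_queue_count(user='desi', include_scron=False, dry_run_level=5)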