# /py/desispec/workflow/queue.py
"""
2
desispec.workflow.queue
3
=======================
4

5
"""
6
import os
1✔
7
import re
1✔
8
import numpy as np
1✔
9
from astropy.table import Table, vstack
1✔
10
import subprocess
1✔
11

12
from desispec.workflow.proctable import get_default_qid
1✔
13
from desiutil.log import get_logger
1✔
14
import time, datetime
1✔
15

16
global _cached_slurm_states
17
_cached_slurm_states = dict()
1✔
18

19
def get_resubmission_states(no_resub_failed=False):
    """
    Defines what Slurm job failure modes should be resubmitted in the hopes of the job succeeding the next time.

    Possible values that Slurm returns are::

        CA or ca or CANCELLED for cancelled jobs will only show currently running jobs in queue unless times are explicitly given
        BF BOOT_FAIL   Job terminated due to launch failure
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RQ REQUEUED Job was requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Args:
        no_resub_failed: bool. Set to True if you do NOT want to resubmit
            jobs with Slurm status 'FAILED' by default. Default is False.

    Returns:
        list. A list of strings outlining the job states that should be resubmitted.
    """
    ## 'UNSUBMITTED' is default pipeline state for things not yet submitted
    ## 'DEP_NOT_SUBD' is set when resubmission can't proceed because a
    ## dependency has failed
    resub_states = ['UNSUBMITTED', 'DEP_NOT_SUBD', 'BOOT_FAIL', 'DEADLINE', 'NODE_FAIL',
                    'OUT_OF_MEMORY', 'PREEMPTED', 'TIMEOUT', 'CANCELLED']
    if not no_resub_failed:
        resub_states.append('FAILED')
    return resub_states


def get_termination_states():
    """
    Defines which Slurm job states are final and aren't in question about needing resubmission.

    Possible values that Slurm returns are::

        CA or ca or CANCELLED for cancelled jobs will only show currently running jobs in queue unless times are explicitly given
        BF BOOT_FAIL   Job terminated due to launch failure
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RQ REQUEUED Job was requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Returns:
        list. A list of strings outlining the job states that are considered final (without human investigation/intervention).
    """
    return ['COMPLETED', 'CANCELLED', 'FAILED']

def get_failed_states():
    """
    Defines what Slurm job states should be considered failed or problematic

    All possible values that Slurm returns are::

        BF BOOT_FAIL Job terminated due to launch failure, typically due to a hardware failure (e.g. unable to boot the node or block and the job can not be requeued).
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        CF CONFIGURING Job has been allocated resources, but are waiting for them to become ready for use (e.g. booting).
        CG COMPLETING Job is in the process of completing. Some processes on some nodes may still be active.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RD RESV_DEL_HOLD Job is being held after requested reservation was deleted.
        RF REQUEUE_FED Job is being requeued by a federation.
        RH REQUEUE_HOLD Held job is being requeued.
        RQ REQUEUED Completing job is being requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        SI SIGNALING Job is being signaled.
        SE SPECIAL_EXIT The job was requeued in a special state. This state can be set by users, typically in EpilogSlurmctld, if the job has terminated with a particular exit value.
        SO STAGE_OUT Job is staging out files.
        ST STOPPED Job has an allocation, but execution has been stopped with SIGSTOP signal. CPUS have been retained by this job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Returns:
        list. A list of strings outlining the job states that are considered to be
            failed or problematic.
    """
    return ['BOOT_FAIL', 'CANCELLED', 'DEADLINE', 'FAILED', 'NODE_FAIL',
            'OUT_OF_MEMORY', 'PREEMPTED', 'REVOKED', 'SUSPENDED', 'TIMEOUT']


def get_non_final_states():
    """
    Defines which Slurm job states are not final and therefore indicate the
    job hasn't finished running.

    Possible values that Slurm returns are::

        CA or ca or CANCELLED for cancelled jobs will only show currently running jobs in queue unless times are explicitly given
        BF BOOT_FAIL   Job terminated due to launch failure
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RQ REQUEUED Job was requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Returns:
        list. A list of strings outlining the job states that are not yet final
        and therefore indicate the job is still queued or running.
    """
    return ['PENDING', 'RUNNING', 'REQUEUED', 'RESIZING']

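## Illustrative sketch (hypothetical helper, not used elsewhere in the pipeline): shows how the
## four state lists above are meant to be combined when deciding what to do with a job whose
## Slurm state is known. Only functions defined in this module are used; the default example
## state 'TIMEOUT' is an arbitrary placeholder.
def _example_classify_state(state='TIMEOUT'):
    """Return a short human-readable classification of a single Slurm state string."""
    if state in get_non_final_states():
        # e.g. PENDING or RUNNING: nothing to decide yet
        return 'still queued or running; check again later'
    elif state in get_resubmission_states():
        # e.g. TIMEOUT or NODE_FAIL: failed in a way that is worth retrying
        return 'worth resubmitting'
    elif state in get_termination_states():
        # e.g. COMPLETED: final, nothing more to do without human intervention
        return 'final'
    else:
        return 'unrecognized state; needs human investigation'
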
def queue_info_from_time_window(start_time=None, end_time=None, user=None, \
                             columns='jobid,jobname,partition,submit,eligible,'+
                                     'start,end,elapsed,state,exitcode',
                             dry_run=0):
    """
    Queries the NERSC Slurm database using sacct with appropriate flags to get information within a specified time
    window of all jobs submitted or executed during that time.

    Parameters
    ----------
    start_time : str
        String of the form YYYY-mm-ddTHH:MM:SS. Based on the given night and the earliest hour you
        want to see queue information about.
    end_time : str
        String of the form YYYY-mm-ddTHH:MM:SS. Based on the given night and the latest hour you
        want to see queue information about.
    user : str
        The username at NERSC that you want job information about. The default is the USER
        environment variable if it exists, otherwise 'desi'.
    columns : str
        Comma separated string of valid sacct column names, in lower case. To be useful for the workflow,
        it MUST have columns "JOBID" and "STATE". The default list is: jobid,jobname,partition,
        submit,eligible,start,end,elapsed,state,exitcode.
        Other options include: suspended,derivedexitcode,reason,priority,jobname.
    dry_run : int
        Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default
        table that doesn't query the Slurm scheduler.

    Returns
    -------
    Table
        Table with the columns defined by the input variable 'columns' and information relating
        to all jobs submitted by the specified user in the specified time frame.
    """
    # global queue_info_table
    if dry_run:
        string = 'JobID,JobName,Partition,Submit,Eligible,Start,End,State,ExitCode\n'
        string += '49482394,arc-20211102-00107062-a0123456789,realtime,2021-11-02'\
                  +'T18:31:14,2021-11-02T18:36:33,2021-11-02T18:36:33,2021-11-02T'\
                  +'18:48:32,COMPLETED,0:0' + '\n'
        string += '49482395,arc-20211102-00107063-a0123456789,realtime,2021-11-02'\
                  +'T18:31:16,2021-11-02T18:36:33,2021-11-02T18:48:34,2021-11-02T'\
                  +'18:57:02,COMPLETED,0:0' + '\n'
        string += '49482397,arc-20211102-00107064-a0123456789,realtime,2021-11-02'\
                  +'T18:31:19,2021-11-02T18:36:33,2021-11-02T18:57:05,2021-11-02T'\
                  +'19:06:17,COMPLETED,0:0' + '\n'
        string += '49482398,arc-20211102-00107065-a0123456789,realtime,2021-11-02'\
                  +'T18:31:24,2021-11-02T18:36:33,2021-11-02T19:06:18,2021-11-02T'\
                  +'19:13:59,COMPLETED,0:0' + '\n'
        string += '49482399,arc-20211102-00107066-a0123456789,realtime,2021-11-02'\
                  +'T18:31:27,2021-11-02T18:36:33,2021-11-02T19:14:00,2021-11-02T'\
                  +'19:24:49,COMPLETED,0:0'
        cmd_as_list = ['echo', string]
    else:
        if user is None:
            if 'USER' in os.environ:
                user = os.environ['USER']
            else:
                user = 'desi'
        if start_time is None:
            start_time = '2020-04-26T00:00'
        if end_time is None:
            end_time = '2020-05-01T00:00'
        cmd_as_list = ['sacct', '-X', '--parsable2', '--delimiter=,', \
                       '-S', start_time, \
                       '-E', end_time, \
                       '-u', user, \
                       f'--format={columns}']

    table_as_string = subprocess.check_output(cmd_as_list, text=True,
                                              stderr=subprocess.STDOUT)
    queue_info_table = Table.read(table_as_string, format='ascii.csv')

    for col in queue_info_table.colnames:
        queue_info_table.rename_column(col, col.upper())

    ## Update the cached states of these jobids if we have that info to update
    update_queue_state_cache_from_table(queue_info_table)

    return queue_info_table

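## Illustrative sketch (hypothetical usage, not called anywhere in this module): with a nonzero
## dry_run the function above parses a canned sacct-style string instead of running sacct, so it
## can be exercised away from NERSC. The time window values below are arbitrary placeholders.
def _example_time_window_query():
    """Return JOBID and STATE from the simulated sacct table for an arbitrary time window."""
    qinfo = queue_info_from_time_window(start_time='2021-11-02T00:00:00',
                                        end_time='2021-11-03T00:00:00',
                                        dry_run=1)
    # Column names are upper-cased by the function, e.g. 'JOBID' and 'STATE'
    return qinfo['JOBID', 'STATE']
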
def queue_info_from_qids(qids, columns='jobid,jobname,partition,submit,'+
                         'eligible,start,end,elapsed,state,exitcode', dry_run=0):
    """
    Queries the NERSC Slurm database using sacct with appropriate flags to get
    information about specific jobs based on their jobids.

    Parameters
    ----------
    qids : list or array of ints
        Slurm QID's at NERSC that you want to return information about.
    columns : str
        Comma separated string of valid sacct column names, in lower case. To be useful for the workflow,
        it MUST have columns "JOBID" and "STATE". The default list is: jobid,jobname,partition,
        submit,eligible,start,end,elapsed,state,exitcode.
        Other options include: suspended,derivedexitcode,reason,priority,jobname.
    dry_run : int
        Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default
        table that doesn't query the Slurm scheduler.

    Returns
    -------
    Table
        Table with the columns defined by the input variable 'columns' and information relating
        to the jobs with the requested Slurm QID's.
    """
    qids = np.atleast_1d(qids).astype(int)
    log = get_logger()

    ## If qids is too long, recursively call self and stack tables; otherwise sacct hangs
    nmax = 100
    if len(qids) > nmax:
        results = list()
        for i in range(0, len(qids), nmax):
            results.append(queue_info_from_qids(qids[i:i+nmax], columns=columns, dry_run=dry_run))
        results = vstack(results)
        return results

    ## Turn the queue id's into a list
    ## this should work with str or int type also, though not officially supported
    qid_str = ','.join(np.atleast_1d(qids).astype(str)).replace(' ','')

    cmd_as_list = ['sacct', '-X', '--parsable2', '--delimiter=,',
                   f'--format={columns}', '-j', qid_str]
    if dry_run:
        log.info("Dry run, would have otherwise queried Slurm with the"
                 +f" following: {' '.join(cmd_as_list)}")
        ### Set a random 5% of jobs as TIMEOUT, set seed for reproducibility
        # np.random.seed(qids[0])
        states = np.array(['COMPLETED'] * len(qids))
        #states[np.random.random(len(qids)) < 0.05] = 'TIMEOUT'
        ## Try two different column configurations, otherwise give up trying to simulate
        string = 'JobID,JobName,Partition,Submit,Eligible,Start,End,Elapsed,State,ExitCode'
        if columns.lower() == string.lower():
            for jobid, expid, state in zip(qids, 100000+np.arange(len(qids)), states):
                string += f'\n{jobid},arc-20211102-{expid:08d}-a0123456789,realtime,2021-11-02'\
                      +'T18:31:14,2021-11-02T18:36:33,2021-11-02T18:36:33,2021-11-02T'\
                      +f'18:48:32,00:11:59,{state},0:0'
        elif columns.lower() == 'jobid,state':
            string = 'JobID,State'
            for jobid, state in zip(qids, states):
                string += f'\n{jobid},{state}'
        # create command to run to exercise subprocess -> stdout parsing
        cmd_as_list = ['echo', string]
    else:
        log.info(f"Querying Slurm with the following: {' '.join(cmd_as_list)}")

    #- sacct sometimes fails; try several times before giving up
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            table_as_string = subprocess.check_output(cmd_as_list, text=True,
                                          stderr=subprocess.STDOUT)
            break
        except subprocess.CalledProcessError as err:
            log.error(f'{qid_str} job query via sacct failure at {datetime.datetime.now()}')
            log.error(f'{qid_str} {cmd_as_list}')
            log.error(f'{qid_str} {err.output=}')
    else:  #- for/else happens if loop doesn't succeed
        msg = f'{qid_str} job query via sacct failed {max_attempts} times; exiting'
        log.critical(msg)
        raise RuntimeError(msg)

    queue_info_table = Table.read(table_as_string, format='ascii.csv')
    for col in queue_info_table.colnames:
        queue_info_table.rename_column(col, col.upper())

    ## Update the cached states of these jobids if we have that info to update
    update_queue_state_cache_from_table(queue_info_table)

    return queue_info_table

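## Illustrative sketch (hypothetical usage): with a nonzero dry_run the function above echoes a
## simulated sacct table for the requested QID's, so the parsing and caching paths can be tested
## without access to the Slurm scheduler. The QID values below are placeholders.
def _example_qid_query():
    """Return the simulated state of a few placeholder Slurm QID's as a dict."""
    qinfo = queue_info_from_qids([49482394, 49482395], columns='jobid,state', dry_run=1)
    # The dry-run branch marks every requested job as COMPLETED
    return dict(zip(qinfo['JOBID'], qinfo['STATE']))
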
def get_queue_states_from_qids(qids, dry_run=0, use_cache=False):
    """
    Queries the NERSC Slurm database using sacct with appropriate flags to get
    information on the job STATE. If use_cache is set and all qids have cached
    values from a previous query, those cached states will be returned instead.

    Parameters
    ----------
    qids : list or array of ints
        Slurm QID's at NERSC that you want to return information about.
    dry_run : int
        Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default
        table that doesn't query the Slurm scheduler.
    use_cache : bool
        If True the code first looks for a cached status for the qid.
        If unavailable, then it queries Slurm. Default is False.

    Returns
    -------
    dict
        Dictionary with the keys as jobids and values as the Slurm state of the job.
    """
    def_qid = get_default_qid()
    global _cached_slurm_states
    qids = np.atleast_1d(qids).astype(int)
    log = get_logger()

    ## Only use cached values if all are cached, since the time is dominated
    ## by the call itself rather than the number of jobids, so we may as well
    ## get updated information from all of them if we're submitting a query anyway
    outdict = dict()
    if use_cache and np.all(np.isin(qids, list(_cached_slurm_states.keys()))):
        log.info(f"All Slurm {qids=} are cached. Using cached values.")
        for qid in qids:
            outdict[qid] = _cached_slurm_states[qid]
    else:
        if dry_run > 2 or dry_run < 1:
            outtable = queue_info_from_qids(qids, columns='jobid,state', dry_run=dry_run)
            for row in outtable:
                if int(row['JOBID']) != def_qid:
                    outdict[int(row['JOBID'])] = row['STATE']
    return outdict

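## Illustrative sketch (hypothetical usage): dry_run values of 1 or 2 skip the Slurm query
## entirely and return an empty dict, while values above 2 (or 0) go through
## queue_info_from_qids, so a value such as 3 exercises the simulated query path.
## The QID's below are placeholders.
def _example_queue_states():
    """Return the per-QID states for a few placeholder jobs under two dry_run settings."""
    # Empty dict: dry_run=1 neither queries Slurm nor simulates a query
    skipped = get_queue_states_from_qids([49482394], dry_run=1)
    # Simulated sacct output marks both placeholder QID's as COMPLETED
    simulated = get_queue_states_from_qids([49482394, 49482395], dry_run=3)
    return skipped, simulated
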
def update_queue_state_cache_from_table(queue_info_table):
    """
    Takes a table of Slurm queue information and updates the queue state cache
    with the JOBID and STATE of each row

    Parameters
    ----------
    queue_info_table : astropy.table.Table
        Table returned by an sacct query. Should contain at least JOBID and STATE
        columns

    Returns
    -------
    Nothing

    """
    ## Update the cached states of these jobids if we have that info to update
    if 'JOBID' in queue_info_table.colnames and 'STATE' in queue_info_table.colnames:
        for row in queue_info_table:
            update_queue_state_cache(qid=row['JOBID'], state=row['STATE'])

def update_queue_state_cache(qid, state):
    """
    Takes a Slurm jobid and updates the queue state cache with the supplied state

    Parameters
    ----------
    qid : int
        Slurm QID at NERSC
    state : str
        The current job status of the Slurm jobid

    Returns
    -------
    Nothing

    """
    global _cached_slurm_states
    if int(qid) != get_default_qid():
        _cached_slurm_states[int(qid)] = state

def clear_queue_state_cache():
    """
    Remove all entries from the queue state cache
    """
    global _cached_slurm_states
    _cached_slurm_states.clear()


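## Illustrative sketch (hypothetical usage): the three functions above maintain a module-level
## cache of Slurm states keyed by QID, which get_queue_states_from_qids can consult via
## use_cache=True. The QID and state below are placeholders.
def _example_cache_round_trip():
    """Seed, read back, and then clear the module-level queue state cache."""
    update_queue_state_cache(12345678, 'COMPLETED')
    # All requested QID's are in the cache, so this returns the cached state without calling sacct
    cached = get_queue_states_from_qids([12345678], use_cache=True)
    clear_queue_state_cache()
    return cached
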
def update_from_queue(ptable, qtable=None, dry_run=0, ignore_scriptnames=False,
                      check_complete_jobs=False):
    """
    Given an input processing table (ptable) and query table from the Slurm queue (qtable) it cross matches the
    Slurm job ID's and updates the 'STATUS' in the processing table using the current state in the Slurm scheduler system.

    Args:
        ptable, Table. Processing table that contains the jobs you want updated with the most recent queue table. Must
                       have at least columns 'LATEST_QID' and 'STATUS'.
        qtable, Table. Table of Slurm queue information (e.g. as returned by queue_info_from_qids)
                       with at least columns 'JOBID' and 'STATE'.
        ignore_scriptnames, bool. Default is False. Set to true if you do not
                        want to check whether the scriptname matches the jobname
                        returned by the Slurm scheduler.
        check_complete_jobs, bool. Default is False. Set to true if you want to
                        also check QID's that currently have a STATUS "COMPLETED"
                        in the ptable.
        The following are only used if qtable is not provided:
            dry_run, int. Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default
                           table that doesn't query the Slurm scheduler.

    Returns:
        ptable, Table. The same processing table as the input except that the "STATUS" column in ptable for all jobs is
                       updated based on the 'STATE' in the qtable (as matched by "LATEST_QID" in the ptable
                       and "JOBID" in the qtable).
    """
    log = get_logger()
    if qtable is None:
        log.info("qtable not provided, querying Slurm using ptable's LATEST_QID set")
        ## Avoid null valued QID's (set to 2)
        sel = ptable['LATEST_QID'] > 2
        ## Only submit incomplete jobs unless explicitly told to check them
        ## completed jobs shouldn't change status
        if not check_complete_jobs:
            sel &= (ptable['STATUS'] != 'COMPLETED')
        qids = np.array(ptable['LATEST_QID'][sel])
        qtable = queue_info_from_qids(qids, dry_run=dry_run)

    log.info(f"Slurm returned information on {len(qtable)} jobs out of "
             +f"{len(ptable)} jobs in the ptable. Updating those now.")

    check_scriptname = ('JOBNAME' in qtable.colnames
                        and 'SCRIPTNAME' in ptable.colnames
                        and not ignore_scriptnames)
    if check_scriptname:
        log.info("Will be verifying that the file names are consistent")

    for row in qtable:
        if int(row['JOBID']) == get_default_qid():
            continue
        match = (int(row['JOBID']) == ptable['LATEST_QID'])
        if np.any(match):
            ind = np.where(match)[0][0]
            if check_scriptname and ptable['SCRIPTNAME'][ind] not in row['JOBNAME']:
                log.warning(f"For job with expids:{ptable['EXPID'][ind]}"
                            + f" the scriptname is {ptable['SCRIPTNAME'][ind]}"
                            + f" but the jobname in the queue was "
                            + f"{row['JOBNAME']}.")
            state = str(row['STATE']).split(' ')[0]
            ## Since dry run 1 and 2 save proc tables, don't alter the
            ## states for these when simulating
            if dry_run > 2 or dry_run < 1:
                ptable['STATUS'][ind] = state

    return ptable

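## Illustrative sketch (hypothetical usage): builds a minimal processing table with only the
## columns update_from_queue requires and refreshes it from the simulated queue. dry_run=3 uses
## the simulated sacct output while still allowing the STATUS column to be modified. The QID's
## below are placeholders.
def _example_update_from_queue():
    """Refresh the STATUS column of a two-row toy processing table."""
    ptable = Table({'LATEST_QID': [49482394, 49482395],
                    'STATUS': ['SUBMITTED', 'SUBMITTED']})
    # Both rows are marked COMPLETED by the simulated queue query
    return update_from_queue(ptable, dry_run=3)
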
def any_jobs_not_complete(statuses, termination_states=None):
    """
    Returns True if any of the job statuses in the input column of the processing table, statuses, are not complete
    (as based on the list of acceptable final states, termination_states, given as an argument). These should be states
    that are viewed as final, as opposed to job states that require resubmission.

    Args:
        statuses, Table.Column or list or np.array. The statuses in the processing table "STATUS". Each element should
                                                    be a string.
        termination_states, list or np.array. Each element should be a string signifying a state that is returned
                                              by the Slurm scheduler that should be deemed a terminal state.

    Returns:
        bool. True if any of the statuses of the jobs given in statuses are NOT a member of the termination states.
              Otherwise returns False.
    """
    if termination_states is None:
        termination_states = get_termination_states()
    return np.any([status not in termination_states for status in statuses])

def any_jobs_failed(statuses, failed_states=None):
    """
    Returns True if any of the job statuses in the input column of the
    processing table, statuses, are failed or problematic (as based on the
    list of failed states, failed_states, given as an argument).

    Args:
        statuses, Table.Column or list or np.array. The statuses in the
            processing table "STATUS". Each element should be a string.
        failed_states, list or np.array. Each element should be a string
            signifying a state that is returned by the Slurm scheduler that
            should be considered failing or problematic.

    Returns:
        bool. True if any of the statuses of the jobs given in statuses are
            a member of the failed_states.
    """
    if failed_states is None:
        failed_states = get_failed_states()
    return np.any([status in failed_states for status in statuses])

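## Illustrative sketch (hypothetical usage): the two predicates above answer different questions
## about the same STATUS column, so a mixed set of states can trigger both. The statuses below
## are placeholders.
def _example_status_checks():
    """Evaluate both predicates on a toy STATUS column."""
    statuses = ['COMPLETED', 'TIMEOUT', 'RUNNING']
    # True: TIMEOUT and RUNNING are not in the default termination states
    not_done = any_jobs_not_complete(statuses)
    # True: TIMEOUT is in the default failed states
    failed = any_jobs_failed(statuses)
    return not_done, failed
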
def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0):
    """
    Queries the NERSC Slurm queue using squeue to get information about the
    jobs currently in the queue for the given user.

    Parameters
    ----------
    user : str
        NERSC user to query the jobs for
    include_scron : bool
        True if you want to include scron entries in the returned table.
        Default is False.
    dry_run_level : int
        Whether this is a simulated run or real run. If nonzero, it is a
        simulation and it returns a default table that doesn't query the
        Slurm scheduler.

    Returns
    -------
    Table
        Table with the columns JOBID, PARTITION, RESERVATION, NAME, USER, ST, TIME, NODES,
        NODELIST(REASON) for the specified user.
    """
    log = get_logger()
    if user is None:
        if 'USER' in os.environ:
            user = os.environ['USER']
        else:
            user = 'desi'

    cmd = f'squeue -u {user} -o "%i,%P,%v,%j,%u,%t,%M,%D,%R"'
    cmd_as_list = cmd.split()

    if dry_run_level > 0:
        log.info("Dry run, would have otherwise queried Slurm with the"
                 +f" following: {' '.join(cmd_as_list)}")
        ## each simulated row starts with a newline so Table.read parses separate rows
        string = 'JOBID,PARTITION,RESERVATION,NAME,USER,ST,TIME,NODES,NODELIST(REASON)'
        string += f"\n27650097,cron,(null),scron_ar,{user},PD,0:00,1,(BeginTime)"
        string += f"\n27650100,cron,(null),scron_nh,{user},PD,0:00,1,(BeginTime)"
        string += f"\n27650098,cron,(null),scron_up,{user},PD,0:00,1,(BeginTime)"
        string += f"\n29078887,gpu_ss11,(null),tilenight-20230413-24315,{user},PD,0:00,1,(Priority)"
        string += f"\n29078892,gpu_ss11,(null),tilenight-20230413-21158,{user},PD,0:00,1,(Priority)"
        string += f"\n29079325,gpu_ss11,(null),tilenight-20240309-24526,{user},PD,0:00,1,(Dependency)"
        string += f"\n29079322,gpu_ss11,(null),ztile-22959-thru20240309,{user},PD,0:00,1,(Dependency)"
        string += f"\n29078883,gpu_ss11,(null),tilenight-20230413-21187,{user},R,10:18,1,nid003960"
        string += f"\n29079242,regular_milan_ss11,(null),arc-20240309-00229483-a0123456789,{user},PD,0:00,3,(Priority)"
        string += f"\n29079246,regular_milan_ss11,(null),arc-20240309-00229484-a0123456789,{user},PD,0:00,3,(Priority)"

        # create command to run to exercise subprocess -> stdout parsing
        cmd = 'echo ' + string
        cmd_as_list = ['echo', string]
    else:
        log.info(f"Querying jobs in queue with: {' '.join(cmd_as_list)}")

    #- squeue sometimes fails; try several times before giving up
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            table_as_string = subprocess.check_output(cmd_as_list, text=True,
                                          stderr=subprocess.STDOUT)
            break
        except subprocess.CalledProcessError as err:
            log.error(f'{cmd} job query failure at {datetime.datetime.now()}')
            log.error(f'{cmd_as_list}')
            log.error(f'{err.output=}')
    else:  #- for/else happens if loop doesn't succeed
        msg = f'{cmd} query failed {max_attempts} times; exiting'
        log.critical(msg)
        raise RuntimeError(msg)

    ## remove extra quotes that the astropy table reader doesn't like
    table_as_string = table_as_string.replace('"','')

    ## parentheses are also not very desirable, so remove them too
    table_as_string = table_as_string.replace('(', '').replace(')', '')

    ## replace node lists containing hyphens or commas, otherwise they will break the table reader
    table_as_string = re.sub(r"nid\[[0-9,-]*\]", "multiple nodes", table_as_string)

    try:
        queue_info_table = Table.read(table_as_string, format='ascii.csv')
    except:
        log.info("Table returned by squeue couldn't be parsed. The string was:")
        print(table_as_string)
        raise

    for col in queue_info_table.colnames:
        queue_info_table.rename_column(col, col.upper())

    ## If the table is empty, return it immediately, otherwise perform
    ## sanity check and cuts
    if len(queue_info_table) == 0:
        return queue_info_table

    if np.any(queue_info_table['USER']!=user):
        msg = f"Warning {np.sum(queue_info_table['USER']!=user)} " \
              + f"jobs returned were not {user=}\n" \
              + f"{queue_info_table['USER'][queue_info_table['USER']!=user]}"
        log.critical(msg)
        raise ValueError(msg)

    if not include_scron:
        queue_info_table = queue_info_table[queue_info_table['PARTITION'] != 'cron']

    return queue_info_table


def check_queue_count(user=None, include_scron=False, dry_run_level=0):
    """
    Counts the number of jobs the given user currently has in the NERSC Slurm
    queue, using get_jobs_in_queue (squeue) under the hood.

    Parameters
    ----------
    user : str
        NERSC user to query the jobs for
    include_scron : bool
        True if you want to include scron entries in the returned table.
        Default is False.
    dry_run_level : int
        Whether this is a simulated run or real run. If nonzero, it is a
        simulation and it returns a default table that doesn't query the
        Slurm scheduler.

    Returns
    -------
    int
        The number of jobs for that user in the queue (including or excluding
        scron entries depending on include_scron).
    """
    return len(get_jobs_in_queue(user=user, include_scron=include_scron,
                                 dry_run_level=dry_run_level))
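

## Illustrative sketch (hypothetical usage): with dry_run_level > 0 the two functions above parse
## a canned squeue-style listing instead of calling squeue, so they can be exercised away from
## NERSC. The exact job count depends on the simulated listing and on include_scron. The user
## name below is a placeholder.
def _example_queue_snapshot(user='desi'):
    """Return the simulated queue table and its job count for a placeholder user."""
    jobs = get_jobs_in_queue(user=user, include_scron=True, dry_run_level=1)
    # Excluding scron entries drops the rows on the 'cron' partition
    njobs = check_queue_count(user=user, include_scron=False, dry_run_level=1)
    return jobs, njobs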