desihub / desispec, build 11924426179
20 Nov 2024 12:39AM UTC. Coverage: 30.072% (down 0.09% from 30.16%)

Pull Request #2411: save the trace shift offsets in the psf file
Committed by segasai ("reformat"), via github.

1 of 31 new or added lines in 3 files covered (3.23%).
1675 existing lines in 20 files now uncovered.
14637 of 48673 relevant lines covered (30.07%); 0.3 hits per line.

Source File: /py/desispec/workflow/queue.py (file coverage: 50.0%)

"""
desispec.workflow.queue
=======================

"""
import os
import re
import numpy as np
from astropy.table import Table, vstack
import subprocess

from desispec.workflow.proctable import get_default_qid
from desiutil.log import get_logger
import time, datetime

global _cached_slurm_states
_cached_slurm_states = dict()

def get_resubmission_states(no_resub_failed=False):
    """
    Defines which Slurm job failure modes should be resubmitted in the hope that the job succeeds on the next attempt.

    Possible values that Slurm returns are::

        CA or ca or CANCELLED for cancelled jobs will only show currently running jobs in queue unless times are explicitly given
        BF BOOT_FAIL   Job terminated due to launch failure
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RQ REQUEUED Job was requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Args:
        no_resub_failed: bool. Set to True if you do NOT want to resubmit
            jobs with Slurm status 'FAILED' by default. Default is False.

    Returns:
        list. A list of strings outlining the job states that should be resubmitted.
    """
    ## 'UNSUBMITTED' is the default pipeline state for things not yet submitted
    ## 'DEP_NOT_SUBD' is set when resubmission can't proceed because a
    ## dependency has failed
    resub_states = ['UNSUBMITTED', 'DEP_NOT_SUBD', 'BOOT_FAIL', 'DEADLINE', 'NODE_FAIL',
                    'OUT_OF_MEMORY', 'PREEMPTED', 'TIMEOUT', 'CANCELLED']
    if not no_resub_failed:
        resub_states.append('FAILED')
    return resub_states


def get_termination_states():
    """
    Defines which Slurm job states are final and do not need to be considered for resubmission.

    Possible values that Slurm returns are::

        CA or ca or CANCELLED for cancelled jobs will only show currently running jobs in queue unless times are explicitly given
        BF BOOT_FAIL   Job terminated due to launch failure
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RQ REQUEUED Job was requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Returns:
        list. A list of strings outlining the job states that are considered final (without human investigation/intervention).
    """
    return ['COMPLETED', 'CANCELLED', 'FAILED']

def get_failed_states():
    """
    Defines which Slurm job states should be considered failed or problematic.

    All possible values that Slurm returns are::

        BF BOOT_FAIL Job terminated due to launch failure, typically due to a hardware failure (e.g. unable to boot the node or block and the job can not be requeued).
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        CF CONFIGURING Job has been allocated resources, but are waiting for them to become ready for use (e.g. booting).
        CG COMPLETING Job is in the process of completing. Some processes on some nodes may still be active.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RD RESV_DEL_HOLD Job is being held after requested reservation was deleted.
        RF REQUEUE_FED Job is being requeued by a federation.
        RH REQUEUE_HOLD Held job is being requeued.
        RQ REQUEUED Completing job is being requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        SI SIGNALING Job is being signaled.
        SE SPECIAL_EXIT The job was requeued in a special state. This state can be set by users, typically in EpilogSlurmctld, if the job has terminated with a particular exit value.
        SO STAGE_OUT Job is staging out files.
        ST STOPPED Job has an allocation, but execution has been stopped with SIGSTOP signal. CPUS have been retained by this job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Returns:
        list. A list of strings outlining the job states that are considered to be
            failed or problematic.
    """
    return ['BOOT_FAIL', 'CANCELLED', 'DEADLINE', 'FAILED', 'NODE_FAIL',
            'OUT_OF_MEMORY', 'PREEMPTED', 'REVOKED', 'SUSPENDED', 'TIMEOUT']


def get_non_final_states():
    """
    Defines which Slurm job states are not final and therefore indicate that the
    job hasn't finished running.

    Possible values that Slurm returns are::

        CA or ca or CANCELLED for cancelled jobs will only show currently running jobs in queue unless times are explicitly given
        BF BOOT_FAIL   Job terminated due to launch failure
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RQ REQUEUED Job was requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Returns:
        list. A list of strings outlining the job states that are not final, i.e. jobs that are still queued or running.
    """
    return ['PENDING', 'RUNNING', 'REQUEUED', 'RESIZING']

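## Usage sketch (illustrative only; the statuses below are made up, not read
## from a real processing table): the four helpers above partition the
## Slurm/pipeline states so the workflow can decide what to do with each job.
##
##     statuses = ['COMPLETED', 'TIMEOUT', 'RUNNING', 'FAILED']
##     to_resubmit = [s for s in statuses if s in get_resubmission_states()]
##     still_going = [s for s in statuses if s in get_non_final_states()]
##     problems    = [s for s in statuses if s in get_failed_states()]
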
def queue_info_from_time_window(start_time=None, end_time=None, user=None, \
                             columns='jobid,jobname,partition,submit,eligible,'+
                                     'start,end,elapsed,state,exitcode',
                             dry_run=0):
    """
    Queries the NERSC Slurm database using sacct with appropriate flags to get information about all jobs
    submitted or executed within a specified time window.

    Parameters
    ----------
    start_time : str
        String of the form YYYY-mm-ddTHH:MM:SS. Based on the given night and the earliest hour you
        want to see queue information about.
    end_time : str
        String of the form YYYY-mm-ddTHH:MM:SS. Based on the given night and the latest hour you
        want to see queue information about.
    user : str
        The username at NERSC that you want job information about. The default is the USER environment
        variable if it exists, otherwise 'desi'.
    columns : str
        Comma-separated string of valid sacct column names, in lower case. To be useful for the workflow,
        it MUST include the columns "jobid" and "state". The default list is: jobid,jobname,partition,
        submit,eligible,start,end,elapsed,state,exitcode. Other options include: suspended,
        derivedexitcode,reason,priority.
    dry_run : int
        Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default
        table without querying the Slurm scheduler.

    Returns
    -------
    Table
        Table with the columns defined by the input variable 'columns' and information relating
        to all jobs submitted by the specified user in the specified time frame.
    """
    # global queue_info_table
    if dry_run:
        string = 'JobID,JobName,Partition,Submit,Eligible,Start,End,State,ExitCode\n'
        string += '49482394,arc-20211102-00107062-a0123456789,realtime,2021-11-02'\
                  +'T18:31:14,2021-11-02T18:36:33,2021-11-02T18:36:33,2021-11-02T'\
                  +'18:48:32,COMPLETED,0:0' + '\n'
        string += '49482395,arc-20211102-00107063-a0123456789,realtime,2021-11-02'\
                  +'T18:31:16,2021-11-02T18:36:33,2021-11-02T18:48:34,2021-11-02T'\
                  +'18:57:02,COMPLETED,0:0' + '\n'
        string += '49482397,arc-20211102-00107064-a0123456789,realtime,2021-11-02'\
                  +'T18:31:19,2021-11-02T18:36:33,2021-11-02T18:57:05,2021-11-02T'\
                  +'19:06:17,COMPLETED,0:0' + '\n'
        string += '49482398,arc-20211102-00107065-a0123456789,realtime,2021-11-02'\
                  +'T18:31:24,2021-11-02T18:36:33,2021-11-02T19:06:18,2021-11-02T'\
                  +'19:13:59,COMPLETED,0:0' + '\n'
        string += '49482399,arc-20211102-00107066-a0123456789,realtime,2021-11-02'\
                  +'T18:31:27,2021-11-02T18:36:33,2021-11-02T19:14:00,2021-11-02T'\
                  +'19:24:49,COMPLETED,0:0'
        cmd_as_list = ['echo', string]
    else:
        if user is None:
            if 'USER' in os.environ:
                user = os.environ['USER']
            else:
                user = 'desi'
        if start_time is None:
            start_time = '2020-04-26T00:00'
        if end_time is None:
            end_time = '2020-05-01T00:00'
        cmd_as_list = ['sacct', '-X', '--parsable2', '--delimiter=,', \
                       '-S', start_time, \
                       '-E', end_time, \
                       '-u', user, \
                       f'--format={columns}']

    table_as_string = subprocess.check_output(cmd_as_list, text=True,
                                              stderr=subprocess.STDOUT)
    queue_info_table = Table.read(table_as_string, format='ascii.csv')

    for col in queue_info_table.colnames:
        queue_info_table.rename_column(col, col.upper())

    ## Update the cached states of these jobids if we have that info to update
    update_queue_state_cache_from_table(queue_info_table)

    return queue_info_table

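## Usage sketch (illustrative; the time window is made up, and dry_run=1 uses
## the canned sacct output above, so it is safe to run away from NERSC):
##
##     qinfo = queue_info_from_time_window(start_time='2021-11-02T18:00:00',
##                                         end_time='2021-11-03T06:00:00',
##                                         user='desi', dry_run=1)
##     # qinfo is an astropy Table with upper-case column names,
##     # e.g. qinfo['JOBID'], qinfo['STATE'], ...
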
def queue_info_from_qids(qids, columns='jobid,jobname,partition,submit,'+
                         'eligible,start,end,elapsed,state,exitcode', dry_run=0):
    """
    Queries the NERSC Slurm database using sacct with appropriate flags to get
    information about specific jobs based on their jobids.

    Parameters
    ----------
    qids : list or array of ints
        Slurm QID's at NERSC that you want to return information about.
    columns : str
        Comma-separated string of valid sacct column names, in lower case. To be useful for the workflow,
        it MUST include the columns "jobid" and "state". The default list is: jobid,jobname,partition,
        submit,eligible,start,end,elapsed,state,exitcode. Other options include: suspended,
        derivedexitcode,reason,priority.
    dry_run : int
        Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default
        table without querying the Slurm scheduler.

    Returns
    -------
    Table
        Table with the columns defined by the input variable 'columns' and information relating
        to the requested jobs.
    """
    qids = np.atleast_1d(qids).astype(int)
    log = get_logger()

    ## If qids is too long, recursively call self and stack tables; otherwise sacct hangs
    nmax = 100
    if len(qids) > nmax:
        results = list()
        for i in range(0, len(qids), nmax):
            results.append(queue_info_from_qids(qids[i:i+nmax], columns=columns, dry_run=dry_run))
        results = vstack(results)
        return results
    elif len(qids) == 0:
        return Table(names=columns.upper().split(','))

    ## Turn the queue id's into a list
    ## this should work with str or int type also, though not officially supported
    qid_str = ','.join(np.atleast_1d(qids).astype(str)).replace(' ','')

    cmd_as_list = ['sacct', '-X', '--parsable2', '--delimiter=,',
                   f'--format={columns}', '-j', qid_str]
    if dry_run:
        log.info("Dry run, would have otherwise queried Slurm with the"
                 +f" following: {' '.join(cmd_as_list)}")
        ### Set a random 5% of jobs as TIMEOUT, set seed for reproducibility
        # np.random.seed(qids[0])
        states = np.array(['COMPLETED'] * len(qids))
        #states[np.random.random(len(qids)) < 0.05] = 'TIMEOUT'
        ## Try two different column configurations, otherwise give up trying to simulate
        string = 'JobID,JobName,Partition,Submit,Eligible,Start,End,Elapsed,State,ExitCode'
        if columns.lower() == string.lower():
            for jobid, expid, state in zip(qids, 100000+np.arange(len(qids)), states):
                string += f'\n{jobid},arc-20211102-{expid:08d}-a0123456789,realtime,2021-11-02'\
                      +'T18:31:14,2021-11-02T18:36:33,2021-11-02T18:36:33,2021-11-02T'\
                      +f'18:48:32,00:11:59,{state},0:0'
        elif columns.lower() == 'jobid,state':
            string = 'JobID,State'
            for jobid, state in zip(qids, states):
                string += f'\n{jobid},{state}'
        # create command to run to exercise subprocess -> stdout parsing
        cmd_as_list = ['echo', string]
    else:
        log.info(f"Querying Slurm with the following: {' '.join(cmd_as_list)}")

    #- sacct sometimes fails; try several times before giving up
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            table_as_string = subprocess.check_output(cmd_as_list, text=True,
                                          stderr=subprocess.STDOUT)
            break
        except subprocess.CalledProcessError as err:
            log.error(f'{qid_str} job query via sacct failure at {datetime.datetime.now()}')
            log.error(f'{qid_str} {cmd_as_list}')
            log.error(f'{qid_str} {err.output=}')
    else:  #- for/else happens if loop doesn't succeed
        msg = f'{qid_str} job query via sacct failed {max_attempts} times; exiting'
        log.critical(msg)
        raise RuntimeError(msg)

    queue_info_table = Table.read(table_as_string, format='ascii.csv')
    for col in queue_info_table.colnames:
        queue_info_table.rename_column(col, col.upper())

    ## Update the cached states of these jobids if we have that info to update
    update_queue_state_cache_from_table(queue_info_table)

    return queue_info_table

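## Usage sketch (illustrative; the QIDs are made up). With dry_run=1 a fake
## sacct result is generated, so this is safe to run away from NERSC:
##
##     qinfo = queue_info_from_qids([49482394, 49482395], dry_run=1)
##     for row in qinfo:
##         print(row['JOBID'], row['STATE'])
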
def get_queue_states_from_qids(qids, dry_run=0, use_cache=False):
    """
    Queries the NERSC Slurm database using sacct with appropriate flags to get
    information on the job STATE. If use_cache is set and all qids have cached
    values from a previous query, those cached states will be returned instead.

    Parameters
    ----------
    qids : list or array of ints
        Slurm QID's at NERSC that you want to return information about.
    dry_run : int
        Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default
        table without querying the Slurm scheduler.
    use_cache : bool
        If True the code first looks for a cached status for each qid. If
        unavailable, it queries Slurm. Default is False.

    Returns
    -------
    dict
        Dictionary with the keys as jobids and values as the Slurm state of the job.
    """
    def_qid = get_default_qid()
    global _cached_slurm_states
    qids = np.atleast_1d(qids).astype(int)
    log = get_logger()

    ## Only use cached values if all are cached, since the time is dominated
    ## by the call itself rather than the number of jobids, so we may as well
    ## get updated information from all of them if we're submitting a query anyway
    outdict = dict()
    if use_cache and np.all(np.isin(qids, list(_cached_slurm_states.keys()))):
        log.info(f"All Slurm {qids=} are cached. Using cached values.")
        for qid in qids:
            outdict[qid] = _cached_slurm_states[qid]
    else:
        if dry_run > 2 or dry_run < 1:
            outtable = queue_info_from_qids(qids, columns='jobid,state', dry_run=dry_run)
            for row in outtable:
                if int(row['JOBID']) != def_qid:
                    outdict[int(row['JOBID'])] = row['STATE']
    return outdict

def update_queue_state_cache_from_table(queue_info_table):
    """
    Takes a table returned by an sacct query and updates the queue state cache
    with the JOBID/STATE pairs it contains.

    Parameters
    ----------
    queue_info_table : astropy.table.Table
        Table returned by an sacct query. Should contain at least JOBID and STATE
        columns.

    Returns
    -------
    Nothing

    """
    ## Update the cached states of these jobids if we have that info to update
    if 'JOBID' in queue_info_table.colnames and 'STATE' in queue_info_table.colnames:
        for row in queue_info_table:
            update_queue_state_cache(qid=row['JOBID'], state=row['STATE'])

def update_queue_state_cache(qid, state):
    """
    Takes a Slurm jobid and updates the queue state cache with the supplied state.

    Parameters
    ----------
    qid : int
        Slurm QID at NERSC.
    state : str
        The current job status of the Slurm jobid.

    Returns
    -------
    Nothing

    """
    global _cached_slurm_states
    if int(qid) != get_default_qid():
        _cached_slurm_states[int(qid)] = state

def clear_queue_state_cache():
    """
    Remove all entries from the queue state cache.
    """
    global _cached_slurm_states
    _cached_slurm_states.clear()


def update_from_queue(ptable, qtable=None, dry_run=0, ignore_scriptnames=False,
                      check_complete_jobs=False):
    """
    Given an input processing table (ptable) and a query table from the Slurm queue (qtable), it cross matches the
    Slurm job ID's and updates the 'STATUS' in the table using the current state in the Slurm scheduler system.

    Args:
        ptable, Table. Processing table that contains the jobs you want updated with the most recent queue table. Must
                       have at least columns 'LATEST_QID' and 'STATUS'.
        qtable, Table. Queue information table, e.g. as returned by queue_info_from_qids(), with information on the
                       jobs of interest. Must have at least columns 'JOBID' and 'STATE'.
        ignore_scriptnames, bool. Default is False. Set to True if you do not
                        want to check whether the scriptname matches the jobname
                        returned by the Slurm scheduler.
        check_complete_jobs, bool. Default is False. Set to True if you want to
                        also check QID's that currently have a STATUS "COMPLETED"
                        in the ptable.
        The following are only used if qtable is not provided:
            dry_run, int. Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default
                           table that doesn't query the Slurm scheduler.

    Returns:
        ptab, Table. A copy of the same processing table as the input except that the "STATUS" column in ptable for all jobs is
                       updated based on the 'STATE' in the qtable (as matched by "LATEST_QID" in the ptable
                       and "JOBID" in the qtable).
    """
    log = get_logger()
    ptab = ptable.copy()
    if qtable is None:
        log.info("qtable not provided, querying Slurm using ptab's LATEST_QID set")
        ## Avoid null valued QID's (set to 2)
        sel = ptab['LATEST_QID'] > 2
        ## Only submit incomplete jobs unless explicitly told to check them
        ## completed jobs shouldn't change status
        if not check_complete_jobs:
            sel &= (ptab['STATUS'] != 'COMPLETED')
        log.info(f"Querying Slurm for {np.sum(sel)} QIDs from table of length {len(ptab)}.")
        qids = np.array(ptab['LATEST_QID'][sel])
        ## If you provide empty jobids Slurm gives you the three most recent jobs,
        ## which we don't want here
        if len(qids) == 0:
            log.info(f"No QIDs left to query. Returning the original table.")
            return ptab
        qtable = queue_info_from_qids(qids, dry_run=dry_run)

    log.info(f"Slurm returned information on {len(qtable)} jobs out of "
             +f"{len(ptab)} jobs in the ptab. Updating those now.")

    check_scriptname = ('JOBNAME' in qtable.colnames
                        and 'SCRIPTNAME' in ptab.colnames
                        and not ignore_scriptnames)
    if check_scriptname:
        log.info("Will be verifying that the file names are consistent")

    for row in qtable:
        if int(row['JOBID']) == get_default_qid():
            continue
        match = (int(row['JOBID']) == ptab['LATEST_QID'])
        if np.any(match):
            ind = np.where(match)[0][0]
            if check_scriptname and ptab['SCRIPTNAME'][ind] not in row['JOBNAME']:
                log.warning(f"For job with expids:{ptab['EXPID'][ind]}"
                            + f" the scriptname is {ptab['SCRIPTNAME'][ind]}"
                            + f" but the jobname in the queue was "
                            + f"{row['JOBNAME']}.")
            state = str(row['STATE']).split(' ')[0]
            ## Since dry run 1 and 2 save proc tables, don't alter the
            ## states for these when simulating
            if dry_run > 2 or dry_run < 1:
                ptab['STATUS'][ind] = state

    return ptab

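## Usage sketch (illustrative; assumes `ptable` is a processing table with
## LATEST_QID and STATUS columns, e.g. read from a production's processing
## table file):
##
##     ptable = update_from_queue(ptable, dry_run=0)
##     # STATUS now reflects the current Slurm STATE for each LATEST_QID
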
def any_jobs_not_complete(statuses, termination_states=None):
    """
    Returns True if any of the job statuses in the input column of the processing table, statuses, are not complete
    (as based on the list of acceptable final states, termination_states, given as an argument). These should be states
    that are viewed as final, as opposed to job states that require resubmission.

    Args:
        statuses, Table.Column or list or np.array. The statuses in the processing table "STATUS". Each element should
                                                    be a string.
        termination_states, list or np.array. Each element should be a string signifying a state that is returned
                                              by the Slurm scheduler that should be deemed a terminal state.

    Returns:
        bool. True if any of the statuses of the jobs given in statuses are NOT a member of the termination states.
              Otherwise returns False.
    """
    if termination_states is None:
        termination_states = get_termination_states()
    return np.any([status not in termination_states for status in statuses])

def any_jobs_failed(statuses, failed_states=None):
    """
    Returns True if any of the job statuses in the input column of the
    processing table, statuses, are in a failed or problematic state (as
    based on the list of failed states, failed_states, given as an argument).

    Args:
        statuses, Table.Column or list or np.array. The statuses in the
            processing table "STATUS". Each element should be a string.
        failed_states, list or np.array. Each element should be a string
            signifying a state that is returned by the Slurm scheduler that
            should be considered failing or problematic.

    Returns:
        bool. True if any of the statuses of the jobs given in statuses are
            a member of the failed_states.
    """
    if failed_states is None:
        failed_states = get_failed_states()
    return np.any([status in failed_states for status in statuses])

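## Usage sketch (illustrative statuses): the two checks are typically used
## together to decide whether to keep waiting or to trigger resubmission.
##
##     statuses = ['COMPLETED', 'TIMEOUT', 'RUNNING']
##     any_jobs_not_complete(statuses)  # True: 'TIMEOUT' and 'RUNNING' are not terminal
##     any_jobs_failed(statuses)        # True: 'TIMEOUT' is in get_failed_states()
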
def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0):
    """
    Queries the NERSC Slurm queue using squeue to get information about the
    jobs currently in the queue for the given user.

    Parameters
    ----------
    user : str
        NERSC user to query the jobs for.
    include_scron : bool
        True if you want to include scron entries in the returned table.
        Default is False.
    dry_run_level : int
        Whether this is a simulated run or real run. If nonzero, it is a
        simulation and it returns a default table that doesn't query the
        Slurm scheduler.

    Returns
    -------
    Table
        Table with the columns JOBID, PARTITION, RESERVATION, NAME, USER, ST, TIME, NODES,
        NODELIST(REASON) for the specified user.
    """
    log = get_logger()
    if user is None:
        if 'USER' in os.environ:
            user = os.environ['USER']
        else:
            user = 'desi'

    cmd = f'squeue -u {user} -o "%i,%P,%v,%j,%u,%t,%M,%D,%R"'
    cmd_as_list = cmd.split()

    if dry_run_level > 0:
        log.info("Dry run, would have otherwise queried Slurm with the"
                 +f" following: {' '.join(cmd_as_list)}")
        string = 'JOBID,PARTITION,RESERVATION,NAME,USER,ST,TIME,NODES,NODELIST(REASON)'
        string += f"\n27650097,cron,(null),scron_ar,{user},PD,0:00,1,(BeginTime)"
        string += f"\n27650100,cron,(null),scron_nh,{user},PD,0:00,1,(BeginTime)"
        string += f"\n27650098,cron,(null),scron_up,{user},PD,0:00,1,(BeginTime)"
        string += f"\n29078887,gpu_ss11,(null),tilenight-20230413-24315,{user},PD,0:00,1,(Priority)"
        string += f"\n29078892,gpu_ss11,(null),tilenight-20230413-21158,{user},PD,0:00,1,(Priority)"
        string += f"\n29079325,gpu_ss11,(null),tilenight-20240309-24526,{user},PD,0:00,1,(Dependency)"
        string += f"\n29079322,gpu_ss11,(null),ztile-22959-thru20240309,{user},PD,0:00,1,(Dependency)"
        string += f"\n29078883,gpu_ss11,(null),tilenight-20230413-21187,{user},R,10:18,1,nid003960"
        string += f"\n29079242,regular_milan_ss11,(null),arc-20240309-00229483-a0123456789,{user},PD,0:00,3,(Priority)"
        string += f"\n29079246,regular_milan_ss11,(null),arc-20240309-00229484-a0123456789,{user},PD,0:00,3,(Priority)"

        # create command to run to exercise subprocess -> stdout parsing
        cmd = 'echo ' + string
        cmd_as_list = ['echo', string]
    else:
        log.info(f"Querying jobs in queue with: {' '.join(cmd_as_list)}")

    #- squeue sometimes fails; try several times before giving up
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            table_as_string = subprocess.check_output(cmd_as_list, text=True,
                                          stderr=subprocess.STDOUT)
            break
        except subprocess.CalledProcessError as err:
            log.error(f'{cmd} job query failure at {datetime.datetime.now()}')
            log.error(f'{cmd_as_list}')
            log.error(f'{err.output=}')
    else:  #- for/else happens if loop doesn't succeed
        msg = f'{cmd} query failed {max_attempts} times; exiting'
        log.critical(msg)
        raise RuntimeError(msg)

    ## remove extra quotes that the astropy table reader doesn't like
    table_as_string = table_as_string.replace('"','')

    ## parentheses are also not desirable; remove them
    table_as_string = table_as_string.replace('(', '').replace(')', '')

    ## remove node lists with hyphens or commas, otherwise they break the table reader
    table_as_string = re.sub(r"nid\[[0-9,-]*\]", "multiple nodes", table_as_string)

    try:
        queue_info_table = Table.read(table_as_string, format='ascii.csv')
    except:
        log.info("Table returned by squeue couldn't be parsed. The string was:")
        print(table_as_string)
        raise

    for col in queue_info_table.colnames:
        queue_info_table.rename_column(col, col.upper())

    ## If the table is empty, return it immediately, otherwise perform
    ## sanity checks and cuts
    if len(queue_info_table) == 0:
        return queue_info_table

    if np.any(queue_info_table['USER']!=user):
        msg = f"Warning {np.sum(queue_info_table['USER']!=user)} " \
              + f"jobs returned were not {user=}\n" \
              + f"{queue_info_table['USER'][queue_info_table['USER']!=user]}"
        log.critical(msg)
        raise ValueError(msg)

    if not include_scron:
        queue_info_table = queue_info_table[queue_info_table['PARTITION'] != 'cron']

    return queue_info_table


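## Usage sketch (illustrative): dry_run_level=1 parses the canned squeue output
## above rather than querying the scheduler, so this is safe to run away from
## NERSC.
##
##     jobs = get_jobs_in_queue(user='desi', include_scron=False, dry_run_level=1)
##     pending = jobs[jobs['ST'] == 'PD']
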
def check_queue_count(user=None, include_scron=False, dry_run_level=0):
    """
    Counts the number of jobs the given user currently has in the NERSC Slurm
    queue, using get_jobs_in_queue() (i.e. squeue) under the hood.

    Parameters
    ----------
    user : str
        NERSC user to query the jobs for.
    include_scron : bool
        True if you want to include scron entries in the count.
        Default is False.
    dry_run_level : int
        Whether this is a simulated run or real run. If nonzero, it is a
        simulation and it returns a default table that doesn't query the
        Slurm scheduler.

    Returns
    -------
    int
        The number of jobs for that user in the queue (including or excluding
        scron entries depending on include_scron).
    """
    return len(get_jobs_in_queue(user=user, include_scron=include_scron,
                                 dry_run_level=dry_run_level))
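
## Usage sketch (illustrative; the threshold is made up): useful for throttling
## submissions when the queue already holds many jobs.
##
##     if check_queue_count(user='desi', dry_run_level=1) > 500:
##         pass  # e.g. wait before submitting more jobs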