
/py/desispec/workflow/queue.py
"""
desispec.workflow.queue
=======================

"""
import os
import numpy as np
from astropy.table import Table
import subprocess
from desiutil.log import get_logger
import time, datetime


def get_resubmission_states():
    """
    Defines which Slurm job failure modes should be resubmitted in the hope of the job succeeding the next time.

    Possible values that Slurm returns are::

        CA or ca or CANCELLED: queries for cancelled jobs will only show currently running jobs in the queue unless times are explicitly given
        BF BOOT_FAIL Job terminated due to launch failure
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RQ REQUEUED Job was requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Returns:
        list. A list of strings outlining the job states that should be resubmitted.
    """
    return ['UNSUBMITTED', 'BOOT_FAIL', 'DEADLINE', 'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED', 'TIMEOUT', 'CANCELLED']

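# Illustrative sketch (not part of the original module): a resubmission loop
# would typically test each job's sacct state against this list, e.g.
#
#     >>> state = 'NODE_FAIL'                # hypothetical state from sacct
#     >>> state in get_resubmission_states()
#     True
#     >>> 'FAILED' in get_resubmission_states()
#     False
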
def get_termination_states():
    """
    Defines which Slurm job states are final and are not in question about needing resubmission.

    Possible values that Slurm returns are::

        CA or ca or CANCELLED: queries for cancelled jobs will only show currently running jobs in the queue unless times are explicitly given
        BF BOOT_FAIL Job terminated due to launch failure
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RQ REQUEUED Job was requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Returns:
        list. A list of strings outlining the job states that are considered final (without human investigation/intervention).
    """
    return ['COMPLETED', 'CANCELLED', 'FAILED']

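# Illustrative sketch (not part of the original module): anything outside this
# list is treated as still in flight or as needing a resubmission decision.
#
#     >>> get_termination_states()
#     ['COMPLETED', 'CANCELLED', 'FAILED']
#     >>> 'RUNNING' in get_termination_states()
#     False
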
def get_failed_states():
    """
    Defines which Slurm job states should be considered failed or problematic.

    All possible values that Slurm returns are::

        BF BOOT_FAIL Job terminated due to launch failure, typically due to a hardware failure (e.g. unable to boot the node or block and the job can not be requeued).
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        CF CONFIGURING Job has been allocated resources, but is waiting for them to become ready for use (e.g. booting).
        CG COMPLETING Job is in the process of completing. Some processes on some nodes may still be active.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RD RESV_DEL_HOLD Job is being held after requested reservation was deleted.
        RF REQUEUE_FED Job is being requeued by a federation.
        RH REQUEUE_HOLD Held job is being requeued.
        RQ REQUEUED Completing job is being requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        SI SIGNALING Job is being signaled.
        SE SPECIAL_EXIT The job was requeued in a special state. This state can be set by users, typically in EpilogSlurmctld, if the job has terminated with a particular exit value.
        SO STAGE_OUT Job is staging out files.
        ST STOPPED Job has an allocation, but execution has been stopped with SIGSTOP signal. CPUS have been retained by this job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Returns:
        list. A list of strings outlining the job states that are considered to be
            failed or problematic.
    """
    return ['BOOT_FAIL', 'CANCELLED', 'DEADLINE', 'FAILED', 'NODE_FAIL',
            'OUT_OF_MEMORY', 'PREEMPTED', 'REVOKED', 'SUSPENDED', 'TIMEOUT']


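# Illustrative sketch (not part of the original module): the three state lists
# overlap by design -- e.g. 'CANCELLED' is simultaneously a termination state,
# a failed state, and a resubmission candidate.
#
#     >>> 'CANCELLED' in get_termination_states()
#     True
#     >>> 'CANCELLED' in get_failed_states()
#     True
#     >>> 'CANCELLED' in get_resubmission_states()
#     True
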
def queue_info_from_time_window(start_time=None, end_time=None, user=None,
                             columns='jobid,jobname,partition,submit,eligible,'+
                                     'start,end,elapsed,state,exitcode',
                             dry_run=0):
    """
    Queries the NERSC Slurm database using sacct with appropriate flags to get information on all jobs
    submitted or executed within a specified time window.

    Parameters
    ----------
    start_time : str
        String of the form YYYY-mm-ddTHH:MM:SS. Based on the given night and the earliest hour you
        want to see queue information about.
    end_time : str
        String of the form YYYY-mm-ddTHH:MM:SS. Based on the given night and the latest hour you
        want to see queue information about.
    user : str
        The username at NERSC that you want job information about. The default is the USER
        environment variable if it exists, otherwise 'desi'.
    columns : str
        Comma separated string of valid sacct column names, in lower case. To be useful for the
        workflow, it MUST have columns "JOBID" and "STATE". The default list is:
        jobid,jobname,partition,submit,eligible,start,end,elapsed,state,exitcode.
        Other options include: suspended,derivedexitcode,reason,priority.
    dry_run : int
        Whether this is a simulated run or a real run. If nonzero, it is a simulation and a default
        table is returned without querying the Slurm scheduler.

    Returns
    -------
    Table
        Table with the columns defined by the input variable 'columns' and information relating
        to all jobs submitted by the specified user in the specified time frame.
    """
    if dry_run:
        string = 'JobID,JobName,Partition,Submit,Eligible,Start,End,State,ExitCode\n'
        string += '49482394,arc-20211102-00107062-a0123456789,realtime,2021-11-02'\
                  +'T18:31:14,2021-11-02T18:36:33,2021-11-02T18:36:33,2021-11-02T'\
                  +'18:48:32,COMPLETED,0:0' + '\n'
        string += '49482395,arc-20211102-00107063-a0123456789,realtime,2021-11-02'\
                  +'T18:31:16,2021-11-02T18:36:33,2021-11-02T18:48:34,2021-11-02T'\
                  +'18:57:02,COMPLETED,0:0' + '\n'
        string += '49482397,arc-20211102-00107064-a0123456789,realtime,2021-11-02'\
                  +'T18:31:19,2021-11-02T18:36:33,2021-11-02T18:57:05,2021-11-02T'\
                  +'19:06:17,COMPLETED,0:0' + '\n'
        string += '49482398,arc-20211102-00107065-a0123456789,realtime,2021-11-02'\
                  +'T18:31:24,2021-11-02T18:36:33,2021-11-02T19:06:18,2021-11-02T'\
                  +'19:13:59,COMPLETED,0:0' + '\n'
        string += '49482399,arc-20211102-00107066-a0123456789,realtime,2021-11-02'\
                  +'T18:31:27,2021-11-02T18:36:33,2021-11-02T19:14:00,2021-11-02T'\
                  +'19:24:49,COMPLETED,0:0'
        cmd_as_list = ['echo', string]
    else:
        if user is None:
            if 'USER' in os.environ:
                user = os.environ['USER']
            else:
                user = 'desi'
        if start_time is None:
            start_time = '2020-04-26T00:00'
        if end_time is None:
            end_time = '2020-05-01T00:00'
        cmd_as_list = ['sacct', '-X', '--parsable2', '--delimiter=,',
                       '-S', start_time,
                       '-E', end_time,
                       '-u', user,
                       f'--format={columns}']

    table_as_string = subprocess.check_output(cmd_as_list, text=True,
                                              stderr=subprocess.STDOUT)
    queue_info_table = Table.read(table_as_string, format='ascii.csv')

    for col in queue_info_table.colnames:
        queue_info_table.rename_column(col, col.upper())
    return queue_info_table


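# Illustrative usage sketch (not part of the original module). With a nonzero
# dry_run the query parameters are ignored and a canned five-row table is
# parsed instead of calling sacct, which is handy for testing offline:
#
#     >>> qinfo = queue_info_from_time_window(start_time='2021-11-02T18:00',
#     ...                                     end_time='2021-11-02T20:00',
#     ...                                     dry_run=1)
#     >>> len(qinfo), str(qinfo['STATE'][0])
#     (5, 'COMPLETED')
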
def queue_info_from_qids(qids, columns='jobid,jobname,partition,submit,'+
                         'eligible,start,end,elapsed,state,exitcode',
                         dry_run=0):
    """
    Queries the NERSC Slurm database using sacct with appropriate flags to get information about
    the jobs with the specified Slurm job IDs (QIDs).

    Parameters
    ----------
    qids : list or array of ints
        Slurm QID's at NERSC that you want to return information about.
    columns : str
        Comma separated string of valid sacct column names, in lower case. To be useful for the
        workflow, it MUST have columns "JOBID" and "STATE". The default list is:
        jobid,jobname,partition,submit,eligible,start,end,elapsed,state,exitcode.
        Other options include: suspended,derivedexitcode,reason,priority.
    dry_run : int
        Whether this is a simulated run or a real run. If nonzero, it is a simulation and a default
        table is returned without querying the Slurm scheduler.

    Returns
    -------
    Table
        Table with the columns defined by the input variable 'columns' and information relating
        to all jobs with the requested QIDs.
    """
    log = get_logger()
    ## Turn the queue id's into a comma-separated string
    ## this should also work with str or int types, though not officially supported
    qid_str = ','.join(np.atleast_1d(qids).astype(str)).replace(' ','')

    cmd_as_list = ['sacct', '-X', '--parsable2', '--delimiter=,',
                   f'--format={columns}', '-j', qid_str]
    if dry_run:
        log.info("Dry run, would have otherwise queried Slurm with the"
                 +f" following: {' '.join(cmd_as_list)}")
        string = 'JobID,JobName,Partition,Submit,Eligible,Start,End,State,ExitCode\n'
        string += '49482394,arc-20211102-00107062-a0123456789,realtime,2021-11-02'\
                  +'T18:31:14,2021-11-02T18:36:33,2021-11-02T18:36:33,2021-11-02T'\
                  +'18:48:32,COMPLETED,0:0' + '\n'
        string += '49482395,arc-20211102-00107063-a0123456789,realtime,2021-11-02'\
                  +'T18:31:16,2021-11-02T18:36:33,2021-11-02T18:48:34,2021-11-02T'\
                  +'18:57:02,COMPLETED,0:0' + '\n'
        string += '49482397,arc-20211102-00107064-a0123456789,realtime,2021-11-02'\
                  +'T18:31:19,2021-11-02T18:36:33,2021-11-02T18:57:05,2021-11-02T'\
                  +'19:06:17,COMPLETED,0:0' + '\n'
        string += '49482398,arc-20211102-00107065-a0123456789,realtime,2021-11-02'\
                  +'T18:31:24,2021-11-02T18:36:33,2021-11-02T19:06:18,2021-11-02T'\
                  +'19:13:59,COMPLETED,0:0' + '\n'
        string += '49482399,arc-20211102-00107066-a0123456789,realtime,2021-11-02'\
                  +'T18:31:27,2021-11-02T18:36:33,2021-11-02T19:14:00,2021-11-02T'\
                  +'19:24:49,COMPLETED,0:0'
        cmd_as_list = ['echo', string]
        table_as_string = subprocess.check_output(cmd_as_list, text=True,
                                                  stderr=subprocess.STDOUT)
    else:
        log.info(f"Querying Slurm with the following: {' '.join(cmd_as_list)}")

        #- sacct sometimes fails; try several times before giving up
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                table_as_string = subprocess.check_output(cmd_as_list, text=True,
                                                          stderr=subprocess.STDOUT)
                break
            except subprocess.CalledProcessError as err:
                log.error(f'{qid_str} job query via sacct failure at {datetime.datetime.now()}')
                log.error(f'{qid_str} {cmd_as_list}')
                log.error(f'{qid_str} {err.output=}')
        else:  #- for/else runs if the loop finished without a break, i.e. every attempt failed
            msg = f'{qid_str} job query via sacct failed {max_attempts} times; exiting'
            log.critical(msg)
            raise RuntimeError(msg)

    queue_info_table = Table.read(table_as_string, format='ascii.csv')

    for col in queue_info_table.colnames:
        queue_info_table.rename_column(col, col.upper())
    return queue_info_table

def update_from_queue(ptable, qtable=None, dry_run=0, ignore_scriptnames=False):
    """
    Given an input processing table (ptable) and a query table from the Slurm queue (qtable), cross matches the
    Slurm job ID's and updates the 'STATUS' column in the ptable using the current state in the Slurm scheduler system.

    Args:
        ptable, Table. Processing table that contains the jobs you want updated with the most recent queue table. Must
                       have at least columns 'LATEST_QID' and 'STATUS'.
        qtable, Table. Queue information table such as that returned by queue_info_from_qids(). Must have at least
                       columns 'JOBID' and 'STATE'.
        ignore_scriptnames, bool. Default is False. Set to True if you do not
                        want to check whether the scriptname matches the jobname
                        returned by the Slurm scheduler.
        The following is only used if qtable is not provided:
            dry_run, int. Whether this is a simulated run or a real run. If nonzero, it is a simulation and a default
                           table is used without querying the Slurm scheduler.

    Returns:
        ptable, Table. The same processing table as the input except that the "STATUS" column in ptable for all jobs is
                       updated based on the 'STATE' in the qtable (as matched by "LATEST_QID" in the ptable
                       and "JOBID" in the qtable).
    """
    log = get_logger()
    if qtable is None:
        log.info("qtable not provided, querying Slurm using ptable's LATEST_QID set")
        qids = np.array(ptable['LATEST_QID'])
        ## Avoid null valued QID's (set to -99)
        qids = qids[qids > 0]
        qtable = queue_info_from_qids(qids, dry_run=dry_run)

    log.info(f"Slurm returned information on {len(qtable)} jobs out of "
             +f"{len(ptable)} jobs in the ptable. Updating those now.")

    check_scriptname = ('JOBNAME' in qtable.colnames
                        and 'SCRIPTNAME' in ptable.colnames
                        and not ignore_scriptnames)
    if check_scriptname:
        log.info("Will be verifying that the file names are consistent")

    for row in qtable:
        match = (int(row['JOBID']) == ptable['LATEST_QID'])
        if np.any(match):
            ind = np.where(match)[0][0]
            if check_scriptname and ptable['SCRIPTNAME'][ind] not in row['JOBNAME']:
                log.warning(f"For job with expids:{ptable['EXPID'][ind]}"
                            + f" the scriptname is {ptable['SCRIPTNAME'][ind]}"
                            + f" but the jobname in the queue was "
                            + f"{row['JOBNAME']}.")
            state = str(row['STATE']).split(' ')[0]
            ptable['STATUS'][ind] = state

    return ptable

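# Illustrative sketch (not part of the original module): a minimal round trip
# with hypothetical QIDs and a hypothetical 'SUBMITTED' starting status,
# using the dry-run canned table. The -99 row is skipped as a null QID.
#
#     >>> from astropy.table import Table
#     >>> ptable = Table({'LATEST_QID': [49482394, -99],
#     ...                 'STATUS': ['SUBMITTED', 'UNSUBMITTED']})
#     >>> ptable = update_from_queue(ptable, dry_run=1)
#     >>> str(ptable['STATUS'][0])   # matched row picks up the queue STATE
#     'COMPLETED'
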
def any_jobs_not_complete(statuses, termination_states=None):
    """
    Returns True if any of the job statuses in the input column of the processing table, statuses, are not complete
    (as based on the list of acceptable final states, termination_states, given as an argument). These should be states
    that are viewed as final, as opposed to job states that require resubmission.

    Args:
        statuses, Table.Column or list or np.array. The statuses in the processing table "STATUS". Each element should
                                                    be a string.
        termination_states, list or np.array. Each element should be a string signifying a state that is returned
                                              by the Slurm scheduler that should be deemed a terminal state.

    Returns:
        bool. True if any of the statuses of the jobs given in statuses are NOT a member of the termination states.
              Otherwise returns False.
    """
    if termination_states is None:
        termination_states = get_termination_states()
    return np.any([status not in termination_states for status in statuses])

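# Illustrative sketch (not part of the original module); bool() just
# normalizes the numpy boolean that np.any returns:
#
#     >>> bool(any_jobs_not_complete(['COMPLETED', 'RUNNING']))
#     True
#     >>> bool(any_jobs_not_complete(['COMPLETED', 'FAILED']))
#     False
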
def any_jobs_failed(statuses, failed_states=None):
    """
    Returns True if any of the job statuses in the input column of the
    processing table, statuses, are in a failed or problematic state (as
    based on the list of failed states, failed_states, given as an
    argument).

    Args:
        statuses, Table.Column or list or np.array. The statuses in the
            processing table "STATUS". Each element should be a string.
        failed_states, list or np.array. Each element should be a string
            signifying a state that is returned by the Slurm scheduler that
            should be considered failing or problematic.

    Returns:
        bool. True if any of the statuses of the jobs given in statuses are
            a member of the failed_states.
    """
    if failed_states is None:
        failed_states = get_failed_states()
    return np.any([status in failed_states for status in statuses])

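# Illustrative sketch (not part of the original module):
#
#     >>> bool(any_jobs_failed(['COMPLETED', 'TIMEOUT']))
#     True
#     >>> bool(any_jobs_failed(['COMPLETED', 'RUNNING']))
#     False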