desihub / desispec / 5203800523 (pending completion)

Pull Request #2042: Focus
github-actions / web-flow: Merge e46d3f4d8 into 8328a5083

83 of 83 new or added lines in 1 file covered. (100.0%)

10697 of 43982 relevant lines covered (24.32%)

0.24 hits per line

Source File

/py/desispec/workflow/queue.py (file coverage: 13.64%)

"""
desispec.workflow.queue
=======================

"""
import os
import numpy as np
from astropy.table import Table
import subprocess
from desiutil.log import get_logger
import time, datetime


def get_resubmission_states():
    """
    Defines what Slurm job failure modes should be resubmitted in the hopes of the job succeeding the next time.

    Possible values that Slurm returns are::

        CA or ca or CANCELLED for cancelled jobs will only show currently running jobs in queue unless times are explicitly given
        BF BOOT_FAIL   Job terminated due to launch failure
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RQ REQUEUED Job was requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Returns:
        list. A list of strings outlining the job states that should be resubmitted.
    """
    return ['UNSUBMITTED', 'BOOT_FAIL', 'DEADLINE', 'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED', 'TIMEOUT', 'CANCELLED']


def get_termination_states():
    """
    Defines which Slurm job states are final and are not in question about needing resubmission.

    Possible values that Slurm returns are::

        CA or ca or CANCELLED for cancelled jobs will only show currently running jobs in queue unless times are explicitly given
        BF BOOT_FAIL   Job terminated due to launch failure
        CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
        CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
        DL DEADLINE Job terminated on deadline.
        F FAILED Job terminated with non-zero exit code or other failure condition.
        NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
        OOM OUT_OF_MEMORY Job experienced out of memory error.
        PD PENDING Job is awaiting resource allocation.
        PR PREEMPTED Job terminated due to preemption.
        R RUNNING Job currently has an allocation.
        RQ REQUEUED Job was requeued.
        RS RESIZING Job is about to change size.
        RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
        S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
        TO TIMEOUT Job terminated upon reaching its time limit.

    Returns:
        list. A list of strings outlining the job states that are considered final (without human investigation/intervention).
    """
    return ['COMPLETED', 'CANCELLED', 'FAILED']


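# Example (illustrative sketch, not part of the original queue.py source): the two
# lists above can be used together to triage the final sacct state of a job. The
# state value below is hypothetical, not the result of a real query.
#
#     state = 'TIMEOUT'
#     if state in get_resubmission_states():
#         print(f"{state}: resubmit the job")
#     elif state in get_termination_states():
#         print(f"{state}: job is final; no automatic action")
#     else:
#         print(f"{state}: job is still queued/running or needs human attention")

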
def queue_info_from_time_window(start_time=None, end_time=None, user=None, \
                             columns='jobid,jobname,partition,submit,eligible,'+
                                     'start,end,elapsed,state,exitcode',
                             dry_run=0):
    """
    Queries the NERSC Slurm database using sacct with appropriate flags to get information about
    all jobs submitted or executed during a specified time window.

    Parameters
    ----------
    start_time : str
        String of the form YYYY-mm-ddTHH:MM:SS. Based on the given night and the earliest hour you
        want to see queue information about.
    end_time : str
        String of the form YYYY-mm-ddTHH:MM:SS. Based on the given night and the latest hour you
        want to see queue information about.
    user : str
        The username at NERSC that you want job information about. The default is the USER
        environment variable if it exists, otherwise 'desi'.
    columns : str
        Comma-separated string of valid sacct column names, in lower case. To be useful for the workflow,
        it MUST have the columns "JOBID" and "STATE". The default list is: jobid,jobname,partition,submit,
        eligible,start,end,elapsed,state,exitcode. Other available options include: suspended,
        derivedexitcode,reason,priority.
    dry_run : int
        Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default
        table that doesn't query the Slurm scheduler.

    Returns
    -------
    Table
        Table with the columns defined by the input variable 'columns' and information relating
        to all jobs submitted by the specified user in the specified time frame.
    """
    # global queue_info_table
    if dry_run:
        string = 'JobID,JobName,Partition,Submit,Eligible,Start,End,State,ExitCode\n'
        string += '49482394,arc-20211102-00107062-a0123456789,realtime,2021-11-02'\
                  +'T18:31:14,2021-11-02T18:36:33,2021-11-02T18:36:33,2021-11-02T'\
                  +'18:48:32,COMPLETED,0:0' + '\n'
        string += '49482395,arc-20211102-00107063-a0123456789,realtime,2021-11-02'\
                  +'T18:31:16,2021-11-02T18:36:33,2021-11-02T18:48:34,2021-11-02T'\
                  +'18:57:02,COMPLETED,0:0' + '\n'
        string += '49482397,arc-20211102-00107064-a0123456789,realtime,2021-11-02'\
                  +'T18:31:19,2021-11-02T18:36:33,2021-11-02T18:57:05,2021-11-02T'\
                  +'19:06:17,COMPLETED,0:0' + '\n'
        string += '49482398,arc-20211102-00107065-a0123456789,realtime,2021-11-02'\
                  +'T18:31:24,2021-11-02T18:36:33,2021-11-02T19:06:18,2021-11-02T'\
                  +'19:13:59,COMPLETED,0:0' + '\n'
        string += '49482399,arc-20211102-00107066-a0123456789,realtime,2021-11-02'\
                  +'T18:31:27,2021-11-02T18:36:33,2021-11-02T19:14:00,2021-11-02T'\
                  +'19:24:49,COMPLETED,0:0'
        cmd_as_list = ['echo', string]
    else:
        if user is None:
            if 'USER' in os.environ:
                user = os.environ['USER']
            else:
                user = 'desi'
        if start_time is None:
            start_time = '2020-04-26T00:00'
        if end_time is None:
            end_time = '2020-05-01T00:00'
        cmd_as_list = ['sacct', '-X', '--parsable2', '--delimiter=,', \
                       '-S', start_time, \
                       '-E', end_time, \
                       '-u', user, \
                       f'--format={columns}']

    table_as_string = subprocess.check_output(cmd_as_list, text=True,
                                              stderr=subprocess.STDOUT)
    queue_info_table = Table.read(table_as_string, format='ascii.csv')

    for col in queue_info_table.colnames:
        queue_info_table.rename_column(col, col.upper())
    return queue_info_table


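# Example (illustrative sketch, not part of the original queue.py source): query one
# night's worth of queue information. The time window and username below are
# hypothetical; dry_run=1 returns the canned table instead of running sacct.
#
#     from desispec.workflow.queue import queue_info_from_time_window
#
#     qinfo = queue_info_from_time_window(start_time='2021-11-02T00:00:00',
#                                         end_time='2021-11-03T00:00:00',
#                                         user='desi', dry_run=1)
#     print(qinfo['JOBID', 'STATE'])  # column names are upper-cased on return

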
def queue_info_from_qids(qids, columns='jobid,jobname,partition,submit,'+
                         'eligible,start,end,elapsed,state,exitcode',
                         dry_run=0):
    """
    Queries the NERSC Slurm database using sacct with appropriate flags to get information about
    the jobs with the specified Slurm queue ID's (QIDs).

    Parameters
    ----------
    qids : list or array of ints
        Slurm QID's at NERSC that you want to return information about.
    columns : str
        Comma-separated string of valid sacct column names, in lower case. To be useful for the workflow,
        it MUST have the columns "JOBID" and "STATE". The default list is: jobid,jobname,partition,submit,
        eligible,start,end,elapsed,state,exitcode. Other available options include: suspended,
        derivedexitcode,reason,priority.
    dry_run : int
        Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default
        table that doesn't query the Slurm scheduler.

    Returns
    -------
    Table
        Table with the columns defined by the input variable 'columns' and information relating
        to the jobs with the requested QIDs.
    """
    log = get_logger()
    ## Turn the queue id's into a list
    ## this should work with str or int type also, though not officially supported
    qid_str = ','.join(np.atleast_1d(qids).astype(str)).replace(' ','')

    cmd_as_list = ['sacct', '-X', '--parsable2', '--delimiter=,',
                   f'--format={columns}', '-j', qid_str]
    if dry_run:
        log.info("Dry run, would have otherwise queried Slurm with the"
                 +f" following: {' '.join(cmd_as_list)}")
        string = 'JobID,JobName,Partition,Submit,Eligible,Start,End,State,ExitCode\n'
        string += '49482394,arc-20211102-00107062-a0123456789,realtime,2021-11-02'\
                  +'T18:31:14,2021-11-02T18:36:33,2021-11-02T18:36:33,2021-11-02T'\
                  +'18:48:32,COMPLETED,0:0' + '\n'
        string += '49482395,arc-20211102-00107063-a0123456789,realtime,2021-11-02'\
                  +'T18:31:16,2021-11-02T18:36:33,2021-11-02T18:48:34,2021-11-02T'\
                  +'18:57:02,COMPLETED,0:0' + '\n'
        string += '49482397,arc-20211102-00107064-a0123456789,realtime,2021-11-02'\
                  +'T18:31:19,2021-11-02T18:36:33,2021-11-02T18:57:05,2021-11-02T'\
                  +'19:06:17,COMPLETED,0:0' + '\n'
        string += '49482398,arc-20211102-00107065-a0123456789,realtime,2021-11-02'\
                  +'T18:31:24,2021-11-02T18:36:33,2021-11-02T19:06:18,2021-11-02T'\
                  +'19:13:59,COMPLETED,0:0' + '\n'
        string += '49482399,arc-20211102-00107066-a0123456789,realtime,2021-11-02'\
                  +'T18:31:27,2021-11-02T18:36:33,2021-11-02T19:14:00,2021-11-02T'\
                  +'19:24:49,COMPLETED,0:0'
        cmd_as_list = ['echo', string]
        table_as_string = subprocess.check_output(cmd_as_list, text=True,
                                      stderr=subprocess.STDOUT)
    else:
        log.info(f"Querying Slurm with the following: {' '.join(cmd_as_list)}")

        #- sacct sometimes fails; try several times before giving up
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                table_as_string = subprocess.check_output(cmd_as_list, text=True,
                                              stderr=subprocess.STDOUT)
                break
            except subprocess.CalledProcessError as err:
                log.error(f'{qid_str} job query via sacct failure at {datetime.datetime.now()}')
                log.error(f'{qid_str} {cmd_as_list}')
                log.error(f'{qid_str} {err.output=}')
        else:  #- for/else happens if loop doesn't succeed
            msg = f'{qid_str} job query via sacct failed {max_attempts} times; exiting'
            log.critical(msg)
            raise RuntimeError(msg)

    queue_info_table = Table.read(table_as_string, format='ascii.csv')

    for col in queue_info_table.colnames:
        queue_info_table.rename_column(col, col.upper())
    return queue_info_table


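# Example (illustrative sketch, not part of the original queue.py source): look up a
# few Slurm job IDs. The QIDs below match the canned dry-run table and are otherwise
# hypothetical; with dry_run=0 the same call would invoke sacct.
#
#     from desispec.workflow.queue import queue_info_from_qids
#
#     qtable = queue_info_from_qids([49482394, 49482395], dry_run=1)
#     for row in qtable:
#         print(row['JOBID'], row['STATE'])

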
def update_from_queue(ptable, qtable=None, dry_run=0, ignore_scriptnames=False):
    """
    Given an input processing table (ptable) and a query table from the Slurm queue (qtable), it cross matches the
    Slurm job ID's and updates the 'state' in the table using the current state in the Slurm scheduler system.

    Args:
        ptable, Table. Processing table that contains the jobs you want updated with the most recent queue table. Must
                       have at least columns 'LATEST_QID' and 'STATUS'.
        qtable, Table. Table of queue information with at least columns 'JOBID' and 'STATE', as returned by
                       queue_info_from_qids or queue_info_from_time_window.
        ignore_scriptnames, bool. Default is False. Set to True if you do not
                        want to check whether the scriptname matches the jobname
                        returned by the Slurm scheduler.
        The following are only used if qtable is not provided:
            dry_run, int. Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default
                           table that doesn't query the Slurm scheduler.

    Returns:
        ptable, Table. The same processing table as the input except that the "STATUS" column in ptable for all jobs is
                       updated based on the 'STATE' in the qtable (as matched by "LATEST_QID" in the ptable
                       and "JOBID" in the qtable).
    """
    log = get_logger()
    if qtable is None:
        log.info("qtable not provided, querying Slurm using ptable's LATEST_QID set")
        qids = np.array(ptable['LATEST_QID'])
        ## Avoid null valued QID's (set to -99)
        qids = qids[qids > 0]
        qtable = queue_info_from_qids(qids, dry_run=dry_run)

    log.info(f"Slurm returned information on {len(qtable)} jobs out of "
             +f"{len(ptable)} jobs in the ptable. Updating those now.")

    check_scriptname = ('JOBNAME' in qtable.colnames
                        and 'SCRIPTNAME' in ptable.colnames
                        and not ignore_scriptnames)
    if check_scriptname:
        log.info("Will be verifying that the file names are consistent")

    for row in qtable:
        match = (int(row['JOBID']) == ptable['LATEST_QID'])
        if np.any(match):
            ind = np.where(match)[0][0]
            if check_scriptname and ptable['SCRIPTNAME'][ind] not in row['JOBNAME']:
                log.warning(f"For job with expids:{ptable['EXPID'][ind]}"
                            + f" the scriptname is {ptable['SCRIPTNAME'][ind]}"
                            + f" but the jobname in the queue was "
                            + f"{row['JOBNAME']}.")
            state = str(row['STATE']).split(' ')[0]
            ptable['STATUS'][ind] = state

    return ptable


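# Example (illustrative sketch, not part of the original queue.py source): refresh the
# STATUS column of a minimal, hand-built processing table. The table contents are
# hypothetical; with qtable=None the function queries Slurm itself via
# queue_info_from_qids (here short-circuited with dry_run=1).
#
#     from astropy.table import Table
#     from desispec.workflow.queue import update_from_queue
#
#     ptable = Table({'LATEST_QID': [49482394, 49482395],
#                     'STATUS': ['SUBMITTED', 'SUBMITTED']})
#     ptable = update_from_queue(ptable, dry_run=1)
#     print(ptable['STATUS'])

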
def any_jobs_not_complete(statuses, termination_states=None):
    """
    Returns True if any of the job statuses in the input column of the processing table, statuses, are not complete
    (as based on the list of acceptable final states, termination_states, given as an argument). These should be states
    that are viewed as final, as opposed to job states that require resubmission.

    Args:
        statuses, Table.Column or list or np.array. The statuses in the processing table "STATUS". Each element should
                                                    be a string.
        termination_states, list or np.array. Each element should be a string signifying a state that is returned
                                              by the Slurm scheduler and that should be deemed a terminal state.

    Returns:
        bool. True if any of the statuses of the jobs given in statuses are NOT a member of the termination states.
              Otherwise returns False.
    """
    if termination_states is None:
        termination_states = get_termination_states()
    return np.any([status not in termination_states for status in statuses])
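
# Example (illustrative sketch, not part of the original queue.py source): decide
# whether a workflow still needs to wait on any jobs. The status list is hypothetical.
#
#     from desispec.workflow.queue import any_jobs_not_complete
#
#     statuses = ['COMPLETED', 'RUNNING', 'FAILED']
#     if any_jobs_not_complete(statuses):
#         print("At least one job has not reached a terminal state")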