desihub / desispec: build 4888068324 (pending completion)

Pull Request #2037: Workaround for sbatch bug on Perlmutter
github-actions (GitHub): merge b9577a3f9 into e3cee2f1c

11 of 11 new or added lines in 2 files covered (100.0%)
10653 of 43760 relevant lines covered (24.34%)
0.24 hits per line

Source File
/py/desispec/workflow/procfuncs.py
"""
desispec.workflow.procfuncs
===========================

"""
import sys, os, glob
import json
from astropy.io import fits
from astropy.table import Table, join
import numpy as np

import time, datetime
from collections import OrderedDict
import subprocess
from copy import deepcopy

from desispec.scripts.tile_redshifts import generate_tile_redshift_scripts
from desispec.workflow.redshifts import get_ztile_script_pathname, \
                                        get_ztile_relpath, \
                                        get_ztile_script_suffix
from desispec.workflow.queue import get_resubmission_states, update_from_queue, queue_info_from_qids
from desispec.workflow.timing import what_night_is_it
from desispec.workflow.desi_proc_funcs import get_desi_proc_batch_file_pathname, \
                                              create_desi_proc_batch_script, \
                                              get_desi_proc_batch_file_path, \
                                              get_desi_proc_tilenight_batch_file_pathname, \
                                              create_desi_proc_tilenight_batch_script
from desispec.workflow.utils import pathjoin, sleep_and_report
from desispec.workflow.tableio import write_table
from desispec.workflow.proctable import table_row_to_dict
from desiutil.log import get_logger

from desispec.io import findfile, specprod_root
from desispec.io.util import decode_camword, create_camword, \
    difference_camwords, \
    camword_to_spectros, camword_union, camword_intersection, parse_badamps


#################################################
############## Misc Functions ###################
#################################################
def night_to_starting_iid(night=None):
    """
    Creates an internal ID for a given night. The resulting integer is a 9 digit number.
    The digits are YYMMDDxxx where YY is the years since 2000, and MM and DD are the month and day. xxx starts
    at 000 and is incremented for up to 1000 unique job IDs for a given night.

    Args:
        night (str or int): YYYYMMDD of the night to get the starting internal ID for.

    Returns:
        int: 9 digit number consisting of YYMMDD000. YY is years after 2000, MMDD is month and day.
        000 is the starting job number (0).
    """
    if night is None:
        night = what_night_is_it()
    night = int(night)
    internal_id = (night - 20000000) * 1000
    return internal_id
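
# Example (hypothetical night; the result is pure arithmetic):
#   >>> night_to_starting_iid(20240115)
#   240115000
# i.e. (20240115 - 20000000) * 1000, leaving room for job IDs 240115000-240115999.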


#################################################
############ Script Functions ###################
#################################################
def batch_script_name(prow):
    """
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, JOBDESC, PROCCAMWORD defined)
    and determines the script file pathname as defined by desi_proc's helper functions.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.

    Returns:
        str: The complete pathname to the script file, as it is defined within the desi_proc ecosystem.
    """
    expids = prow['EXPID']
    if len(expids) == 0:
        expids = None
    if prow['JOBDESC'] == 'tilenight':
        pathname = get_desi_proc_tilenight_batch_file_pathname(night=prow['NIGHT'], tileid=prow['TILEID'])
    else:
        pathname = get_desi_proc_batch_file_pathname(night=prow['NIGHT'], exp=expids,
                                                     jobdesc=prow['JOBDESC'], cameras=prow['PROCCAMWORD'])
    scriptfile = pathname + '.slurm'
    return scriptfile
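
# Usage sketch (hypothetical row values; resolving the actual directory requires
# a configured DESI environment for the desi_proc path helpers):
#   >>> prow = {'NIGHT': 20240115, 'EXPID': [123456], 'JOBDESC': 'flat',
#   ...         'PROCCAMWORD': 'a0123456789'}
#   >>> batch_script_name(prow)   # full pathname of the Slurm script, ending '.slurm'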

def check_for_outputs_on_disk(prow, resubmit_partial_complete=True):
    """
    Checks whether the expected output files for the given job already exist on disk, and optionally prunes
    PROCCAMWORD down to the cameras whose outputs are still missing.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
        the change in job status after creating and submitting the job for processing.
    """
    prow['STATUS'] = 'UNKNOWN'
    log = get_logger()

    job_to_file_map = {
            'prestdstar': 'sframe',
            'stdstarfit': 'stdstars',
            'poststdstar': 'cframe',
            'nightlybias': 'biasnight',
            'ccdcalib': 'badcolumns',
            'badcol': 'badcolumns',
            'arc': 'fitpsf',
            'flat': 'fiberflat',
            'psfnight': 'psfnight',
            'nightlyflat': 'fiberflatnight',
            'spectra': 'spectra_tile',
            'coadds': 'coadds_tile',
            'redshift': 'redrock_tile',
            }

    night = prow['NIGHT']
    if prow['JOBDESC'] in ['cumulative', 'pernight-v0', 'pernight', 'perexp']:
        filetype = 'redrock_tile'
    else:
        filetype = job_to_file_map[prow['JOBDESC']]
    orig_camword = prow['PROCCAMWORD']

    ## if spectro based, look for spectros, else look for cameras
    if prow['JOBDESC'] in ['stdstarfit', 'spectra', 'coadds', 'redshift']:
        ## Spectrograph based
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        ## Suppress outputs about using tile based files in findfile if only looking for stdstarfits
        if prow['JOBDESC'] == 'stdstarfit':
            tileid = None
        else:
            tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(findfile(filetype=filetype, night=night, expid=expid, spectrograph=spectro, tile=tileid)):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'], existing_camword)
    elif prow['JOBDESC'] in ['cumulative', 'pernight-v0', 'pernight', 'perexp']:
        ## Spectrograph based
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        redux_dir = specprod_root()
        outdir = os.path.join(redux_dir, get_ztile_relpath(tileid, group=prow['JOBDESC'], night=night, expid=expid))
        suffix = get_ztile_script_suffix(tileid, group=prow['JOBDESC'], night=night, expid=expid)
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(os.path.join(outdir, f"redrock-{spectro}-{suffix}.fits")):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'], existing_camword)
    else:
        ## Otherwise camera based
        cameras = decode_camword(prow['PROCCAMWORD'])
        n_desired = len(cameras)
        if len(prow['EXPID']) > 0:
            expid = prow['EXPID'][0]
        else:
            expid = None
        if len(prow['EXPID']) > 1 and prow['JOBDESC'] not in ['psfnight', 'nightlyflat']:
            log.warning(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']}. This job type only makes " +
                        f"sense with a single exposure. Proceeding with {expid}.")
        missing_cameras = []
        for cam in cameras:
            if not os.path.exists(findfile(filetype=filetype, night=night, expid=expid, camera=cam)):
                missing_cameras.append(cam)
        completed = (len(missing_cameras) == 0)
        if not completed and resubmit_partial_complete and len(missing_cameras) < n_desired:
            prow['PROCCAMWORD'] = create_camword(missing_cameras)

    if completed:
        prow['STATUS'] = 'COMPLETED'
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"the desired {n_desired} {filetype}'s. Not submitting this job.")
    elif resubmit_partial_complete and orig_camword != prow['PROCCAMWORD']:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"some {filetype}'s. Submitting smaller camword={prow['PROCCAMWORD']}.")
    elif not resubmit_partial_complete:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} doesn't have all " +
                 f"{filetype}'s and resubmit_partial_complete=False. " +
                 f"Submitting full camword={prow['PROCCAMWORD']}.")
    else:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} has no " +
                 f"existing {filetype}'s. Submitting full camword={prow['PROCCAMWORD']}.")
    return prow
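
# Illustration of the camword pruning above (hypothetical values): if a job was
# requested for camword 'a0' (cameras b0, r0, z0) and the b0 output already
# exists on disk, only the remaining cameras are kept for resubmission.
# difference_camwords is used exactly as in the function above:
#   >>> from desispec.io.util import difference_camwords
#   >>> difference_camwords('a0', 'b0')   # expected: 'r0z0'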

def create_and_submit(prow, queue='realtime', reservation=None, dry_run=0, joint=False,
                      strictly_successful=False, check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None, use_specter=False, laststeps=None):
    """
    Wrapper script that takes a processing table row and three modifier keywords, creates a submission script for the
    compute nodes, and then submits that script to the Slurm scheduler with appropriate dependencies.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not
            submitted. If dry_run=2, the scripts will be neither written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        joint (bool, optional): Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
            run with desi_proc_joint_fit. Default is False.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        laststeps (list of str, optional): A list of laststeps to pass as the laststeps argument to tilenight.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
        the change in job status after creating and submitting the job for processing.

    Note:
        This modifies the input. Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not persist. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    orig_prow = prow.copy()
    if check_for_outputs:
        prow = check_for_outputs_on_disk(prow, resubmit_partial_complete)
        if prow['STATUS'].upper() == 'COMPLETED':
            return prow

    prow = create_batch_script(prow, queue=queue, dry_run=dry_run, joint=joint, system_name=system_name,
                               use_specter=use_specter, laststeps=laststeps)
    prow = submit_batch_script(prow, reservation=reservation, dry_run=dry_run, strictly_successful=strictly_successful)
    ## If resubmitted partial, the PROCCAMWORD and SCRIPTNAME will correspond to the pruned values. But we want to
    ## retain the full job's value, so get those from the old job.
    if resubmit_partial_complete:
        prow['PROCCAMWORD'] = orig_prow['PROCCAMWORD']
        prow['SCRIPTNAME'] = orig_prow['SCRIPTNAME']
    return prow
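
# Minimal end-to-end sketch (hypothetical arguments; dry_run=2 writes and submits
# nothing, but the path helpers still require a DESI software environment):
#   >>> prow = table_row_to_dict(ptable[0])   # a row from a processing table
#   >>> prow = create_and_submit(prow, queue='debug', dry_run=2,
#   ...                          check_for_outputs=False, system_name='perlmutter-gpu')
#   >>> prow['STATUS']
#   'SUBMITTED'
# The submission is simulated; a mock Slurm ID is stored in prow['LATEST_QID'].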

def desi_proc_command(prow, system_name, use_specter=False, queue=None):
    """
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, OBSTYPE, JOBDESC, PROCCAMWORD defined)
    and determines the proper command line call to process the data defined by the input row/dict.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).

    Returns:
        str: The proper command to be submitted to desi_proc to process the job defined by the prow values.
    """
    cmd = 'desi_proc'
    cmd += ' --batch'
    cmd += ' --nosubmit'
    if queue is not None:
        cmd += f' -q {queue}'
    if prow['OBSTYPE'].lower() == 'science':
        if prow['JOBDESC'] == 'prestdstar':
            cmd += ' --nostdstarfit --nofluxcalib'
        elif prow['JOBDESC'] == 'poststdstar':
            cmd += ' --noprestdstarfit --nostdstarfit'

        if use_specter:
            cmd += ' --use-specter'

    elif prow['JOBDESC'] in ['nightlybias', 'ccdcalib']:
        cmd += ' --nightlybias'
    elif prow['JOBDESC'] in ['flat', 'prestdstar'] and use_specter:
        cmd += ' --use-specter'
    pcamw = str(prow['PROCCAMWORD'])
    cmd += f" --cameras={pcamw} -n {prow['NIGHT']}"
    if len(prow['EXPID']) > 0:
        cmd += f" -e {prow['EXPID'][0]}"
    if prow['BADAMPS'] != '':
        cmd += ' --badamps={}'.format(prow['BADAMPS'])
    return cmd
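
# For example, a prestdstar science row (hypothetical values: night 20240115,
# expid 123456, camword a0123456789, empty BADAMPS, use_specter=False) yields:
#   desi_proc --batch --nosubmit -q realtime --nostdstarfit --nofluxcalib \
#       --cameras=a0123456789 -n 20240115 -e 123456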

def desi_proc_joint_fit_command(prow, queue=None):
    """
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, OBSTYPE, PROCCAMWORD defined)
    and determines the proper command line call to process the data defined by the input row/dict.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'OBSTYPE', and 'PROCCAMWORD'.
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).

    Returns:
        str: The proper command to be submitted to desi_proc_joint_fit to process the job defined by the prow values.
    """
    cmd = 'desi_proc_joint_fit'
    cmd += ' --batch'
    cmd += ' --nosubmit'
    if queue is not None:
        cmd += f' -q {queue}'

    descriptor = prow['OBSTYPE'].lower()

    night = prow['NIGHT']
    specs = str(prow['PROCCAMWORD'])
    expid_str = ','.join([str(eid) for eid in prow['EXPID']])

    cmd += f' --obstype {descriptor}'
    cmd += f' --cameras={specs} -n {night}'
    if len(expid_str) > 0:
        cmd += f' -e {expid_str}'
    return cmd
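
# Correspondingly, a standard-star joint fit over three exposures (hypothetical
# values) yields:
#   desi_proc_joint_fit --batch --nosubmit -q realtime --obstype science \
#       --cameras=a0123456789 -n 20240115 -e 123456,123457,123458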

def create_batch_script(prow, queue='realtime', dry_run=0, joint=False, system_name=None, use_specter=False, laststeps=None):
    """
    Wrapper script that takes a processing table row and three modifier keywords and creates a submission script for the
    compute nodes.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted.
            If dry_run=2, the scripts will be neither written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        joint (bool, optional): Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
            run with desi_proc_joint_fit when not using tilenight. Default is False.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        laststeps (list of str, optional): A list of laststeps to pass as the laststeps argument to tilenight.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with an updated value for 'SCRIPTNAME'.

    Note:
        This modifies the input. Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not persist. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()
    if prow['JOBDESC'] in ['perexp', 'pernight', 'pernight-v0', 'cumulative']:
        if dry_run > 1:
            scriptpathname = get_ztile_script_pathname(tileid=prow['TILEID'], group=prow['JOBDESC'],
                                                       night=prow['NIGHT'], expid=prow['EXPID'][0])

            log.info("Output file would have been: {}".format(scriptpathname))
        else:
            #- run zmtl for cumulative redshifts but not others
            run_zmtl = (prow['JOBDESC'] == 'cumulative')
            no_afterburners = False
            scripts, failed_scripts = generate_tile_redshift_scripts(tileid=prow['TILEID'], group=prow['JOBDESC'],
                                                                     nights=[prow['NIGHT']], expids=prow['EXPID'],
                                                                     batch_queue=queue, system_name=system_name,
                                                                     run_zmtl=run_zmtl,
                                                                     no_afterburners=no_afterburners,
                                                                     nosubmit=True)
            if len(failed_scripts) > 0:
                log.error(f"Redshifts failed for group={prow['JOBDESC']}, night={prow['NIGHT']}, " +
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned failed scriptname is {failed_scripts}")
            elif len(scripts) > 1:
                log.error(f"More than one redshift script returned for group={prow['JOBDESC']}, night={prow['NIGHT']}, " +
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned scriptnames were {scripts}")
            else:
                scriptpathname = scripts[0]
    else:
        if prow['JOBDESC'] != 'tilenight':
            if joint:
                cmd = desi_proc_joint_fit_command(prow, queue=queue)
            else:
                cmd = desi_proc_command(prow, system_name, use_specter, queue=queue)
        if dry_run > 1:
            scriptpathname = batch_script_name(prow)
            log.info("Output file would have been: {}".format(scriptpathname))
            if prow['JOBDESC'] != 'tilenight':
                log.info("Command to be run: {}".format(cmd.split()))
        else:
            expids = prow['EXPID']
            if len(expids) == 0:
                expids = None

            if prow['JOBDESC'] == 'tilenight':
                log.info("Creating tilenight script for tile {}".format(prow['TILEID']))
                ncameras = len(decode_camword(prow['PROCCAMWORD']))
                scriptpathname = create_desi_proc_tilenight_batch_script(night=prow['NIGHT'], exp=expids,
                                                                         tileid=prow['TILEID'],
                                                                         ncameras=ncameras,
                                                                         queue=queue,
                                                                         mpistdstars=True,
                                                                         use_specter=use_specter,
                                                                         system_name=system_name,
                                                                         laststeps=laststeps)
            else:
                log.info("Running: {}".format(cmd.split()))
                scriptpathname = create_desi_proc_batch_script(night=prow['NIGHT'], exp=expids,
                                                               cameras=prow['PROCCAMWORD'],
                                                               jobdesc=prow['JOBDESC'],
                                                               queue=queue, cmdline=cmd,
                                                               use_specter=use_specter,
                                                               system_name=system_name)
    log.info("Outfile is: {}".format(scriptpathname))
    prow['SCRIPTNAME'] = os.path.basename(scriptpathname)
    return prow


def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=False):
    """
    Wrapper script that takes a processing table row and three modifier keywords and submits the scripts to the Slurm
    scheduler.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not
            submitted. If dry_run=2, the scripts will be neither written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with updated values reflecting the queue
        submission (e.g. 'LATEST_QID', 'ALL_QIDS', 'STATUS', and 'SUBMIT_DATE').

    Note:
        This modifies the input. Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not persist. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()
    dep_qids = prow['LATEST_DEP_QID']
    dep_list, dep_str = '', ''

    # workaround for sbatch --dependency not working for completed jobs
    # see NERSC TICKET INC0203024
    if len(dep_qids) > 0:
        dep_table = queue_info_from_qids(np.asarray(dep_qids), columns='jobid,state')
        for row in dep_table:
            if row['STATE'] == 'COMPLETED':
                dep_qids.remove(row['JOBID'])

    if len(dep_qids) > 0:
        jobtype = prow['JOBDESC']
        if strictly_successful:
            depcond = 'afterok'
        elif jobtype in ['arc', 'psfnight', 'prestdstar', 'stdstarfit']:
            ## (though psfnight and stdstarfit will require some inputs otherwise they'll go up in flames)
            depcond = 'afterany'
        else:
            ## if 'flat','nightlyflat','poststdstar', or any type of redshift, require strict success of inputs
            depcond = 'afterok'

        dep_str = f'--dependency={depcond}:'

        if np.isscalar(dep_qids):
            dep_list = str(dep_qids).strip(' \t')
            if dep_list == '':
                dep_str = ''
            else:
                dep_str += dep_list
        else:
            if len(dep_qids) > 1:
                dep_list = ':'.join(np.array(dep_qids).astype(str))
                dep_str += dep_list
            elif len(dep_qids) == 1 and dep_qids[0] not in [None, 0]:
                dep_str += str(dep_qids[0])
            else:
                dep_str = ''

    if prow['JOBDESC'] in ['pernight-v0', 'pernight', 'perexp', 'cumulative']:
        script_path = get_ztile_script_pathname(tileid=prow['TILEID'], group=prow['JOBDESC'],
                                                night=prow['NIGHT'], expid=np.min(prow['EXPID']))
        jobname = os.path.split(script_path)[-1]
    else:
        batchdir = get_desi_proc_batch_file_path(night=prow['NIGHT'])
        jobname = batch_script_name(prow)
        script_path = pathjoin(batchdir, jobname)

    batch_params = ['sbatch', '--parsable']
    if dep_str != '':
        batch_params.append(f'{dep_str}')
    if reservation is not None:
        batch_params.append(f'--reservation={reservation}')
    batch_params.append(f'{script_path}')

    if dry_run:
        ## in dry_run, mock Slurm IDs are generated from the current time. Wait one second so we have unique IDs
        current_qid = int(time.time() - 1.6e9)
        time.sleep(1)
    else:
        #- sbatch sometimes fails; try several times before giving up
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                current_qid = subprocess.check_output(batch_params, stderr=subprocess.STDOUT, text=True)
                current_qid = int(current_qid.strip(' \t\n'))
                break
            except subprocess.CalledProcessError as err:
                log.error(f'{jobname} submission failure at {datetime.datetime.now()}')
                log.error(f'{jobname}   {batch_params}')
                log.error(f'{jobname}   {err.output=}')
                if attempt < max_attempts - 1:
                    log.info('Sleeping 60 seconds then retrying')
                    time.sleep(60)
        else:  #- for/else runs if the loop never hit `break`, i.e. all attempts failed
            msg = f'{jobname} submission failed {max_attempts} times; exiting'
            log.critical(msg)
            raise RuntimeError(msg)

    log.info(batch_params)
    log.info(f'Submitted {jobname} with dependencies {dep_str} and reservation={reservation}. Returned qid: {current_qid}')

    prow['LATEST_QID'] = current_qid
    prow['ALL_QIDS'] = np.append(prow['ALL_QIDS'], current_qid)
    prow['STATUS'] = 'SUBMITTED'
    prow['SUBMIT_DATE'] = int(time.time())

    return prow
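
# Self-contained illustration of the dependency string assembled above
# (hypothetical queue IDs):
#   >>> import numpy as np
#   >>> '--dependency=afterok:' + ':'.join(np.array([12345, 12346]).astype(str))
#   '--dependency=afterok:12345:12346'
# which becomes, e.g., `sbatch --parsable --dependency=afterok:12345:12346 <script>`.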


#############################################
##########   Row Manipulations   ############
#############################################
def define_and_assign_dependency(prow, calibjobs, use_tilenight=False):
    """
    Given an input processing row and possible calibjobs, this defines the
    JOBDESC keyword and assigns the dependency appropriate for the job type of
    prow.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for
            'OBSTYPE'. A row must have column names for
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The Table.Row values are for the corresponding
            calibration job. Each value that isn't None must contain
            'INTID' and 'LATEST_QID'. If None, it assumes the
            dependency doesn't exist and no dependency is assigned.
        use_tilenight (bool, optional): Default is False. If True, use desi_proc_tilenight
            for prestdstar, stdstar, and poststdstar steps for
            science exposures.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except
        with updated values for 'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.

    Note:
        This modifies the input. Table.Row objects are generally copied
        on modification, so the change to the input object in memory may or may
        not persist. As of writing, a row from a table given to this function
        will not change during the execution of this function (but can be
        overwritten explicitly with the returned row if desired).
    """
    if prow['OBSTYPE'] in ['science', 'twiflat']:
        if calibjobs['nightlyflat'] is not None:
            dependency = calibjobs['nightlyflat']
        elif calibjobs['psfnight'] is not None:
            dependency = calibjobs['psfnight']
        elif calibjobs['ccdcalib'] is not None:
            dependency = calibjobs['ccdcalib']
        else:
            dependency = calibjobs['nightlybias']
        if not use_tilenight:
            prow['JOBDESC'] = 'prestdstar'
    elif prow['OBSTYPE'] == 'flat':
        if calibjobs['psfnight'] is not None:
            dependency = calibjobs['psfnight']
        elif calibjobs['ccdcalib'] is not None:
            dependency = calibjobs['ccdcalib']
        else:
            dependency = calibjobs['nightlybias']
    elif prow['OBSTYPE'] == 'arc':
        if calibjobs['ccdcalib'] is not None:
            dependency = calibjobs['ccdcalib']
        elif calibjobs['nightlybias'] is not None:
            dependency = calibjobs['nightlybias']
        elif calibjobs['badcol'] is not None:
            dependency = calibjobs['badcol']
        else:
            # arc job, but no ZEROs or DARKs to process ahead of time
            dependency = None
    else:
        dependency = None

    prow = assign_dependency(prow, dependency)

    return prow


def assign_dependency(prow, dependency):
    """
    Given an input processing row and a dependency (or list of dependencies, e.g. the processing rows for a psfnight
    or nightlyflat job), this assigns the dependency information appropriate for the job type of prow.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'OBSTYPE'. A row must have column names for
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.
        dependency (NoneType or scalar/list/array of Table.Row or dict): Processing row corresponding to the required input
            for the job in prow. This must contain keyword
            accessible values for 'INTID' and 'LATEST_QID'.
            If None, it assumes the dependency doesn't exist
            and no dependency is assigned.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with updated values for
        'INT_DEP_IDS' and 'LATEST_DEP_QID'.

    Note:
        This modifies the input. Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not persist. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    prow['INT_DEP_IDS'] = np.ndarray(shape=0).astype(int)
    prow['LATEST_DEP_QID'] = np.ndarray(shape=0).astype(int)
    if dependency is not None:
        if type(dependency) in [list, np.ndarray]:
            ids, qids = [], []
            for curdep in dependency:
                if still_a_dependency(curdep):
                    ids.append(curdep['INTID'])
                    qids.append(curdep['LATEST_QID'])
            prow['INT_DEP_IDS'] = np.array(ids, dtype=int)
            prow['LATEST_DEP_QID'] = np.array(qids, dtype=int)
        elif type(dependency) in [dict, OrderedDict, Table.Row] and still_a_dependency(dependency):
            prow['INT_DEP_IDS'] = np.array([dependency['INTID']], dtype=int)
            prow['LATEST_DEP_QID'] = np.array([dependency['LATEST_QID']], dtype=int)
    return prow
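
# For example, assigning a single ccdcalib dependency to an arc row
# (hypothetical plain-dict rows):
#   >>> row = {'INT_DEP_IDS': None, 'LATEST_DEP_QID': None}
#   >>> dep = {'INTID': 240115001, 'LATEST_QID': 12345, 'STATUS': 'SUBMITTED'}
#   >>> row = assign_dependency(row, dep)
#   >>> row['INT_DEP_IDS'], row['LATEST_DEP_QID']
#   (array([240115001]), array([12345]))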

def still_a_dependency(dependency):
    """
    Defines the criteria for which a dependency is deemed complete (and therefore no longer a dependency).

    Args:
        dependency (Table.Row or dict): Processing row corresponding to the required input for the job in prow.
            This must contain keyword accessible values for 'STATUS' and 'LATEST_QID'.

    Returns:
        bool: False if the criteria indicate that the dependency is completed and no longer a blocking factor (i.e. no longer
        a genuine dependency). Returns True if the dependency is still a blocking factor such that the Slurm
        scheduler needs to be aware of the pending job.
    """
    return dependency['LATEST_QID'] > 0 and dependency['STATUS'] != 'COMPLETED'
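
# For example (hypothetical rows):
#   >>> still_a_dependency({'LATEST_QID': 12345, 'STATUS': 'RUNNING'})
#   True
#   >>> still_a_dependency({'LATEST_QID': 12345, 'STATUS': 'COMPLETED'})
#   False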

def get_type_and_tile(erow):
    """
    Trivial function to return the OBSTYPE and the TILEID from an exposure table row.

    Args:
        erow (Table.Row or dict): Must contain 'OBSTYPE' and 'TILEID' as keywords.

    Returns:
        tuple: (str, int) corresponding to the lowercased OBSTYPE and the TILEID value of the input erow.
    """
    return str(erow['OBSTYPE']).lower(), erow['TILEID']


#############################################
#########   Table manipulators   ############
#############################################
def parse_previous_tables(etable, ptable, night):
    """
    This takes in the exposure and processing tables and regenerates all the working memory variables needed for the
    daily processing script.

    Used by the daily processing to define most of its stateful variables into working memory.
    If the processing table is empty, these are simply declared and returned for use.
    If the code had previously run and exited (or crashed), however, this allows the code to
    re-establish itself by redefining these values.

    Args:
        etable (Table): Exposure table of all exposures that have been dealt with thus far.
        ptable (Table): Processing table of all exposures that have been processed.
        night (str or int): The night the data was taken.

    Returns:
        tuple: A tuple containing:

        * arcs, list of dicts, list of the individual arc jobs used for the psfnight (NOT all
          the arcs, if multiple sets existed)
        * flats, list of dicts, list of the individual flat jobs used for the nightlyflat (NOT
          all the flats, if multiple sets existed)
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile)
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'badcol', 'psfnight'
          and 'nightlyflat'. Each key corresponds to a Table.Row or
          None. The Table.Row values are for the corresponding
          calibration job.
        * curtype, None, the obstype of the current job being run. Always None as the first new job will define this.
        * lasttype, str or None, the obstype of the last individual exposure row to be processed.
        * curtile, None, the tileid of the current job (if science). Otherwise None. Always None as the first
          new job will define this.
        * lasttile, str or None, the tileid of the last job (if science). Otherwise None.
        * internal_id, int, an internal identifier unique to each job. Increments with each new job. This
          is the latest unassigned value.
    """
    log = get_logger()
    arcs, flats, sciences = [], [], []
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None, 'psfnight': None,
                 'nightlyflat': None}
    curtype, lasttype = None, None
    curtile, lasttile = None, None

    if len(ptable) > 0:
        prow = ptable[-1]
        internal_id = int(prow['INTID']) + 1
        lasttype, lasttile = get_type_and_tile(ptable[-1])
        jobtypes = ptable['JOBDESC']

        if 'nightlybias' in jobtypes:
            calibjobs['nightlybias'] = table_row_to_dict(ptable[jobtypes == 'nightlybias'][0])
            log.info("Located nightlybias job in exposure table: {}".format(calibjobs['nightlybias']))

        if 'ccdcalib' in jobtypes:
            calibjobs['ccdcalib'] = table_row_to_dict(ptable[jobtypes == 'ccdcalib'][0])
            log.info("Located ccdcalib job in exposure table: {}".format(calibjobs['ccdcalib']))

        if 'psfnight' in jobtypes:
            calibjobs['psfnight'] = table_row_to_dict(ptable[jobtypes == 'psfnight'][0])
            log.info("Located joint fit psfnight job in exposure table: {}".format(calibjobs['psfnight']))
        elif lasttype == 'arc':
            seqnum = 10
            for row in ptable[::-1]:
                erow = etable[etable['EXPID'] == row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'arc' and int(erow['SEQNUM']) < seqnum:
                    arcs.append(table_row_to_dict(row))
                    seqnum = int(erow['SEQNUM'])
                else:
                    break
            ## Because we work backward to fill in, we need to reverse them to get chronological order back
            arcs = arcs[::-1]

        if 'nightlyflat' in jobtypes:
            calibjobs['nightlyflat'] = table_row_to_dict(ptable[jobtypes == 'nightlyflat'][0])
            log.info("Located joint fit nightlyflat job in exposure table: {}".format(calibjobs['nightlyflat']))
        elif lasttype == 'flat':
            for row in ptable[::-1]:
                erow = etable[etable['EXPID'] == row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'flat' and int(erow['SEQTOT']) < 5:
                    if float(erow['EXPTIME']) > 100.:
                        flats.append(table_row_to_dict(row))
                else:
                    break
            flats = flats[::-1]

        if lasttype.lower() == 'science':
            for row in ptable[::-1]:
                if row['OBSTYPE'].lower() == 'science' and row['TILEID'] == lasttile and \
                   row['JOBDESC'] == 'prestdstar' and row['LASTSTEP'] != 'skysub':
                    sciences.append(table_row_to_dict(row))
                else:
                    break
            sciences = sciences[::-1]
    else:
        internal_id = night_to_starting_iid(night)

    return arcs, flats, sciences, \
           calibjobs, \
           curtype, lasttype, \
           curtile, lasttile, \
           internal_id

def update_and_recurvsively_submit(proc_table, submits=0, resubmission_states=None,
                                   ptab_name=None, dry_run=0, reservation=None):
    """
    Given a processing table, this loops over job rows and resubmits failed jobs (as defined by resubmission_states).
    Before submitting a job, it checks the dependencies for failures. If a dependency needs to be resubmitted, it recursively
    follows dependencies until it finds the first job without a failed dependency and resubmits that. Then it resubmits the
    other jobs with the new Slurm jobIDs for proper dependency coordination within Slurm.

    Args:
        proc_table (Table): The processing table with a row per job.
        submits (int): The number of submissions made to the queue. Used for saving files and for not overloading the scheduler.
        resubmission_states (list or array of str): Each element should be a capitalized string corresponding to a
            possible Slurm scheduler state, where you wish for jobs with that
            outcome to be resubmitted.
        ptab_name (str): The full pathname where the processing table should be saved.
        dry_run (int): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted. If
            dry_run=2, the scripts will be neither written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.

    Returns:
        tuple: A tuple containing:

        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
          been updated for those jobs that needed to be resubmitted.
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
          the number of submissions made from this function call plus the input submits value.

    Note:
        This modifies the inputs of both proc_table and submits and returns them.
    """
    log = get_logger()
    if resubmission_states is None:
        resubmission_states = get_resubmission_states()
    log.info(f"Resubmitting jobs with current states in the following: {resubmission_states}")
    proc_table = update_from_queue(proc_table, dry_run=False)
    log.info("Updated processing table queue information:")
    cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID',
            'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS']
    print(np.array(cols))
    for row in proc_table:
        print(np.array(row[cols]))
    print("\n")
    id_to_row_map = {row['INTID']: rown for rown, row in enumerate(proc_table)}
    for rown in range(len(proc_table)):
        if proc_table['STATUS'][rown] in resubmission_states:
            proc_table, submits = recursive_submit_failed(rown, proc_table, submits,
                                                          id_to_row_map, ptab_name,
                                                          resubmission_states,
                                                          reservation, dry_run)
    return proc_table, submits
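
# Typical resubmission entry point (sketch; assumes a loaded processing table
# and a hypothetical output path ptab_path):
#   >>> ptable, submits = update_and_recurvsively_submit(ptable, submits=0,
#   ...                                                  ptab_name=ptab_path, dry_run=0)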

def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=None,
                            resubmission_states=None, reservation=None, dry_run=0):
    """
    Given the row index of a job in the processing table and the full processing table, this resubmits the given job.
    Before submitting a job, it checks the dependencies for failures in the processing table. If a dependency needs to
    be resubmitted, it recursively follows dependencies until it finds the first job without a failed dependency and
    resubmits that. Then it resubmits the other jobs with the new Slurm jobIDs for proper dependency coordination within Slurm.

    Args:
        rown (int): The index of the processing table row that you want to resubmit.
        proc_table (Table): The processing table with a row per job.
        submits (int): The number of submissions made to the queue. Used for saving files and for not overloading the scheduler.
        id_to_row_map (dict): Lookup dictionary where the keys are internal ids (INTID's) and the values are the row position
            in the processing table.
        ptab_name (str): The full pathname where the processing table should be saved.
        resubmission_states (list or array of str): Each element should be a capitalized string corresponding to a
            possible Slurm scheduler state, where you wish for jobs with that
            outcome to be resubmitted.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted. If
            dry_run=2, the scripts will be neither written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).

    Returns:
        tuple: A tuple containing:

        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
          been updated for those jobs that needed to be resubmitted.
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
          the number of submissions made from this function call plus the input submits value.

    Note:
        This modifies the inputs of both proc_table and submits and returns them.
    """
    log = get_logger()
    row = proc_table[rown]
    log.info(f"Identified row {row['INTID']} as needing resubmission.")
    log.info(f"{row['INTID']}: Expid(s): {row['EXPID']}  Job: {row['JOBDESC']}")
    if resubmission_states is None:
        resubmission_states = get_resubmission_states()
    ideps = proc_table['INT_DEP_IDS'][rown]
    if ideps is None:
        proc_table['LATEST_DEP_QID'][rown] = np.ndarray(shape=0).astype(int)
    else:
        all_valid_states = list(resubmission_states.copy())
        all_valid_states.extend(['RUNNING', 'PENDING', 'SUBMITTED', 'COMPLETED'])
        for idep in np.sort(np.atleast_1d(ideps)):
            if proc_table['STATUS'][id_to_row_map[idep]] not in all_valid_states:
                log.warning(f"Proc INTID: {proc_table['INTID'][rown]} depended on" +
                            f" INTID {proc_table['INTID'][id_to_row_map[idep]]}" +
                            f" but that exposure has state" +
                            f" {proc_table['STATUS'][id_to_row_map[idep]]} that" +
                            f" isn't in the list of resubmission states." +
                            f" Exiting this job's resubmission attempt.")
                proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
                return proc_table, submits
        qdeps = []
        for idep in np.sort(np.atleast_1d(ideps)):
            if proc_table['STATUS'][id_to_row_map[idep]] in resubmission_states:
                proc_table, submits = recursive_submit_failed(id_to_row_map[idep],
                                                              proc_table, submits,
                                                              id_to_row_map,
                                                              reservation=reservation,
                                                              dry_run=dry_run)
            qdeps.append(proc_table['LATEST_QID'][id_to_row_map[idep]])

        qdeps = np.atleast_1d(qdeps)
        if len(qdeps) > 0:
            proc_table['LATEST_DEP_QID'][rown] = qdeps
        else:
            log.error(f"number of qdeps should be 1 or more: Rown {rown}, ideps {ideps}")

    proc_table[rown] = submit_batch_script(proc_table[rown], reservation=reservation,
                                           strictly_successful=True, dry_run=dry_run)
    submits += 1

    if not dry_run:
        sleep_and_report(1, message_suffix="after submitting job to queue")
        if submits % 10 == 0:
            if ptab_name is None:
                write_table(proc_table, tabletype='processing', overwrite=True)
            else:
                write_table(proc_table, tablename=ptab_name, overwrite=True)
            sleep_and_report(2, message_suffix="after writing to disk")
        if submits % 100 == 0:
            proc_table = update_from_queue(proc_table)
            if ptab_name is None:
                write_table(proc_table, tabletype='processing', overwrite=True)
            else:
                write_table(proc_table, tablename=ptab_name, overwrite=True)
            sleep_and_report(10, message_suffix="after updating queue and writing to disk")
    return proc_table, submits
926

927

928
#########################################
929
########     Joint fit     ##############
930
#########################################
931
def joint_fit(ptable, prows, internal_id, queue, reservation, descriptor, z_submit_types=None,
              dry_run=0, strictly_successful=False, check_for_outputs=True, resubmit_partial_complete=True,
              system_name=None):
    """
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    joint fitting job given by descriptor. If the joint fitting job is standard star fitting, the post standard star fits
    for all the individual exposures are also created and submitted. The returned ptable has all of these rows added to the
    table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        descriptor (str): Description of the joint fitting job. Can either be 'science' or 'stdstarfit', 'arc' or 'psfnight',
            or 'flat' or 'nightlyflat'.
        z_submit_types (list of str, optional): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not
            submitted. If dry_run=2, the scripts will be neither written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu.

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with added rows for the joint fit job and, in the case
          of a stdstarfit, the poststdstar science exposure jobs.
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int. The next internal id to be used for assignment (already incremented up from the last id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    if descriptor is None:
        return ptable, None, internal_id
    elif descriptor == 'arc':
        descriptor = 'psfnight'
    elif descriptor == 'flat':
        descriptor = 'nightlyflat'
    elif descriptor == 'science':
        if z_submit_types is None or len(z_submit_types) == 0:
            descriptor = 'stdstarfit'

    if descriptor not in ['psfnight', 'nightlyflat', 'science', 'stdstarfit']:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")

    if descriptor == 'science':
        joint_prow = make_joint_prow(prows, descriptor='stdstarfit', internal_id=internal_id)
    else:
        joint_prow = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
    internal_id += 1
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
    ptable.add_row(joint_prow)

    if descriptor in ['science', 'stdstarfit']:
        if descriptor == 'science':
            zprows = []
        log.info(" ")
        log.info(f"Submitting individual science exposures now that joint fitting of standard stars is submitted.\n")
        for row in prows:
            if row['LASTSTEP'] == 'stdstarfit':
                continue
            row['JOBDESC'] = 'poststdstar'

            # poststdstar job can't process cameras not included in its stdstar joint fit
            stdcamword = joint_prow['PROCCAMWORD']
            thiscamword = row['PROCCAMWORD']
            proccamword = camword_intersection([stdcamword, thiscamword])
            if proccamword != thiscamword:
                dropcams = difference_camwords(thiscamword, proccamword)
                assert dropcams != ''  #- i.e. if they differ, we should be dropping something
                log.warning(f"Dropping exp {row['EXPID']} poststdstar cameras {dropcams} since they weren't included in stdstar fit {stdcamword}")
                row['PROCCAMWORD'] = proccamword

            row['INTID'] = internal_id
            internal_id += 1
            row['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
            row = assign_dependency(row, joint_prow)
            row = create_and_submit(row, queue=queue, reservation=reservation, dry_run=dry_run,
                                    strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
            ptable.add_row(row)
            if descriptor == 'science' and row['LASTSTEP'] == 'all':
                zprows.append(row)

    ## Now run redshifts
    if descriptor == 'science' and len(zprows) > 0 and z_submit_types is not None:
        prow_selection = (  (ptable['OBSTYPE'] == 'science')
                          & (ptable['LASTSTEP'] == 'all')
                          & (ptable['JOBDESC'] == 'poststdstar')
                          & (ptable['TILEID'] == int(zprows[0]['TILEID'])) )
        nightly_zprows = []
        if np.sum(prow_selection) == len(zprows):
            nightly_zprows = zprows.copy()
        else:
            for prow in ptable[prow_selection]:
                nightly_zprows.append(table_row_to_dict(prow))

        for zsubtype in z_submit_types:
            if zsubtype == 'perexp':
                for zprow in zprows:
                    log.info(" ")
                    log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
                    joint_prow = make_joint_prow([zprow], descriptor=zsubtype, internal_id=internal_id)
                    internal_id += 1
                    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                    ptable.add_row(joint_prow)
            else:
                log.info(" ")
                log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {nightly_zprows[0]['TILEID']}.")
                expids = [prow['EXPID'][0] for prow in nightly_zprows]
                log.info(f"Expids: {expids}.\n")
                joint_prow = make_joint_prow(nightly_zprows, descriptor=zsubtype, internal_id=internal_id)
                internal_id += 1
                joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(joint_prow)

    if descriptor in ['psfnight', 'nightlyflat']:
        log.info(f"Setting the calibration exposures as calibrators in the processing table.\n")
        ptable = set_calibrator_flag(prows, ptable)

    return ptable, joint_prow, internal_id

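## Illustrative usage sketch (not part of the original module): how a caller
## might hand a completed set of arc exposures to joint_fit. The prows, queue,
## and dry_run values here are hypothetical; descriptor='arc' is remapped to
## 'psfnight' inside joint_fit itself.
def _example_arc_joint_fit(ptable, arc_prows, internal_id):
    """Minimal sketch: submit a psfnight joint fit for a set of arc prows."""
    ptable, joint_prow, internal_id = joint_fit(
        ptable=ptable, prows=arc_prows, internal_id=internal_id,
        queue='realtime', reservation=None, descriptor='arc',
        dry_run=2)  # assumption: dry_run=2 logs without writing or submitting
    return ptable, joint_prow, internal_id
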
#########################################
########     Redshifts     ##############
#########################################
def submit_redshifts(ptable, prows, tnight, internal_id, queue, reservation,
              dry_run=0, strictly_successful=False,
              check_for_outputs=True, resubmit_partial_complete=True,
              z_submit_types=None, system_name=None):
    """
    Given a set of prows, this generates processing table rows, creates batch scripts, and submits the
    redshift jobs that depend on the given tilenight job. The returned ptable has all of these rows added to the
    table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
        tnight (Table.Row): The processing table row of the tilenight job on which the redshifts depend.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not
            submitted. If dry_run=2, the scripts will be neither written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu.

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with added rows for the redshift jobs.
        * internal_id, int. The next internal id to be used for assignment (already incremented up from the last id number used).
    """
    log = get_logger()
    if len(prows) < 1 or z_submit_types is None:
        return ptable, internal_id

    log.info(" ")
    log.info(f"Running redshifts.\n")

    ## Now run redshifts
    zprows = []
    for row in prows:
        if row['LASTSTEP'] == 'all':
            zprows.append(row)

    if len(zprows) > 0:
        for zsubtype in z_submit_types:
            if zsubtype == 'perexp':
                for zprow in zprows:
                    log.info(" ")
                    log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
                    redshift_prow = make_redshift_prow([zprow], tnight, descriptor=zsubtype, internal_id=internal_id)
                    internal_id += 1
                    redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                      strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                      resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                    ptable.add_row(redshift_prow)
            else:
                log.info(" ")
                log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {zprows[0]['TILEID']}.")
                expids = [prow['EXPID'][0] for prow in zprows]
                log.info(f"Expids: {expids}.\n")
                redshift_prow = make_redshift_prow(zprows, tnight, descriptor=zsubtype, internal_id=internal_id)
                internal_id += 1
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                  strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                  resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(redshift_prow)

    return ptable, internal_id

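## Illustrative sketch (not part of the original module): how z_submit_types
## fans out into redshift jobs in submit_redshifts. 'perexp' yields one job per
## exposure, while other group types (e.g. 'pernight') yield a single joint job
## over all qualifying exposures. The inputs here are hypothetical.
def _count_redshift_jobs(zprows, z_submit_types):
    """Return how many redshift jobs submit_redshifts would create."""
    if len(zprows) == 0 or z_submit_types is None:
        return 0
    njobs = 0
    for zsubtype in z_submit_types:
        if zsubtype == 'perexp':
            njobs += len(zprows)  # one redshift job per exposure
        else:
            njobs += 1            # one joint job over all exposures
    return njobs
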
#########################################
########     Tilenight     ##############
#########################################
def submit_tilenight(ptable, prows, calibjobs, internal_id, queue, reservation,
              dry_run=0, strictly_successful=False, resubmit_partial_complete=True,
              system_name=None, use_specter=False, laststeps=None):
    """
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    tilenight job. The returned ptable has this row added to the table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The Table.Row values are for the corresponding
            calibration job.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not
            submitted. If dry_run=2, the scripts will be neither written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu.
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        laststeps (list of str, optional): A list of laststeps to pass as the laststeps argument to tilenight.

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with an added row for the tilenight job.
        * tnight_prow, dict. Row of a processing table corresponding to the tilenight job.
        * internal_id, int. The next internal id to be used for assignment (already incremented up from the last id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Running tilenight.\n")

    tnight_prow = make_tnight_prow(prows, calibjobs, internal_id=internal_id)
    internal_id += 1
    tnight_prow = create_and_submit(tnight_prow, queue=queue, reservation=reservation, dry_run=dry_run,
                                    strictly_successful=strictly_successful, check_for_outputs=False,
                                    resubmit_partial_complete=resubmit_partial_complete, system_name=system_name,
                                    use_specter=use_specter, laststeps=laststeps)
    ptable.add_row(tnight_prow)

    return ptable, tnight_prow, internal_id

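## Illustrative sketch (not part of the original module): the expected shape of
## the calibjobs dictionary consumed by submit_tilenight. Each value is a
## processing-table row for the corresponding calibration job, or None if that
## job does not exist; the all-None values below are hypothetical placeholders.
_example_calibjobs = {
    'nightlybias': None,  # Table.Row of the nightlybias job, if any
    'ccdcalib': None,     # Table.Row of the ccdcalib job, if any
    'psfnight': None,     # Table.Row of the psfnight joint fit, if any
    'nightlyflat': None,  # Table.Row of the nightlyflat joint fit, if any
}
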
## wrapper functions for joint fitting
def science_joint_fit(ptable, sciences, internal_id, queue='realtime', reservation=None,
                      z_submit_types=None, dry_run=0, strictly_successful=False,
                      check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None):
    """
    Wrapper function for desispec.workflow.procfuncs.joint_fit specific to the stdstarfit joint fit and redshift fitting.

    All variables are the same except::

        Arg 'sciences' is mapped to the prows argument of joint_fit.
        The joint_fit argument descriptor is pre-defined as 'science'.
    """
    return joint_fit(ptable=ptable, prows=sciences, internal_id=internal_id, queue=queue, reservation=reservation,
                     descriptor='science', z_submit_types=z_submit_types, dry_run=dry_run,
                     strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                     resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)


def flat_joint_fit(ptable, flats, internal_id, queue='realtime',
                   reservation=None, dry_run=0, strictly_successful=False,
                   check_for_outputs=True, resubmit_partial_complete=True,
                   system_name=None):
    """
    Wrapper function for desispec.workflow.procfuncs.joint_fit specific to the nightlyflat joint fit.

    All variables are the same except::

        Arg 'flats' is mapped to the prows argument of joint_fit.
        The joint_fit argument descriptor is pre-defined as 'nightlyflat'.
    """
    return joint_fit(ptable=ptable, prows=flats, internal_id=internal_id, queue=queue, reservation=reservation,
                     descriptor='nightlyflat', dry_run=dry_run, strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs, resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)


def arc_joint_fit(ptable, arcs, internal_id, queue='realtime',
                  reservation=None, dry_run=0, strictly_successful=False,
                  check_for_outputs=True, resubmit_partial_complete=True,
                  system_name=None):
    """
    Wrapper function for desispec.workflow.procfuncs.joint_fit specific to the psfnight joint fit.

    All variables are the same except::

        Arg 'arcs' is mapped to the prows argument of joint_fit.
        The joint_fit argument descriptor is pre-defined as 'psfnight'.
    """
    return joint_fit(ptable=ptable, prows=arcs, internal_id=internal_id, queue=queue, reservation=reservation,
                     descriptor='psfnight', dry_run=dry_run, strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs, resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)


def make_joint_prow(prows, descriptor, internal_id):
    """
    Given an input list or array of processing table rows and a descriptor, this creates a joint fit processing job row.
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
    input prows).

    Args:
        prows, list or array of dicts. The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        descriptor, str. Description of the joint fitting job. Can either be 'stdstarfit', 'psfnight', or 'nightlyflat'.
        internal_id, int. The next internal id to be used for assignment (already incremented up from the last id number used).

    Returns:
        dict: Row of a processing table corresponding to the joint fit job.
    """
    first_row = prows[0]
    joint_prow = first_row.copy()

    joint_prow['INTID'] = internal_id
    joint_prow['JOBDESC'] = descriptor
    joint_prow['LATEST_QID'] = -99
    joint_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
    joint_prow['SUBMIT_DATE'] = -99
    joint_prow['STATUS'] = 'U'
    joint_prow['SCRIPTNAME'] = ''
    joint_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

    ## Assign the PROCCAMWORD based on the descriptor and the input exposures
    if descriptor == 'stdstarfit':
        pcamwords = [prow['PROCCAMWORD'] for prow in prows]
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
                                                  full_spectros_only=True)
    else:
        ## For arcs and flats, a BADAMP takes out the camera, so remove those
        ## cameras from the proccamword
        pcamwords = []
        for prow in prows:
            if len(prow['BADAMPS']) > 0:
                badcams = []
                for (camera, petal, amplifier) in parse_badamps(prow['BADAMPS']):
                    badcams.append(f'{camera}{petal}')
                badampcamword = create_camword(list(set(badcams)))
                pcamword = difference_camwords(prow['PROCCAMWORD'], badampcamword)
            else:
                pcamword = prow['PROCCAMWORD']
            pcamwords.append(pcamword)

        ## For flats we want any camera that exists in all 12 exposures
        ## For arcs we want any camera that exists in at least 3 exposures
        if descriptor == 'nightlyflat':
            joint_prow['PROCCAMWORD'] = camword_intersection(pcamwords,
                                                             full_spectros_only=False)
        elif descriptor == 'psfnight':
            ## Count the number of exposures each camera is present for
            camcheck = {}
            for camword in pcamwords:
                for cam in decode_camword(camword):
                    if cam in camcheck:
                        camcheck[cam] += 1
                    else:
                        camcheck[cam] = 1
            ## if a camera exists in 3 or more exposures, then include it
            goodcams = []
            for cam, camcount in camcheck.items():
                if camcount >= 3:
                    goodcams.append(cam)
            joint_prow['PROCCAMWORD'] = create_camword(goodcams)

    joint_prow = assign_dependency(joint_prow, dependency=prows)
    return joint_prow

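## Illustrative sketch (not part of the original module): the camera-selection
## rules from make_joint_prow, restated on bare camword lists. Only the camword
## helpers already imported at the top of this module are used; the input
## camwords are hypothetical and BADAMPS handling is omitted for brevity.
def _example_joint_camword(pcamwords, descriptor):
    """Mimic make_joint_prow's PROCCAMWORD selection for a list of camwords."""
    from collections import Counter
    if descriptor == 'stdstarfit':
        ## stdstar fits keep any camera seen in any exposure (full spectrographs only)
        return camword_union(pcamwords, full_spectros_only=True)
    elif descriptor == 'nightlyflat':
        ## flats require a camera to appear in every exposure
        return camword_intersection(pcamwords, full_spectros_only=False)
    elif descriptor == 'psfnight':
        ## arcs require a camera to appear in at least 3 exposures
        counts = Counter(cam for cw in pcamwords for cam in decode_camword(cw))
        return create_camword([cam for cam, n in counts.items() if n >= 3])
    return None  # other descriptors are not handled in this sketch
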
def make_tnight_prow(prows, calibjobs, internal_id):
    """
    Given an input list or array of processing table rows, this creates a tilenight processing job row.
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
    input calibjobs).

    Args:
        prows, list or array of dicts. Unsubmitted rows corresponding to the individual prestdstar jobs that are
            the first steps of tilenight.
        calibjobs, dict. Dictionary containing keys that each correspond to a Table.Row or
            None, with each Table.Row value corresponding to a calibration job
            on which the tilenight job depends.
        internal_id, int. The next internal id to be used for assignment (already incremented up from the last id number used).

    Returns:
        dict: Row of a processing table corresponding to the tilenight job.
    """
    first_row = prows[0]
    joint_prow = first_row.copy()

    joint_prow['INTID'] = internal_id
    joint_prow['JOBDESC'] = 'tilenight'
    joint_prow['LATEST_QID'] = -99
    joint_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
    joint_prow['SUBMIT_DATE'] = -99
    joint_prow['STATUS'] = 'U'
    joint_prow['SCRIPTNAME'] = ''
    joint_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

    joint_prow = define_and_assign_dependency(joint_prow, calibjobs, use_tilenight=True)

    return joint_prow

def make_redshift_prow(prows, tnight, descriptor, internal_id):
    """
    Given an input list or array of processing table rows, this creates a redshift processing job row.
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependency (based on the
    input tnight).

    Args:
        prows, list or array of dicts. Unsubmitted rows corresponding to the individual prestdstar jobs that are
            the first steps of tilenight.
        tnight, Table.Row object. Row corresponding to the tilenight job on which the redshift job depends.
        descriptor, str. The redshift group type, e.g. 'perexp'.
        internal_id, int. The next internal id to be used for assignment (already incremented up from the last id number used).

    Returns:
        dict: Row of a processing table corresponding to the redshift job.
    """
    first_row = prows[0]
    redshift_prow = first_row.copy()

    redshift_prow['INTID'] = internal_id
    redshift_prow['JOBDESC'] = descriptor
    redshift_prow['LATEST_QID'] = -99
    redshift_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
    redshift_prow['SUBMIT_DATE'] = -99
    redshift_prow['STATUS'] = 'U'
    redshift_prow['SCRIPTNAME'] = ''
    redshift_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

    redshift_prow = assign_dependency(redshift_prow, dependency=tnight)

    return redshift_prow

def checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs,
                                  lasttype, internal_id, z_submit_types=None, dry_run=0,
                                  queue='realtime', reservation=None, strictly_successful=False,
                                  check_for_outputs=True, resubmit_partial_complete=True,
                                  system_name=None):
    """
    Takes all the state-ful data from daily processing and determines whether a joint fit needs to be submitted. Places
    the decision criteria into a single function for easier maintainability over time. These are separate from the
    new standard manifest*.json method of indicating a calibration sequence is complete. That is checked independently
    elsewhere and doesn't interact with this.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        arcs (list of dict): list of the individual arc jobs to be used for the psfnight (NOT all
            the arcs, if multiple sets existed). May be empty if none identified yet.
        flats (list of dict): list of the individual flat jobs to be used for the nightlyflat (NOT
            all the flats, if multiple sets existed). May be empty if none identified yet.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The Table.Row values are for the corresponding
            calibration job.
        lasttype (str or None): the obstype of the last individual exposure row to be processed.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not
            submitted. If dry_run=2, the scripts will be neither written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu.

    Returns:
        tuple: A tuple containing:

        * ptable, Table. Processing table of all exposures that have been processed.
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
          and 'nightlyflat'. Each key corresponds to a Table.Row or
          None. The Table.Row values are for the corresponding
          calibration job.
        * sciences, list of dicts. List of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int. If no job is submitted, this is the same as the input, otherwise it is incremented upward
          from the input such that it represents the smallest unused ID.
    """
    if lasttype == 'science' and len(sciences) > 0:
        log = get_logger()
        skysubonly = np.array([sci['LASTSTEP'] == 'skysub' for sci in sciences])
        if np.all(skysubonly):
            log.error("Identified all exposures in joint fitting request as skysub-only. Not submitting")
            sciences = []
            return ptable, calibjobs, sciences, internal_id

        if np.any(skysubonly):
            log.error("Identified skysub-only exposures in joint fitting request")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))
            sciences = (np.array(sciences, dtype=object)[~skysubonly]).tolist()
            log.info("Removed skysub-only exposures in joint fitting:")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))

        from collections import Counter
        tiles = np.array([sci['TILEID'] for sci in sciences])
        counts = Counter(tiles)
        if len(counts.most_common()) > 1:
            log.error("Identified more than one tile in a joint fitting request")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("Tileid's: {}".format(tiles))
            log.info("Returning without joint fitting any of these exposures.")
            # most_common, nmost_common = counts.most_common()[0]
            # if most_common == -99:
            #     most_common, nmost_common = counts.most_common()[1]
            # log.warning(f"Given multiple tiles to jointly fit: {counts}. "+
            #             "Only processing the most common non-default " +
            #             f"tile: {most_common} with {nmost_common} exposures")
            # sciences = (np.array(sciences,dtype=object)[tiles == most_common]).tolist()
            # log.info("Tiles and exposure id's being submitted for joint fitting:")
            # log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            # log.info("Tileid's: {}".format([row['TILEID'] for row in sciences]))
            sciences = []
            return ptable, calibjobs, sciences, internal_id

        ptable, tilejob, internal_id = science_joint_fit(ptable, sciences, internal_id, z_submit_types=z_submit_types,
                                                         dry_run=dry_run, queue=queue, reservation=reservation,
                                                         strictly_successful=strictly_successful,
                                                         check_for_outputs=check_for_outputs,
                                                         resubmit_partial_complete=resubmit_partial_complete,
                                                         system_name=system_name)
        if tilejob is not None:
            sciences = []

    elif lasttype == 'flat' and calibjobs['nightlyflat'] is None and len(flats) == 12:
        ## Note: here we assume a complete calibration sequence has exactly 12 flats
        ptable, calibjobs['nightlyflat'], internal_id \
            = flat_joint_fit(ptable, flats, internal_id, dry_run=dry_run, queue=queue,
                             reservation=reservation, strictly_successful=strictly_successful,
                             check_for_outputs=check_for_outputs,
                             resubmit_partial_complete=resubmit_partial_complete,
                             system_name=system_name
                            )

    elif lasttype == 'arc' and calibjobs['psfnight'] is None and len(arcs) == 5:
        ## Note: here we assume a complete calibration sequence has exactly 5 arcs
        ptable, calibjobs['psfnight'], internal_id \
            = arc_joint_fit(ptable, arcs, internal_id, dry_run=dry_run, queue=queue,
                            reservation=reservation, strictly_successful=strictly_successful,
                            check_for_outputs=check_for_outputs,
                            resubmit_partial_complete=resubmit_partial_complete,
                            system_name=system_name
                            )
    return ptable, calibjobs, sciences, internal_id

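## Illustrative sketch (not part of the original module): the trigger conditions
## from checkfor_and_submit_joint_job, condensed into one predicate. The counts
## of 12 flats and 5 arcs mirror the assumptions noted in the code above; all
## inputs here are hypothetical.
def _joint_fit_is_triggered(lasttype, arcs, flats, sciences, calibjobs):
    """Return True if checkfor_and_submit_joint_job would attempt a joint job."""
    if lasttype == 'science':
        return len(sciences) > 0
    if lasttype == 'flat':
        return calibjobs['nightlyflat'] is None and len(flats) == 12
    if lasttype == 'arc':
        return calibjobs['psfnight'] is None and len(arcs) == 5
    return False
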
def submit_tilenight_and_redshifts(ptable, sciences, calibjobs, lasttype, internal_id, dry_run=0,
                                  queue='realtime', reservation=None, strictly_successful=False,
                                  check_for_outputs=True, resubmit_partial_complete=True,
                                  z_submit_types=None, system_name=None, use_specter=False,
                                  laststeps=None):
    """
    Takes all the state-ful data from daily processing and determines whether a tilenight job needs to be submitted,
    then submits the tilenight job and any redshift jobs that depend on it.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The Table.Row values are for the corresponding
            calibration job.
        lasttype (str or None): the obstype of the last individual exposure row to be processed.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not
            submitted. If dry_run=2, the scripts will be neither written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        z_submit_types (list of str, optional): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu.
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        laststeps (list of str, optional): A list of laststeps to pass as the laststeps argument to tilenight.

    Returns:
        tuple: A tuple containing:

        * ptable, Table. Processing table of all exposures that have been processed.
        * sciences, list of dicts. List of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int. If no job is submitted, this is the same as the input, otherwise it is incremented upward
          from the input such that it represents the smallest unused ID.
    """
    ptable, tnight, internal_id = submit_tilenight(ptable, sciences, calibjobs, internal_id,
                                             queue=queue, reservation=reservation,
                                             dry_run=dry_run, strictly_successful=strictly_successful,
                                             resubmit_partial_complete=resubmit_partial_complete,
                                             system_name=system_name, use_specter=use_specter,
                                             laststeps=laststeps
                                             )

    ptable, internal_id = submit_redshifts(ptable, sciences, tnight, internal_id,
                                    queue=queue, reservation=reservation,
                                    dry_run=dry_run, strictly_successful=strictly_successful,
                                    check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete,
                                    z_submit_types=z_submit_types, system_name=system_name
                                    )

    if tnight is not None:
        sciences = []

    return ptable, sciences, internal_id

def set_calibrator_flag(prows, ptable):
    """
    Sets the "CALIBRATOR" column of a processing table row to 1 (integer representation of True)
    for all input rows. Used within the joint fitting code to flag the exposures that were input
    to the psfnight or nightlyflat for later reference.

    Args:
        prows, list or array of Table.Rows or dicts. The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        ptable, Table. The processing table where each row is a processed job.

    Returns:
        Table: The same processing table as input except with the CALIBRATOR column set to 1 for the
        rows corresponding to the input prows.
    """
    for prow in prows:
        ptable['CALIBRATOR'][ptable['INTID'] == prow['INTID']] = 1
    return ptable