
desihub / desispec / 22122886015

18 Feb 2026 01:32AM UTC coverage: 37.827% (+0.1%) from 37.721%
Merge 742e87a1e into d1282e13b
Pull Request #2606: Rescale individual exposures before coaddition

93 of 104 new or added lines in 2 files covered. (89.42%)

374 existing lines in 7 files now uncovered.

13127 of 34703 relevant lines covered (37.83%)

0.38 hits per line

Source File

/py/desispec/workflow/processing.py (59.7% covered)
1
"""
2
desispec.workflow.processing
3
============================
4

5
"""
6
import sys, os, glob
1✔
7
import json
1✔
8
from astropy.io import fits
1✔
9
from astropy.table import Table, join
1✔
10
from desispec.scripts.compute_dark import compute_dark_parser, get_stacked_dark_exposure_table
1✔
11
from desispec.workflow.batch_writer import create_biaspdark_batch_script, create_ccdcalib_batch_script, create_desi_proc_batch_script, create_desi_proc_tilenight_batch_script, create_linkcal_batch_script, get_desi_proc_batch_file_pathname, get_desi_proc_tilenight_batch_file_pathname
1✔
12
import numpy as np
1✔
13

14
import time, datetime
1✔
15
from collections import OrderedDict
1✔
16
import subprocess
1✔
17

18
from desispec.scripts.link_calibnight import derive_include_exclude
1✔
19
from desispec.scripts.tile_redshifts import generate_tile_redshift_scripts
1✔
20
from desispec.workflow.redshifts import get_ztile_script_pathname, \
1✔
21
    get_ztile_relpath, \
22
    get_ztile_script_suffix
23
from desispec.workflow.exptable import read_minimal_science_exptab_cols
1✔
24
from desispec.workflow.queue import get_resubmission_states, update_from_queue, \
1✔
25
    queue_info_from_qids, get_queue_states_from_qids, update_queue_state_cache, \
26
    get_non_final_states
27
from desispec.workflow.timing import what_night_is_it
1✔
28
from desispec.workflow.batch_writer import get_desi_proc_batch_file_path
1✔
29
from desispec.workflow.batch import parse_reservation
1✔
30
from desispec.workflow.utils import pathjoin, sleep_and_report, \
1✔
31
    load_override_file
32
from desispec.workflow.tableio import write_table, load_table
1✔
33
from desispec.workflow.proctable import get_pdarks_from_ptable, table_row_to_dict, erow_to_prow, \
1✔
34
    read_minimal_tilenight_proctab_cols, read_minimal_full_proctab_cols, \
35
    update_full_ptab_cache, default_prow, get_default_qid
36
from desiutil.log import get_logger
1✔
37

38
from desispec.io import findfile, specprod_root
1✔
39
from desispec.io.util import decode_camword, create_camword, \
1✔
40
    difference_camwords, erow_to_goodcamword, \
41
    camword_to_spectros, camword_union, camword_intersection, parse_badamps
42

43

44
#################################################
45
############## Misc Functions ###################
46
#################################################
47
def night_to_starting_iid(night=None):
1✔
48
    """
49
    Creates an internal ID for a given night. The resulting integer is a 9 digit number.
50
    The digits are YYMMDDxxx where YY is the years since 2000, MM and DD are the month and day. xxx are 000,
51
    and are incremented for up to 1000 unique job IDs for a given night.
52

53
    Args:
54
        night (str or int): YYYYMMDD of the night to get the starting internal ID for.
55

56
    Returns:
57
        int: 9 digit number consisting of YYMMDD000. YY is years after 2000, MMDD is month and day.
58
        000 being the starting job number (0).
59
    """
60
    if night is None:
1✔
61
        night = what_night_is_it()
×
62
    night = int(night)
1✔
63
    internal_id = (night - 20000000) * 1000
1✔
64
    return internal_id
1✔
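# Illustrative example (added for clarity, not part of the coverage run):
# following the YYMMDDxxx scheme described above,
#
#   >>> night_to_starting_iid(20240101)
#   240101000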
65

66
class ProcessingParams():
1✔
67
    def __init__(self, dry_run_level=0, queue='realtime',
1✔
68
                 reservation=None, strictly_successful=True,
69
                 check_for_outputs=True,
70
                 resubmit_partial_complete=True,
71
                 system_name='perlmutter', use_specter=True):
72

73
        self.dry_run_level = dry_run_level
×
74
        self.system_name = system_name
×
75
        self.queue = queue
×
76
        self.reservation = reservation
×
77
        self.strictly_successful = strictly_successful
×
78
        self.check_for_outputs = check_for_outputs
×
79
        self.resubmit_partial_complete = resubmit_partial_complete
×
80
        self.use_specter = use_specter
×
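# Illustrative usage sketch (not part of the coverage run): bundle the
# submission options once and read them back as attributes.
#
#   >>> params = ProcessingParams(queue='regular', dry_run_level=1)
#   >>> params.queue
#   'regular'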
81

82
#################################################
83
############ Script Functions ###################
84
#################################################
85
def batch_script_pathname(prow):
1✔
86
    """
87
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, JOBDESC, PROCCAMWORD defined)
88
    and determines the script file pathname as defined by desi_proc's helper functions.
89

90
    Args:
91
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
92

93
    Returns:
94
        str: The complete pathname to the script file, as it is defined within the desi_proc ecosystem.
95
    """
96
    expids = prow['EXPID']
1✔
97
    if len(expids) == 0:
1✔
98
        expids = None
1✔
99
    if prow['JOBDESC'] == 'tilenight':
1✔
100
        pathname = get_desi_proc_tilenight_batch_file_pathname(night = prow['NIGHT'], tileid=prow['TILEID'])
1✔
101
    else:
102
        pathname = get_desi_proc_batch_file_pathname(night = prow['NIGHT'], exp=expids, \
1✔
103
                                             jobdesc=prow['JOBDESC'], cameras=prow['PROCCAMWORD'])
104
    scriptfile =  pathname + '.slurm'
1✔
105
    return scriptfile
1✔
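# Illustrative sketch (row contents hypothetical, output path abbreviated):
# tilenight rows resolve through the tilenight pathname helper, everything
# else through get_desi_proc_batch_file_pathname; '.slurm' is appended either way.
#
#   >>> prow = {'NIGHT': 20240101, 'EXPID': [123], 'TILEID': 1000,
#   ...         'JOBDESC': 'tilenight', 'PROCCAMWORD': 'a0123456789'}
#   >>> batch_script_pathname(prow)   # doctest: +SKIP
#   '.../run/scripts/night/20240101/tilenight-20240101-1000.slurm'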
106

107

108
def get_jobdesc_to_file_map():
1✔
109
    """
110
    Returns a mapping of job descriptions to the filenames of the output files
111

112
    Args:
113
        None
114

115
    Returns:
116
        dict: Dictionary mapping lowercase job descriptions to the
117
            filename of their expected outputs.
118

119
    """
120
    return {'biasnight': 'biasnight',
1✔
121
            'biaspdark': 'preproc_for_dark',
122
            'pdark': 'preproc_for_dark',
123
            'ccdcalib': 'ctecorrnight',
124
            'arc': 'fitpsf',
125
            'psfnight': 'psfnight',
126
            'flat': 'fiberflat',
127
            'nightlyflat': 'fiberflatnight',
128
            'prestdstar': 'sframe',
129
            'stdstarfit': 'stdstars',
130
            'poststdstar': 'cframe',
131
            'spectra': 'spectra_tile',
132
            'coadds': 'coadds_tile',
133
            'redshift': 'redrock_tile'}
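# Illustrative example: look up the expected output filetype for a job type.
#
#   >>> get_jobdesc_to_file_map()['arc']
#   'fitpsf'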
134

135

136
def check_for_outputs_on_disk(prow, resubmit_partial_complete=True):
1✔
137
    """
138
    Args:
139
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
140
            desispec.workflow.proctable.get_processing_table_column_defs()
141
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
142
            jobs with some prior data are pruned using PROCCAMWORD to only process the
143
            remaining cameras not found to exist.
144

145
    Returns:
146
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
147
        the change in job status after creating and submitting the job for processing.
148
    """
149
    prow['STATUS'] = 'UNKNOWN'
1✔
150
    log = get_logger()
1✔
151

152
    if prow['JOBDESC'] in ['linkcal', 'ccdcalib']:
1✔
153
        log.info(f"jobdesc={prow['JOBDESC']} has indeterminate outputs, so "
1✔
154
                + "not checking for files on disk.")
155
        return prow
1✔
156

157
    job_to_file_map = get_jobdesc_to_file_map()
1✔
158

159
    night = prow['NIGHT']
1✔
160
    if prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
1✔
161
        filetype = 'redrock_tile'
1✔
162
    else:
163
        filetype = job_to_file_map[prow['JOBDESC']]
1✔
164
    orig_camword = prow['PROCCAMWORD']
1✔
165

166
    ## if spectro based, look for spectros, else look for cameras
167
    if prow['JOBDESC'] in ['stdstarfit','spectra','coadds','redshift']:
1✔
168
        ## Spectrograph based
169
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
×
170
        n_desired = len(spectros)
×
171
        ## Suppress outputs about using tile based files in findfile if only looking for stdstarfits
172
        if prow['JOBDESC'] == 'stdstarfit':
×
173
            tileid = None
×
174
        else:
175
            tileid = prow['TILEID']
×
176
        expid = prow['EXPID'][0]
×
177
        existing_spectros = []
×
178
        for spectro in spectros:
×
179
            if os.path.exists(findfile(filetype=filetype, night=night, expid=expid, spectrograph=spectro, tile=tileid)):
×
180
                existing_spectros.append(spectro)
×
181
        completed = (len(existing_spectros) == n_desired)
×
182
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
×
183
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
×
184
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'],existing_camword)
×
185
    elif prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
1✔
186
        ## Spectrograph based
187
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
1✔
188
        n_desired = len(spectros)
1✔
189
        ## Suppress outputs about using tile based files in findfile if only looking for stdstarfits
190
        tileid = prow['TILEID']
1✔
191
        expid = prow['EXPID'][0]
1✔
192
        redux_dir = specprod_root()
1✔
193
        outdir = os.path.join(redux_dir,get_ztile_relpath(tileid,group=prow['JOBDESC'],night=night,expid=expid))
1✔
194
        suffix = get_ztile_script_suffix(tileid, group=prow['JOBDESC'], night=night, expid=expid)
1✔
195
        existing_spectros = []
1✔
196
        for spectro in spectros:
1✔
197
            if os.path.exists(os.path.join(outdir, f"redrock-{spectro}-{suffix}.fits")):
1✔
198
                existing_spectros.append(spectro)
×
199
        completed = (len(existing_spectros) == n_desired)
1✔
200
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
1✔
201
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
×
202
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'],existing_camword)
×
203
    elif str(prow['JOBDESC']).endswith('pdark') and len(prow['EXPID']) > 1:
1✔
204
        ## camera based, multi exposure
205
        cameras = decode_camword(prow['PROCCAMWORD'])
1✔
206
        nexps = len(prow['EXPID'])
1✔
207
        n_desired = len(cameras)*nexps
1✔
208

209
        missing_cameras = []
1✔
210
        for cam in cameras:
1✔
211
            for expid in prow['EXPID']:
1✔
212
                if not os.path.exists(findfile(filetype=filetype, night=night, expid=expid, camera=cam)):
1✔
213
                    missing_cameras.append(cam)
1✔
214
        completed = (len(missing_cameras) == 0)
1✔
215
        if not completed and resubmit_partial_complete and len(missing_cameras) < n_desired:
1✔
216
            prow['PROCCAMWORD'] = create_camword(np.unique(missing_cameras))
×
217
    else:
218
        ## Otherwise camera based
219
        cameras = decode_camword(prow['PROCCAMWORD'])
1✔
220
        n_desired = len(cameras)
1✔
221
        if len(prow['EXPID']) > 0:
1✔
222
            expid = prow['EXPID'][0]
1✔
223
        else:
224
            expid = None
×
225
        if len(prow['EXPID']) > 1 and prow['JOBDESC'] not in ['psfnight','nightlyflat']:
1✔
226
            log.warning(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']}. This job type only makes " +
×
227
                     f"sense with a single exposure. Proceeding with {expid}.")
228
        missing_cameras = []
1✔
229
        for cam in cameras:
1✔
230
            if not os.path.exists(findfile(filetype=filetype, night=night, expid=expid, camera=cam)):
1✔
231
                missing_cameras.append(cam)
1✔
232
        completed = (len(missing_cameras) == 0)
1✔
233
        if not completed and resubmit_partial_complete and len(missing_cameras) < n_desired:
1✔
234
            prow['PROCCAMWORD'] = create_camword(missing_cameras)
×
235

236
    if completed:
1✔
237
        prow['STATUS'] = 'COMPLETED'
×
238
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
×
239
                 f"the desired {n_desired} {filetype}'s. Not submitting this job.")
240
    elif resubmit_partial_complete and orig_camword != prow['PROCCAMWORD']:
1✔
241
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
×
242
                 f"some {filetype}'s. Submitting smaller camword={prow['PROCCAMWORD']}.")
243
    elif not resubmit_partial_complete:
1✔
244
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} doesn't have all " +
×
245
                 f"{filetype}'s and resubmit_partial_complete=False. "+
246
                 f"Submitting full camword={prow['PROCCAMWORD']}.")
247
    else:
248
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} has no " +
1✔
249
                 f"existing {filetype}'s. Submitting full camword={prow['PROCCAMWORD']}.")
250
    return prow
1✔
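# Illustrative note on the pruning above (values hypothetical): spectrographs
# that already have outputs are collapsed into a camword, e.g.
# existing_spectros=[0, 1] gives 'a' + ''.join(['0', '1']) == 'a01', which
# difference_camwords() then removes from PROCCAMWORD so that only the missing
# spectrographs are reprocessed.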
251

252
def create_and_submit(prow, queue='realtime', reservation=None, dry_run=0,
1✔
253
                      joint=False, strictly_successful=False,
254
                      check_for_outputs=True, resubmit_partial_complete=True,
255
                      system_name=None, use_specter=False,
256
                      extra_job_args=None):
257
    """
258
    Wrapper script that takes a processing table row and three modifier keywords, creates a submission script for the
259
    compute nodes, and then submits that script to the Slurm scheduler with appropriate dependencies.
260

261
    Args:
262
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
263
            desispec.workflow.proctable.get_processing_table_column_defs()
264
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
265
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
266
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
267
            0 which runs the code normally.
268
            1 writes all files but doesn't submit any jobs to Slurm.
269
            2 writes tables but doesn't write scripts or submit anything.
270
            3 Doesn't write or submit anything but queries Slurm normally for job status.
271
            4 Doesn't write, submit jobs, or query Slurm.
272
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
273
        joint (bool, optional): Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
274
            run with desi_proc_joint_fit. Default is False.
275
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
276
            less desirable because e.g. the sciences can run with SVN default calibrations rather
277
            than failing completely from failed calibrations. Default is False.
278
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
279
            data products for the script being submitted. If all files exist and this is True,
280
            then the script will not be submitted. If some files exist and this is True, only the
281
            subset of the cameras without the final data products will be generated and submitted.
282
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
283
            jobs with some prior data are pruned using PROCCAMWORD to only process the
284
            remaining cameras not found to exist.
285
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
286
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
287
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
288
            information used for a specific type of job. Examples include refnight
289
            and include/exclude lists for linkcal, laststeps for tilenight, etc.
290

291
    Returns:
292
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
293
        the change in job status after creating and submitting the job for processing.
294

295
    Note:
296
        This may modify the input. Table.Row objects are generally copied on modification, so the
297
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
298
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
299
    """
300
    orig_prow = prow.copy()
1✔
301
    if check_for_outputs:
1✔
302
        prow = check_for_outputs_on_disk(prow, resubmit_partial_complete)
1✔
303
        if prow['STATUS'].upper() == 'COMPLETED':
1✔
304
            return prow
×
305

306
    prow = create_batch_script(prow, queue=queue, dry_run=dry_run, joint=joint,
1✔
307
                               system_name=system_name, use_specter=use_specter,
308
                               extra_job_args=extra_job_args)
309
    prow = submit_batch_script(prow, reservation=reservation, dry_run=dry_run,
1✔
310
                               strictly_successful=strictly_successful)
311

312
    ## If resubmitted partial, the PROCCAMWORD and SCRIPTNAME will correspond
313
    ## to the pruned values. But we want to
314
    ## retain the full job's value, so get those from the old job.
315
    if resubmit_partial_complete:
1✔
316
        prow['PROCCAMWORD'] = orig_prow['PROCCAMWORD']
1✔
317
        prow['SCRIPTNAME'] = orig_prow['SCRIPTNAME']
1✔
318
    return prow
1✔
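# Illustrative usage sketch (row contents hypothetical): a dry run exercises
# the same code path without writing scripts or contacting Slurm.
#
#   >>> prow = create_and_submit(prow, queue='realtime', dry_run=4,
#   ...                          system_name='perlmutter')   # doctest: +SKIP
#   >>> prow['STATUS']                                       # doctest: +SKIP
#   'SUBMITTED'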
319

320
def desi_link_calibnight_command(prow, refnight, include=None):
1✔
321
    """
322
    Wrapper script that takes a processing table row (or dictionary with
323
    REFNIGHT, NIGHT, PROCCAMWORD defined) and determines the proper command
324
    line call to link data defined by the input row/dict.
325

326
    Args:
327
        prow (Table.Row or dict): Must include keyword accessible definitions
328
            for 'NIGHT', 'REFNIGHT', and 'PROCCAMWORD'.
329
        refnight (str or int): The night with a valid set of calibrations
330
            from which the links will be created.
331
        include (list): The filetypes to include in the linking.
332
    Returns:
333
        str: The proper command to be submitted to desi_link_calibnight
334
            to process the job defined by the prow values.
335
    """
336
    cmd = 'desi_link_calibnight'
1✔
337
    cmd += f' --refnight={refnight}'
1✔
338
    cmd += f' --newnight={prow["NIGHT"]}'
1✔
339
    cmd += f' --cameras={prow["PROCCAMWORD"]}'
1✔
340
    if include is not None:
1✔
341
        cmd += f' --include=' + ','.join(list(include))
1✔
342
    return cmd
1✔
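# Illustrative example (values hypothetical):
#
#   >>> prow = {'NIGHT': 20240102, 'PROCCAMWORD': 'a0123456789'}
#   >>> desi_link_calibnight_command(prow, 20240101, include=['psfnight', 'fiberflatnight'])
#   'desi_link_calibnight --refnight=20240101 --newnight=20240102 --cameras=a0123456789 --include=psfnight,fiberflatnight'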
343

344
def desi_proc_command(prow, system_name, use_specter=False, queue=None):
1✔
345
    """
346
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, OBSTYPE, JOBDESC, PROCCAMWORD defined)
347
    and determines the proper command line call to process the data defined by the input row/dict.
348

349
    Args:
350
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
351
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu
352
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).
353
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
354

355
    Returns:
356
        str: The proper command to be submitted to desi_proc to process the job defined by the prow values.
357
    """
358
    cmd = 'desi_proc'
1✔
359
    cmd += ' --batch'
1✔
360
    cmd += ' --nosubmit'
1✔
361
    if queue is not None:
1✔
362
        cmd += f' -q {queue}'
1✔
363
    if prow['OBSTYPE'].lower() == 'science':
1✔
364
        if prow['JOBDESC'] == 'prestdstar':
×
365
            cmd += ' --nostdstarfit --nofluxcalib'
×
366
        elif prow['JOBDESC'] == 'poststdstar':
×
367
            cmd += ' --noprestdstarfit --nostdstarfit'
×
368

369
        if use_specter:
×
370
            cmd += ' --use-specter'
×
371
    elif prow['JOBDESC'] in ['flat', 'prestdstar'] and use_specter:
1✔
372
        cmd += ' --use-specter'
×
373
    pcamw = str(prow['PROCCAMWORD'])
1✔
374
    cmd += f" --cameras={pcamw} -n {prow['NIGHT']}"
1✔
375
    if len(prow['EXPID']) > 0:
1✔
376
        ## If ccdcalib job without a dark exposure, don't assign the flat expid
377
        ## since it would incorrectly process the flat using desi_proc
378
        if prow['OBSTYPE'].lower() != 'flat' or prow['JOBDESC'] != 'ccdcalib':
1✔
379
            cmd += f" -e {prow['EXPID'][0]}"
1✔
380
    if prow['BADAMPS'] != '':
1✔
381
        cmd += ' --badamps={}'.format(prow['BADAMPS'])
×
382
    return cmd
1✔
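# Illustrative example (values hypothetical): a single arc exposure.
#
#   >>> prow = {'NIGHT': 20240101, 'EXPID': [123], 'OBSTYPE': 'arc',
#   ...         'JOBDESC': 'arc', 'PROCCAMWORD': 'a0123456789', 'BADAMPS': ''}
#   >>> desi_proc_command(prow, 'perlmutter-gpu', queue='realtime')
#   'desi_proc --batch --nosubmit -q realtime --cameras=a0123456789 -n 20240101 -e 123'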
383

384
def desi_proc_joint_fit_command(prow, queue=None):
1✔
385
    """
386
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, OBSTYPE, PROCCAMWORD defined)
387
    and determines the proper command line call to process the data defined by the input row/dict.
388

389
    Args:
390
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
391
        queue (str): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).
392

393
    Returns:
394
        str: The proper command to be submitted to desi_proc_joint_fit
395
            to process the job defined by the prow values.
396
    """
397
    cmd = 'desi_proc_joint_fit'
1✔
398
    cmd += ' --batch'
1✔
399
    cmd += ' --nosubmit'
1✔
400
    if queue is not None:
1✔
401
        cmd += f' -q {queue}'
1✔
402

403
    descriptor = prow['OBSTYPE'].lower()
1✔
404

405
    night = prow['NIGHT']
1✔
406
    specs = str(prow['PROCCAMWORD'])
1✔
407
    expid_str = ','.join([str(eid) for eid in prow['EXPID']])
1✔
408

409
    cmd += f' --obstype {descriptor}'
1✔
410
    cmd += f' --cameras={specs} -n {night}'
1✔
411
    if len(expid_str) > 0:
1✔
412
        cmd += f' -e {expid_str}'
1✔
413
    return cmd
1✔
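# Illustrative example (values hypothetical): a joint fit over three arcs.
#
#   >>> prow = {'NIGHT': 20240101, 'EXPID': [101, 102, 103],
#   ...         'OBSTYPE': 'arc', 'PROCCAMWORD': 'a0123456789'}
#   >>> desi_proc_joint_fit_command(prow, queue='realtime')
#   'desi_proc_joint_fit --batch --nosubmit -q realtime --obstype arc --cameras=a0123456789 -n 20240101 -e 101,102,103'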
414

415

416
def create_batch_script(prow, queue='realtime', dry_run=0, joint=False,
1✔
417
                        system_name=None, use_specter=False, extra_job_args=None):
418
    """
419
    Wrapper script that takes a processing table row and three modifier keywords and creates a submission script for the
420
    compute nodes.
421

422
    Args:
423
        prow, Table.Row or dict. Must include keyword accessible definitions for processing_table columns found in
424
            desispec.workflow.proctable.get_processing_table_column_defs()
425
        queue, str. The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
426
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
427
            0 which runs the code normally.
428
            1 writes all scripts but doesn't submit any jobs to Slurm.
429
            2 writes tables but doesn't write scripts or submit anything.
430
            3 Doesn't write or submit anything but queries Slurm normally for job status.
431
            4 Doesn't write, submit jobs, or query Slurm.
432
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
433
        joint, bool. Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
434
            run with desi_proc_joint_fit when not using tilenight. Default is False.
435
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
436
        use_specter, bool, optional. Default is False. If True, use specter, otherwise use gpu_specter by default.
437
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
438
            information used for a specific type of job. Examples include refnight
439
            and include/exclude lists for linkcal, laststeps for tilenight, etc.
440

441
    Returns:
442
        Table.Row or dict: The same prow type and keywords as input except with updated values for
443
        scriptname.
444

445
    Note:
446
        This may modify the input. Table.Row objects are generally copied on modification, so the
447
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
448
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
449
    """
450
    log = get_logger()
1✔
451

452
    if extra_job_args is None:
1✔
453
        extra_job_args = {}
1✔
454

455
    if prow['JOBDESC'] == 'linkcal':
1✔
456
        refnight, include, exclude = -99, None, None
1✔
457
        if 'refnight' in extra_job_args:
1✔
458
            refnight = extra_job_args['refnight']
1✔
459
        if 'include' in extra_job_args:
1✔
460
            include = extra_job_args['include']
1✔
461
        if 'exclude' in extra_job_args:
1✔
462
            exclude = extra_job_args['exclude']
×
463
        include, exclude = derive_include_exclude(include, exclude)
1✔
464
        ## Fiberflatnights shouldn't be generated with psfs from a different night, so
465
        ## shouldn't link psfs without also linking fiberflatnight
466
        ## However, this should be checked at a higher level. If set here,
467
        ## go ahead and do it
468
        # if 'psfnight' in include and not 'fiberflatnight' in include:
469
        #     err = "Must link fiberflatnight if linking psfnight"
470
        #     log.error(err)
471
        #     raise ValueError(err)
472
        if dry_run > 1:
1✔
473
            scriptpathname = batch_script_pathname(prow)
1✔
474
            log.info("Output file would have been: {}".format(scriptpathname))
1✔
475
            cmd = desi_link_calibnight_command(prow, refnight, include)
1✔
476
            log.info("Command to be run: {}".format(cmd.split()))
1✔
477
        else:
478
            if refnight == -99:
×
479
                err = f'For {prow=} asked to link calibration but not given' \
×
480
                      + ' a valid refnight'
481
                log.error(err)
×
482
                raise ValueError(err)
×
483

484
            cmd = desi_link_calibnight_command(prow, refnight, include)
×
485
            log.info(f"Running: {cmd.split()}")
×
486
            scriptpathname = create_linkcal_batch_script(newnight=prow['NIGHT'],
×
487
                                                         cameras=prow['PROCCAMWORD'],
488
                                                         queue=queue,
489
                                                         cmd=cmd,
490
                                                         system_name=system_name)
491
    elif prow['JOBDESC'] in ['biasnight','pdark','biaspdark']:
1✔
492
        if dry_run > 1:
1✔
493
            scriptpathname = get_desi_proc_batch_file_pathname(night=prow['NIGHT'], exp=prow['EXPID'],
1✔
494
                                                   jobdesc=prow['JOBDESC'], cameras=prow['PROCCAMWORD'])
495
        else:
496
            log.info(f"Creating biaspdark script for: {prow}, {extra_job_args}")
1✔
497
            do_biasnight, do_pdark = False, False
1✔
498
            if 'steps' in extra_job_args:
1✔
499
                do_biasnight = 'biasnight' in extra_job_args['steps']
1✔
500
                do_pdark = 'pdark' in extra_job_args['steps']
1✔
501
            scriptpathname = create_biaspdark_batch_script(night=prow['NIGHT'], expids=prow['EXPID'],
1✔
502
                                                   jobdesc=prow['JOBDESC'], camword=prow['PROCCAMWORD'],
503
                                                   do_biasnight=do_biasnight, do_pdark=do_pdark,
504
                                                   queue=queue, system_name=system_name)
505
    elif prow['JOBDESC'] in ['ccdcalib']:
1✔
506
        if dry_run > 1:
1✔
507
            scriptpathname = get_desi_proc_batch_file_pathname(night=prow['NIGHT'], exp=prow['EXPID'],
1✔
508
                                                   jobdesc=prow['JOBDESC'], cameras=prow['PROCCAMWORD'])
509
        else:
510
            log.info(f"Creating ccdcalib script for: {prow}, {extra_job_args}")
1✔
511
                                #  do_ctecorr=False, n_nights_before=None, n_nights_after=None,
512
                                #  dark_expid=None, cte_expids=None
513
            do_darknight, do_badcolumn, do_ctecorr = False, False, False
1✔
514
            if 'steps' in extra_job_args:
1✔
515
                do_darknight = 'darknight' in extra_job_args['steps']
1✔
516
                do_badcolumn = 'badcolumn' in extra_job_args['steps']
1✔
517
                do_ctecorr = 'ctecorr' in extra_job_args['steps']
1✔
518
            n_nights_before, n_nights_after = None, None
1✔
519
            if 'n_nights_before' in extra_job_args:
1✔
UNCOV
520
                n_nights_before = extra_job_args['n_nights_before']
×
521
            if 'n_nights_after' in extra_job_args:
1✔
UNCOV
522
                n_nights_after = extra_job_args['n_nights_after']
×
523
            dark_expid, cte_expids = None, None
1✔
524
            if 'dark_expid' in extra_job_args:
1✔
525
                dark_expid = extra_job_args['dark_expid']
1✔
526
            if 'cte_expids' in extra_job_args:
1✔
527
                cte_expids = extra_job_args['cte_expids']
1✔
528
            scriptpathname = create_ccdcalib_batch_script(night=prow['NIGHT'], expids=prow['EXPID'],
1✔
529
                                                   camword=prow['PROCCAMWORD'],
530
                                                   do_darknight=do_darknight, do_badcolumn=do_badcolumn,
531
                                                   do_ctecorr=do_ctecorr, n_nights_before=n_nights_before,
532
                                                   n_nights_after=n_nights_after,
533
                                                   dark_expid=dark_expid, cte_expids=cte_expids,
534
                                                   queue=queue, system_name=system_name)
535
    elif prow['JOBDESC'] in ['perexp','pernight','pernight-v0','cumulative']:
1✔
536
        if dry_run > 1:
1✔
537
            scriptpathname = get_ztile_script_pathname(tileid=prow['TILEID'],group=prow['JOBDESC'],
1✔
538
                                                               night=prow['NIGHT'], expid=prow['EXPID'][0])
539
            log.info("Output file would have been: {}".format(scriptpathname))
1✔
540
        else:
541
            #- run zmtl for cumulative redshifts but not others
542
            run_zmtl = (prow['JOBDESC'] == 'cumulative')
1✔
543
            no_afterburners = False
1✔
544
            log.info(f"Creating tile redshift script for: {prow}")
1✔
545
            scripts, failed_scripts = generate_tile_redshift_scripts(tileid=prow['TILEID'], group=prow['JOBDESC'],
1✔
546
                                                                     nights=[prow['NIGHT']], expids=prow['EXPID'],
547
                                                                     batch_queue=queue, system_name=system_name,
548
                                                                     run_zmtl=run_zmtl,
549
                                                                     no_afterburners=no_afterburners,
550
                                                                     nosubmit=True)
551
            if len(failed_scripts) > 0:
1✔
552
                log.error(f"Redshifts failed for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
×
553
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
554
                log.info(f"Returned failed scriptname is {failed_scripts}")
×
555
            elif len(scripts) > 1:
1✔
556
                log.error(f"More than one redshift script returned for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
×
557
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
558
                log.info(f"Returned scriptnames were {scripts}")
×
559
            elif len(scripts) == 0:
1✔
560
                msg = f'No scripts were generated for {prow=}'
×
561
                log.critical(prow)
×
562
                raise ValueError(msg)
×
563
            else:
564
                scriptpathname = scripts[0]
1✔
565
    else:
566
        if prow['JOBDESC'] != 'tilenight':
1✔
567
            ## run known joint jobs as joint even if unspecified
568
            ## in the future we can eliminate the need for "joint"
569
            if joint or prow['JOBDESC'].lower() in ['psfnight', 'nightlyflat']:
1✔
570
                cmd = desi_proc_joint_fit_command(prow, queue=queue)
1✔
571
                ## For consistency with how we edit the other commands, do them
572
                ## here, but future TODO would be to move these into the command
573
                ## generation itself
574
                if 'extra_cmd_args' in extra_job_args:
1✔
575
                    cmd += ' ' + ' '.join(np.atleast_1d(extra_job_args['extra_cmd_args']))
1✔
576
            else:
577
                ## individual arcs and flats
578
                cmd = desi_proc_command(prow, system_name, use_specter, queue=queue)
1✔
579

580
        if dry_run > 1:
1✔
581
            scriptpathname = batch_script_pathname(prow)
1✔
582
            log.info("Output file would have been: {}".format(scriptpathname))
1✔
583
            if prow['JOBDESC'] != 'tilenight':
1✔
584
                log.info("Command to be run: {}".format(cmd.split()))
1✔
585
        else:
586
            expids = prow['EXPID']
1✔
587
            if len(expids) == 0:
1✔
588
                expids = None
×
589

590
            if prow['JOBDESC'] == 'tilenight':
1✔
591
                log.info("Creating tilenight script for tile {}".format(prow['TILEID']))
1✔
592
                if 'laststeps' in extra_job_args:
1✔
593
                    laststeps = extra_job_args['laststeps']
1✔
594
                else:
595
                    err = f'{prow=} job did not specify last steps to tilenight'
×
596
                    log.error(err)
×
597
                    raise ValueError(err)
×
598
                ncameras = len(decode_camword(prow['PROCCAMWORD']))
1✔
599
                scriptpathname = create_desi_proc_tilenight_batch_script(
1✔
600
                                                               night=prow['NIGHT'], exp=expids,
601
                                                               tileid=prow['TILEID'],
602
                                                               ncameras=ncameras,
603
                                                               queue=queue,
604
                                                               mpistdstars=True,
605
                                                               use_specter=use_specter,
606
                                                               system_name=system_name,
607
                                                               laststeps=laststeps)
608
            else:
609
                if expids is not None and len(expids) > 1:
1✔
610
                    expids = expids[:1]
1✔
611
                log.info("Running: {}".format(cmd.split()))
1✔
612
                scriptpathname = create_desi_proc_batch_script(night=prow['NIGHT'], exp=expids,
1✔
613
                                                               cameras=prow['PROCCAMWORD'],
614
                                                               jobdesc=prow['JOBDESC'],
615
                                                               queue=queue, cmdline=cmd,
616
                                                               use_specter=use_specter,
617
                                                               system_name=system_name)
618
    log.info("Outfile is: {}".format(scriptpathname))
1✔
619
    prow['SCRIPTNAME'] = os.path.basename(scriptpathname)
1✔
620
    return prow
1✔
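# Illustrative usage sketch (row contents hypothetical): with dry_run=2 no
# script is written; the pathname that would have been used is logged and
# its basename copied into prow['SCRIPTNAME'].
#
#   >>> prow = create_batch_script(prow, queue='realtime', dry_run=2)   # doctest: +SKIP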
621

622
_fake_qid = int(time.time() - 1.7e9)
1✔
623
def _get_fake_qid():
1✔
624
    """
625
    Return fake slurm queue jobid to use for dry-run testing
626
    """
627
    # Note: not implemented as a yield generator so that this returns a
628
    # genuine int, not a generator object
629
    global _fake_qid
630
    _fake_qid += 1
1✔
631
    return _fake_qid
1✔
632

633
def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=False):
1✔
634
    """
635
    Wrapper script that takes a processing table row and three modifier keywords and submits the scripts to the Slurm
636
    scheduler.
637

638
    Args:
639
        prow, Table.Row or dict. Must include keyword accessible definitions for processing_table columns found in
640
            desispec.workflow.proctable.get_processing_table_column_defs()
641
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
642
            0 which runs the code normally.
643
            1 writes all files but doesn't submit any jobs to Slurm.
644
            2 writes tables but doesn't write scripts or submit anything.
645
            3 Doesn't write or submit anything but queries Slurm normally for job status.
646
            4 Doesn't write, submit jobs, or query Slurm.
647
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
648
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
649
        strictly_successful, bool. Whether all jobs require all inputs to have succeeded. For daily processing, this is
650
            less desirable because e.g. the sciences can run with SVN default calibrations rather
651
            than failing completely from failed calibrations. Default is False.
652

653
    Returns:
654
        Table.Row or dict: The same prow type and keywords as input except with updated values for
655
        the queue ID and status columns ('LATEST_QID', 'ALL_QIDS', 'STATUS', 'SUBMIT_DATE').
656

657
    Note:
658
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
659
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
660
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
661
    """
662
    log = get_logger()
1✔
663
    dep_qids = prow['LATEST_DEP_QID']
1✔
664
    dep_list, dep_str = '', ''
1✔
665

666
    ## With desi_proc_night we now either resubmit failed jobs or exit, so this
667
    ## should no longer be necessary in the normal workflow.
668
    # workaround for sbatch --dependency bug not tracking jobs correctly
669
    # see NERSC TICKET INC0203024
670
    failed_dependency = False
1✔
671
    if len(dep_qids) > 0 and not dry_run:
1✔
672
        non_final_states = get_non_final_states()
×
673
        state_dict = get_queue_states_from_qids(dep_qids, dry_run_level=dry_run, use_cache=True)
×
674
        still_depids = []
×
675
        for depid in dep_qids:
×
676
            if depid in state_dict.keys():
×
677
                if state_dict[int(depid)] == 'COMPLETED':
×
678
                   log.info(f"removing completed jobid {depid}")
×
679
                elif state_dict[int(depid)] not in non_final_states:
×
680
                    failed_dependency = True
×
681
                    log.info("Found a dependency in a bad final state="
×
682
                             + f"{state_dict[int(depid)]} for depjobid={depid},"
683
                             + " not submitting this job.")
684
                    still_depids.append(depid)
×
685
                else:
686
                    still_depids.append(depid)
×
687
            else:
688
                still_depids.append(depid)
×
689
        dep_qids = np.array(still_depids)
×
690

691
    if len(dep_qids) > 0:
1✔
692
        jobtype = prow['JOBDESC']
1✔
693
        if strictly_successful:
1✔
694
            depcond = 'afterok'
1✔
695
        elif jobtype in ['arc', 'psfnight', 'prestdstar', 'stdstarfit']:
×
696
            ## (though psfnight and stdstarfit will require some inputs otherwise they'll go up in flames)
697
            depcond = 'afterany'
×
698
        else:
699
            ## if 'flat','nightlyflat','poststdstar', or any type of redshift, require strict success of inputs
700
            depcond = 'afterok'
×
701

702
        dep_str = f'--dependency={depcond}:'
1✔
703

704
        if np.isscalar(dep_qids):
1✔
705
            dep_list = str(dep_qids).strip(' \t')
×
706
            if dep_list == '':
×
707
                dep_str = ''
×
708
            else:
709
                dep_str += dep_list
×
710
        else:
711
            if len(dep_qids)>1:
1✔
712
                dep_list = ':'.join(np.array(dep_qids).astype(str))
1✔
713
                dep_str += dep_list
1✔
714
            elif len(dep_qids) == 1 and dep_qids[0] not in [None, 0]:
1✔
715
                dep_str += str(dep_qids[0])
1✔
716
            else:
717
                dep_str = ''
×
718

719
    # script = f'{jobname}.slurm'
720
    # script_pathname = pathjoin(batchdir, script)
721
    if prow['JOBDESC'] in ['pernight-v0','pernight','perexp','cumulative']:
1✔
722
        script_pathname = get_ztile_script_pathname(tileid=prow['TILEID'],group=prow['JOBDESC'],
1✔
723
                                                        night=prow['NIGHT'], expid=np.min(prow['EXPID']))
724
        jobname = os.path.basename(script_pathname)
1✔
725
    else:
726
        script_pathname = batch_script_pathname(prow)
1✔
727
        jobname = os.path.basename(script_pathname)
1✔
728

729
    batch_params = ['sbatch', '--parsable']
1✔
730
    if dep_str != '':
1✔
731
        batch_params.append(f'{dep_str}')
1✔
732

733
    reservation = parse_reservation(reservation, prow['JOBDESC'])
1✔
734
    if reservation is not None:
1✔
735
        batch_params.append(f'--reservation={reservation}')
×
736

737
    batch_params.append(f'{script_pathname}')
1✔
738
    submitted = True
1✔
739
    ## If dry_run give it a fake QID
740
    ## if a dependency has failed don't even try to submit the job because
741
    ## Slurm will refuse, instead just mark as unsubmitted.
742
    if dry_run:
1✔
743
        current_qid = _get_fake_qid()
1✔
744
    elif not failed_dependency:
×
745
        #- sbatch sometimes fails; try several times before giving up
746
        max_attempts = 3
×
747
        for attempt in range(max_attempts):
×
748
            try:
×
749
                current_qid = subprocess.check_output(batch_params, stderr=subprocess.STDOUT, text=True)
×
750
                current_qid = int(current_qid.strip(' \t\n'))
×
751
                break
×
752
            except subprocess.CalledProcessError as err:
×
753
                log.error(f'{jobname} submission failure at {datetime.datetime.now()}')
×
754
                log.error(f'{jobname}   {batch_params}')
×
755
                log.error(f'{jobname}   {err.output=}')
×
756
                if attempt < max_attempts - 1:
×
757
                    log.info('Sleeping 60 seconds then retrying')
×
758
                    time.sleep(60)
×
759
        else:  #- for/else happens if loop doesn't succeed
760
            msg = f'{jobname} submission failed {max_attempts} times.' \
×
761
                  + ' setting as unsubmitted and moving on'
762
            log.error(msg)
×
763
            current_qid = get_default_qid()
×
764
            submitted = False
×
765
    else:
766
        current_qid = get_default_qid()
×
767
        submitted = False
×
768

769
    ## Update prow with new information
770
    prow['LATEST_QID'] = current_qid
1✔
771

772
    ## If we didn't submit, don't say we did and don't add to ALL_QIDS
773
    if submitted:
1✔
774
        log.info(batch_params)
1✔
775
        log.info(f'Submitted {jobname} with dependencies {dep_str} and '
1✔
776
                 + f'reservation={reservation}. Returned qid: {current_qid}')
777

778
        ## Update prow with new information
779
        prow['ALL_QIDS'] = np.append(prow['ALL_QIDS'],current_qid)
1✔
780
        prow['STATUS'] = 'SUBMITTED'
1✔
781
        prow['SUBMIT_DATE'] = int(time.time())
1✔
782
    else:
783
        log.info(f"Would have submitted: {batch_params}")
×
784
        prow['STATUS'] = 'UNSUBMITTED'
×
785

786
        ## Update the Slurm jobid cache of job states
787
        update_queue_state_cache(qid=prow['LATEST_QID'], state=prow['STATUS'])
×
788

789
    return prow
1✔
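# Illustrative note on the dependency handling above (QIDs hypothetical):
# with strictly_successful=True and LATEST_DEP_QID=[111, 222], sbatch is
# called with '--dependency=afterok:111:222'; under dry_run the sbatch call
# is skipped and a fake queue ID is recorded instead.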
790

791

792
#############################################
793
##########   Row Manipulations   ############
794
#############################################
795
def define_and_assign_dependency(prow, calibjobs, use_tilenight=False,
1✔
796
                                 refnight=None, include_files=None):
797
    """
798
    Given input processing row and possible calibjobs, this defines the
799
    JOBDESC keyword and assigns the dependency appropriate for the job type of
800
    prow.
801

802
    Args:
803
        prow, Table.Row or dict. Must include keyword accessible definitions for
804
            'OBSTYPE'. A row must have column names for
805
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_QID'.
806
        calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'biaspdark',
807
            'psfnight', 'nightlyflat', and 'linkcal'. Each key corresponds to a Table.Row or
808
            None. The table.Row() values are for the corresponding
809
            calibration job. Each value that isn't None must contain
810
            'INTID', and 'LATEST_QID'. If None, it assumes the
811
            dependency doesn't exist and no dependency is assigned.
812
        use_tilenight, bool. Default is False. If True, use desi_proc_tilenight
813
            for prestdstar, stdstar,and poststdstar steps for
814
            science exposures.
815
        refnight, int. The reference night for linking jobs
816
        include_files, list. List of filetypes to include in the linking
817

818
    Returns:
819
        Table.Row or dict: The same prow type and keywords as input except
820
        with updated values for 'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_QID'.
821

822
    Note:
823
        This modifies the input. Though Table.Row objects are generally copied
824
        on modification, so the change to the input object in memory may or may
825
        not be changed. As of writing, a row from a table given to this function
826
        will not change during the execution of this function (but can be
827
        overwritten explicitly with the returned row if desired).
828
    """
829
    log = get_logger()
1✔
830

831
    if isinstance(calibjobs, Table):
1✔
832
        calibjobs = generate_calibration_dict(calibjobs, files_to_link=include_files)
1✔
833
    elif not isinstance(calibjobs, dict):
1✔
834
        log.error("prow must be a Table.Row or a dict")
×
835
        raise TypeError("prow must be a Table.Row or a dict")
×
836

837
    if prow['OBSTYPE'] in ['science', 'twiflat']:
1✔
838
        if calibjobs['nightlyflat'] is not None:
1✔
839
            dependency = calibjobs['nightlyflat']
1✔
840
        elif calibjobs['psfnight'] is not None:
1✔
841
            dependency = calibjobs['psfnight']
1✔
842
        elif calibjobs['ccdcalib'] is not None:
1✔
843
            dependency = calibjobs['ccdcalib']
1✔
844
        elif calibjobs['biaspdark'] is not None:
1✔
845
            dependency = calibjobs['biaspdark']
×
846
        else:
847
            dependency = calibjobs['linkcal']
1✔
848
        if not use_tilenight:
1✔
849
            prow['JOBDESC'] = 'prestdstar'
1✔
850
    elif prow['OBSTYPE'] == 'flat':
1✔
851
        if calibjobs['psfnight'] is not None:
1✔
852
            dependency = calibjobs['psfnight']
1✔
853
        elif calibjobs['ccdcalib'] is not None:
1✔
854
            dependency = calibjobs['ccdcalib']
1✔
855
        elif calibjobs['biaspdark'] is not None:
1✔
856
            dependency = calibjobs['biaspdark']
1✔
857
        else:
858
            dependency = calibjobs['linkcal']
1✔
859
    elif prow['OBSTYPE'] == 'arc':
1✔
860
        if calibjobs['ccdcalib'] is not None:
1✔
861
            dependency = calibjobs['ccdcalib']
1✔
862
        elif calibjobs['biaspdark'] is not None:
1✔
863
            dependency = calibjobs['biaspdark']
×
864
        else:
865
            dependency = calibjobs['linkcal']
1✔
866
    elif prow['JOBDESC'] in ['badcol', 'nightlybias', 'ccdcalib', 'pdark']:
1✔
867
        if calibjobs['biaspdark'] is not None:
1✔
868
            dependency = calibjobs['biaspdark']
1✔
869
        else:
870
            dependency = calibjobs['linkcal']
1✔
871
    elif prow['JOBDESC'] == 'biaspdark':
1✔
872
        dependency = calibjobs['linkcal']
1✔
873
    elif prow['JOBDESC'] == 'linkcal' and refnight is not None:
1✔
874
        dependency = None
1✔
875
        ## For link cals only, enable cross-night dependencies if available
876
        refproctable = findfile('proctable', night=refnight)
1✔
877
        if os.path.exists(refproctable):
1✔
878
            ptab = load_table(tablename=refproctable, tabletype='proctable')
×
879
            ## This isn't perfect because we may depend on jobs that aren't
880
            ## actually being linked
881
            ## Also allows us to proceed even if jobs don't exist yet
882
            deps, proccamwords = [], []
×
883
            #for job in ['nightlybias', 'ccdcalib', 'psfnight', 'nightlyflat']:
884
            if include_files is not None:
×
885
                for filename in include_files:
×
886
                    job = filename_to_jobname(filename)
×
887
                    if job in ptab['JOBDESC']:
×
888
                        ## add prow to dependencies
889
                        deprow = ptab[ptab['JOBDESC']==job][0]
×
890
                        deps.append(deprow)
×
891
                        proccamwords.append(deprow['PROCCAMWORD'])
×
892
                    elif 'linkcal' in ptab['JOBDESC']:
×
893
                        linkcalprow = ptab[ptab['JOBDESC']=='linkcal'][0]
×
894
                        deps.append(linkcalprow)
×
895
                        proccamwords.append(linkcalprow['PROCCAMWORD'])
×
896
            if len(deps) > 0:
×
897
                dependency = np.unique(deps)
×
898
            ## The proccamword for the linking job is the largest set available from the reference night
899
            ## but restricted to those requested for the current night, if fewer cameras are available
900
            if len(proccamwords) > 0:
×
901
                prow['PROCCAMWORD'] = camword_intersection([prow['PROCCAMWORD'], camword_union(proccamwords)])
×
902
    else:
903
        dependency = None
1✔
904

905
    prow = assign_dependency(prow, dependency)
1✔
906

907
    return prow
1✔
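# Illustrative note (values hypothetical): for a science row the first
# available calibration job wins, in the order nightlyflat, psfnight,
# ccdcalib, biaspdark, linkcal; with only calibjobs['nightlyflat'] set, the
# row depends on that job and (when use_tilenight=False) its JOBDESC becomes
# 'prestdstar'.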
908

909
def filename_to_jobname(filename):
1✔
910
    """
911
    Convert a filename to the job name it corresponds to.
912
    Example filenames include: 'biasnight', 'darknight', 'badcolumns', 'ctecorrnight', 'psfnight', 'fiberflatnight'.
913
    Args:
914
        filename, str. The name of the file to convert.
915

916
    Returns:
917
        str: The job name corresponding to the input filename.
918
    """
919
    if filename.startswith('biasnight'):
×
920
        return 'biaspdark'
×
921
    elif filename.startswith('darknight'):
×
922
        return 'ccdcalib'
×
923
    elif filename.startswith('badcolumns'):
×
924
        return 'ccdcalib'
×
925
    elif filename.startswith('ctecorrnight'):
×
926
        return 'ccdcalib'
×
927
    if filename.startswith('psfnight'):
×
928
        return 'psfnight'
×
929
    elif filename.startswith('fiberflatnight'):
×
930
        return 'nightlyflat'
×
931
    else:
932
        return 'tilenight'
×
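# Illustrative examples (filenames hypothetical; only the prefix matters):
#
#   >>> filename_to_jobname('biasnight-b0-20240101.fits.gz')
#   'biaspdark'
#   >>> filename_to_jobname('fiberflatnight-b0-20240101.fits')
#   'nightlyflat'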
933

934

935
def assign_dependency(prow, dependency):
1✔
936
    """
937
    Given an input processing row and a dependency (one or more processing rows for the jobs that the row
938
    requires), this assigns the dependency columns appropriate for the job type of prow.
939

940
    Args:
941
        prow, Table.Row or dict. Must include keyword accessible definitions for 'OBSTYPE'. A row must have column names for
942
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.
943
        dependency, NoneType or scalar/list/array of Table.Row, dict. Processing row corresponding to the required input
944
            for the job in prow. This must contain keyword
945
            accessible values for 'INTID', and 'LATEST_QID'.
946
            If None, it assumes the dependency doesn't exist
947
            and no dependency is assigned.
948

949
    Returns:
950
        Table.Row or dict: The same prow type and keywords as input except with updated values for
951
        'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_QID'.
952

953
    Note:
954
        This may modify the input. Table.Row objects are generally copied on modification, so the
955
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
956
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
957
    """
958
    prow['INT_DEP_IDS'] = np.ndarray(shape=0).astype(int)
1✔
959
    prow['LATEST_DEP_QID'] = np.ndarray(shape=0).astype(int)
1✔
960
    if dependency is not None:
1✔
961
        if isinstance(dependency, (list, np.ndarray)):
1✔
962
            ids, qids = [], []
1✔
963
            for curdep in dependency:
1✔
964
                ids.append(curdep['INTID'])
1✔
965
                if still_a_dependency(curdep):
1✔
966
                    # ids.append(curdep['INTID'])
967
                    qids.append(curdep['LATEST_QID'])
1✔
968
            prow['INT_DEP_IDS'] = np.array(ids, dtype=int)
1✔
969
            prow['LATEST_DEP_QID'] = np.array(qids, dtype=int)
1✔
970
        elif type(dependency) in [dict, OrderedDict, Table.Row]:
1✔
971
            prow['INT_DEP_IDS'] = np.array([dependency['INTID']], dtype=int)
1✔
972
            if still_a_dependency(dependency):
1✔
973
                prow['LATEST_DEP_QID'] = np.array([dependency['LATEST_QID']], dtype=int)
1✔
974
    return prow
1✔
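# Illustrative example (values hypothetical): a single pending dependency
# populates both columns; a completed one would keep only the INTID.
#
#   >>> dep = {'INTID': 240101005, 'LATEST_QID': 12345, 'STATUS': 'SUBMITTED'}
#   >>> prow = assign_dependency(prow, dep)                 # doctest: +SKIP
#   >>> prow['INT_DEP_IDS'], prow['LATEST_DEP_QID']         # doctest: +SKIP
#   (array([240101005]), array([12345]))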
975

976
def still_a_dependency(dependency):
1✔
977
    """
978
    Defines the criteria for which a dependency is deemed complete (and therefore no longer a dependency).
979

980
     Args:
981
        dependency, Table.Row or dict. Processing row corresponding to the required input for the job in prow.
982
            This must contain keyword accessible values for 'STATUS', and 'LATEST_QID'.
983

984
    Returns:
985
        bool: False if the criteria indicate that the dependency is completed and no longer a blocking factor (i.e., no longer
986
        a genuine dependency). Returns True if the dependency is still a blocking factor such that the Slurm
987
        scheduler needs to be aware of the pending job.
988

989
    """
990
    return dependency['LATEST_QID'] > 0 and dependency['STATUS'] != 'COMPLETED'
1✔
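# Illustrative examples:
#
#   >>> still_a_dependency({'LATEST_QID': 12345, 'STATUS': 'COMPLETED'})
#   False
#   >>> still_a_dependency({'LATEST_QID': 12345, 'STATUS': 'PENDING'})
#   True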
991

992
def get_type_and_tile(erow):
1✔
993
    """
994
    Trivial function to return the OBSTYPE and the TILEID from an exposure table row
995

996
    Args:
997
        erow, Table.Row or dict. Must contain 'OBSTYPE' and 'TILEID' as keywords.
998

999
    Returns:
1000
        tuple (str, str), corresponding to the OBSTYPE and TILEID values of the input erow.
1001
    """
1002
    return str(erow['OBSTYPE']).lower(), erow['TILEID']
×
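# Illustrative example:
#
#   >>> get_type_and_tile({'OBSTYPE': 'SCIENCE', 'TILEID': 1234})
#   ('science', 1234)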
1003


#############################################
#########   Table manipulators   ############
#############################################
def parse_previous_tables(etable, ptable, night):
    """
    This takes in the exposure and processing tables and regenerates all the working memory variables needed for the
    daily processing script.

    Used by the daily processing to define most of its stateful variables into working memory.
    If the processing table is empty, these are simply declared and returned for use.
    If the code had previously run and exited (or crashed), however, this allows the code to
    re-establish itself by redefining these values.

    Args:
        etable, Table, Exposure table of all exposures that have been dealt with thus far.
        ptable, Table, Processing table of all exposures that have been processed.
        night, str or int, the night the data was taken.

    Returns:
        tuple: A tuple containing:

        * arcs, list of dicts, list of the individual arc jobs used for the psfnight (NOT all
          the arcs, if multiple sets existed)
        * flats, list of dicts, list of the individual flat jobs used for the nightlyflat (NOT
          all the flats, if multiple sets existed)
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile)
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'badcol', 'psfnight',
          'nightlyflat', 'linkcal', and 'accounted_for'. Except for 'accounted_for',
          each key corresponds to a Table.Row or None, where the Table.Row values
          are for the corresponding calibration job.
        * curtype, None, the obstype of the current job being run. Always None as the first new job will define this.
        * lasttype, str or None, the obstype of the last individual exposure row to be processed.
        * curtile, None, the tileid of the current job (if science). Otherwise None. Always None as the first
          new job will define this.
        * lasttile, str or None, the tileid of the last job (if science). Otherwise None.
        * internal_id, int, an internal identifier unique to each job. Increments with each new job. This
          is the latest unassigned value.
    """
    log = get_logger()
    arcs, flats, sciences = [], [], []
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None,
                 'accounted_for': dict()}
    curtype, lasttype = None, None
    curtile, lasttile = None, None

    if len(ptable) > 0:
        prow = ptable[-1]
        internal_id = int(prow['INTID'])+1
        lasttype, lasttile = get_type_and_tile(ptable[-1])
        jobtypes = ptable['JOBDESC']

        if 'nightlybias' in jobtypes:
            calibjobs['nightlybias'] = table_row_to_dict(ptable[jobtypes=='nightlybias'][0])
            log.info("Located nightlybias job in processing table: {}".format(calibjobs['nightlybias']))

        if 'ccdcalib' in jobtypes:
            calibjobs['ccdcalib'] = table_row_to_dict(ptable[jobtypes=='ccdcalib'][0])
            log.info("Located ccdcalib job in processing table: {}".format(calibjobs['ccdcalib']))

        if 'psfnight' in jobtypes:
            calibjobs['psfnight'] = table_row_to_dict(ptable[jobtypes=='psfnight'][0])
            log.info("Located joint fit psfnight job in processing table: {}".format(calibjobs['psfnight']))
        elif lasttype == 'arc':
            seqnum = 10
            for row in ptable[::-1]:
                erow = etable[etable['EXPID']==row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'arc' and int(erow['SEQNUM'])<seqnum:
                    arcs.append(table_row_to_dict(row))
                    seqnum = int(erow['SEQNUM'])
                else:
                    break
            ## Because we work backward to fill in, we need to reverse them to get chronological order back
            arcs = arcs[::-1]

        if 'nightlyflat' in jobtypes:
            calibjobs['nightlyflat'] = table_row_to_dict(ptable[jobtypes=='nightlyflat'][0])
            log.info("Located joint fit nightlyflat job in processing table: {}".format(calibjobs['nightlyflat']))
        elif lasttype == 'flat':
            for row in ptable[::-1]:
                erow = etable[etable['EXPID']==row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'flat' and int(erow['SEQTOT']) < 5:
                    if float(erow['EXPTIME']) > 100.:
                        flats.append(table_row_to_dict(row))
                else:
                    break
            flats = flats[::-1]

        if lasttype.lower() == 'science':
            for row in ptable[::-1]:
                if row['OBSTYPE'].lower() == 'science' and row['TILEID'] == lasttile and \
                   row['JOBDESC'] == 'prestdstar' and row['LASTSTEP'] != 'skysub':
                    sciences.append(table_row_to_dict(row))
                else:
                    break
            sciences = sciences[::-1]
    else:
        internal_id = night_to_starting_iid(night)

    return arcs, flats, sciences, \
           calibjobs, \
           curtype, lasttype, \
           curtile, lasttile, \
           internal_id

def generate_calibration_dict(ptable, files_to_link=None):
    """
    This takes in a processing table and regenerates the working memory calibration
    dictionary for dependency tracking. Used by the daily processing to define
    most of its stateful variables into working memory.
    If the processing table is empty, these are simply declared and returned for use.
    If the code had previously run and exited (or crashed), however, this allows the code to
    re-establish itself by redefining these values.

    Args:
        ptable, Table, Processing table of all exposures that have been processed.
        files_to_link, set, Set of filenames that the linkcal job will link.

    Returns:
        calibjobs, dict. Dictionary containing 'biasnight', 'biaspdark', 'ccdcalib',
            'psfnight', 'nightlyflat', 'linkcal', and 'accounted_for'. Except for
            'accounted_for', each key corresponds to a Table.Row or None, where
            the Table.Row values are for the corresponding calibration job.
    """
    log = get_logger()
    job_to_file_map = get_jobdesc_to_file_map()
    accounted_for = {'biasnight': False, 'darknight': False, 'badcolumns': False,
                     'ctecorrnight': False, 'psfnight': False,
                     'fiberflatnight': False}
    calibjobs = {'biasnight': None, 'biaspdark': None, 'ccdcalib': None,
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None}

    ptable_jobtypes = ptable['JOBDESC']

    for jobtype in calibjobs.keys():
        if jobtype in ptable_jobtypes:
            calibjobs[jobtype] = table_row_to_dict(ptable[ptable_jobtypes==jobtype][0])
            log.info(f"Located {jobtype} job in processing table: {calibjobs[jobtype]}")
            if jobtype in ['linkcal', 'biaspdark', 'ccdcalib'] and files_to_link is None:
                night = int(ptable['NIGHT'][0]) ## jobtypes not empty, so has 1 or more rows
                override_pathname = findfile('override', night=night, readonly=True)
                overrides = load_override_file(filepathname=override_pathname)
                cal_override = {}
                if 'calibration' in overrides:
                    cal_override = overrides['calibration']
                orig_files_to_link = files_to_link
                ## Determine calibrations that will be linked
                if 'linkcal' in cal_override:
                    files_to_link, files_not_linked = None, None
                    if 'include' in cal_override['linkcal']:
                        files_to_link = cal_override['linkcal']['include']
                    if 'exclude' in cal_override['linkcal']:
                        files_not_linked = cal_override['linkcal']['exclude']
                    files_to_link, files_not_linked = derive_include_exclude(files_to_link,
                                                                 files_not_linked)
                    warn = f"linkcal job exists but no files given: {orig_files_to_link=}. " \
                           + f"Used {override_pathname} to identify the following " \
                           + f"linked files: {files_to_link}"
                    log.warning(warn)

            if jobtype == 'linkcal':
                if files_to_link is not None and len(files_to_link) > 0:
                    log.info(f"Assuming existing linkcal job processed "
                             + f"{files_to_link} since given in override file.")
                    accounted_for = update_accounted_for_with_linking(accounted_for,
                                                                  files_to_link)
                else:
                    err = f"linkcal job exists but no files given: {files_to_link=}"
                    log.error(err)
                    raise ValueError(err)
            elif jobtype in ['biaspdark', 'ccdcalib']:
                ## These are multi-step jobs, so assume what was processed
                ## based on linking
                if jobtype == 'biaspdark':
                    ## we don't care about preproc_for_dark since we can still
                    ## make a darknight without darks specifically from tonight
                    possible_files = set(['biasnight'])
                else:
                    possible_files = set(['darknight', 'badcolumns', 'ctecorrnight'])

                if files_to_link is None:
                    files_accounted_for = possible_files
                else:
                    files_accounted_for = possible_files.difference(files_to_link)
                    files_linked = possible_files.intersection(files_to_link)
                    log.info(f"Assuming existing {jobtype} job processed "
                             + f"{files_accounted_for} since {files_linked} "
                             + f"are linked.")
                for fil in files_accounted_for:
                    accounted_for[fil] = True
            else:
                accounted_for[job_to_file_map[jobtype]] = True

    calibjobs['accounted_for'] = accounted_for
    return calibjobs

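## Illustrative sketch (assumption: get_jobdesc_to_file_map(), defined
## elsewhere in this module, maps the 'psfnight' job to the 'psfnight'
## calibration filename; the table below is a made-up minimal processing table):
def _example_generate_calibration_dict():
    ptable = Table({'JOBDESC': ['psfnight'], 'NIGHT': [20240101]})
    calibjobs = generate_calibration_dict(ptable)
    assert calibjobs['psfnight'] is not None
    assert calibjobs['accounted_for']['psfnight'] is True
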
def update_accounted_for_with_linking(accounted_for, files_to_link):
    """
    This takes in a dictionary summarizing the calibration files accounted for
    and updates it based on files_to_link, which are assumed to have
    already been linked such that those files already exist on disk and
    don't need to be generated.

    Parameters
    ----------
        accounted_for: dict
            Dictionary containing 'biasnight', 'darknight', 'badcolumns',
            'ctecorrnight', 'psfnight', and 'fiberflatnight'. Each value is
            True if the file is accounted for and False if it is not.
        files_to_link: set
            Set of filenames that the linkcal job will link.

    Returns
    -------
        accounted_for: dict
            The same dictionary as input, with the values for all entries
            in files_to_link set to True.
    """
    log = get_logger()

    for fil in files_to_link:
        if fil in accounted_for:
            accounted_for[fil] = True
        else:
            err = f"{fil} doesn't match an expected filetype: "
            err += f"{accounted_for.keys()}"
            log.error(err)
            raise ValueError(err)

    return accounted_for

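## Illustrative sketch: linking psfnight and fiberflatnight marks just those
## two entries as accounted for.
def _example_update_accounted_for_with_linking():
    accounted_for = {'biasnight': False, 'darknight': False, 'badcolumns': False,
                     'ctecorrnight': False, 'psfnight': False,
                     'fiberflatnight': False}
    updated = update_accounted_for_with_linking(accounted_for,
                                                {'psfnight', 'fiberflatnight'})
    assert updated['psfnight'] and updated['fiberflatnight']
    assert not updated['biasnight']
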
def all_calibs_submitted(accounted_for, do_cte_flats=True, do_darknight=True):
    """
    Returns the boolean logic to determine whether all of the necessary
    calibration jobs have been submitted.

    Args:
        accounted_for, dict, Dictionary with keys corresponding to the calibration
            filenames and values of True or False.
        do_cte_flats, bool, whether ctecorrnight files are expected or not.
        do_darknight, bool, whether darknight files are expected or not.

    Returns:
        bool, True if all necessary calibrations have been submitted or handled, False otherwise.
    """
    test_dict = accounted_for.copy()
    if not do_cte_flats:
        test_dict.pop('ctecorrnight')
    if not do_darknight:
        test_dict.pop('darknight')

    return np.all(list(test_dict.values()))

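## Illustrative sketch: missing darknight and ctecorrnight files only matter
## when those calibrations are expected.
def _example_all_calibs_submitted():
    accounted_for = {'biasnight': True, 'darknight': False, 'badcolumns': True,
                     'ctecorrnight': False, 'psfnight': True,
                     'fiberflatnight': True}
    assert all_calibs_submitted(accounted_for, do_cte_flats=False, do_darknight=False)
    assert not all_calibs_submitted(accounted_for, do_cte_flats=True, do_darknight=True)
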
def update_and_recursively_submit(proc_table, submits=0, max_resubs=100,
                                  resubmission_states=None,
                                  no_resub_failed=False, ptab_name=None,
                                  dry_run_level=0, reservation=None,
                                  expids=None, tileids=None):
    """
    Given a processing table, this loops over job rows and resubmits failed jobs (as defined by resubmission_states).
    Before submitting a job, it checks the dependencies for failures. If a dependency needs to be resubmitted, it recursively
    follows dependencies until it finds the first job without a failed dependency and resubmits that. Then it resubmits the
    other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.

    Args:
        proc_table, Table, the processing table with a row per job.
        submits, int, the number of submissions made to the queue. Used for saving files and for not overloading the scheduler.
        max_resubs, int, the number of times a job should be resubmitted before giving up. Default is very high at 100.
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
            possible Slurm scheduler state, where you wish for jobs with that
            outcome to be resubmitted.
        no_resub_failed: bool. Set to True if you do NOT want to resubmit
            jobs with Slurm status 'FAILED' by default. Default is False.
        ptab_name, str, the full pathname where the processing table should be saved.
        dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 which runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 Doesn't write or submit anything but queries Slurm normally for job status.
            4 Doesn't write, submit jobs, or query Slurm.
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        expids: list of ints. The exposure ids to resubmit (along with the jobs they depend on).
        tileids: list of ints. The tile ids to resubmit (along with the jobs they depend on).

    Returns:
        tuple: A tuple containing:

        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
          been updated for those jobs that needed to be resubmitted.
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
          the number of submissions made from this function call plus the input submits value.

    Note:
        This modifies the inputs of both proc_table and submits and returns them.
    """
    log = get_logger()
    if tileids is not None and expids is not None:
        msg = "Provided both expids and tileids. Please only provide one."
        log.critical(msg)
        raise AssertionError(msg)
    elif tileids is not None:
        msg = f"Only resubmitting the following tileids and the jobs they depend on: {tileids=}"
        log.info(msg)
    elif expids is not None:
        msg = f"Only resubmitting the following expids and the jobs they depend on: {expids=}"
        log.info(msg)

    if resubmission_states is None:
        resubmission_states = get_resubmission_states(no_resub_failed=no_resub_failed)

    log.info(f"Resubmitting jobs with current states in the following: {resubmission_states}")
    proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level)

    log.info("Updated processing table queue information:")
    cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID',
            'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS']
    log.info(np.array(cols))
    for row in proc_table:
        log.info(np.array(row[cols]))

    ## If expids or tileids are given, subselect to the processing table rows
    ## that included those exposures or tiles, otherwise just list all indices.
    ## NOTE: Other rows can still be submitted if the selected rows depend on them;
    ## we hand the entire table to recursive_submit_failed(), which will walk the
    ## entire dependency tree as necessary.
    if expids is not None:
        select_ptab_rows = np.where([np.any(np.isin(prow_eids, expids)) for prow_eids in proc_table['EXPID']])[0]
    elif tileids is not None:
        select_ptab_rows = np.where(np.isin(proc_table['TILEID'], tileids))[0]
    else:
        select_ptab_rows = np.arange(len(proc_table))

    log.info("\n")
    id_to_row_map = {row['INTID']: rown for rown, row in enumerate(proc_table)}
    ## Loop over all requested rows and resubmit those that have failed
    for rown in select_ptab_rows:
        if proc_table['STATUS'][rown] in resubmission_states:
            proc_table, submits = recursive_submit_failed(rown=rown, proc_table=proc_table,
                                                          submits=submits, max_resubs=max_resubs,
                                                          id_to_row_map=id_to_row_map,
                                                          ptab_name=ptab_name,
                                                          resubmission_states=resubmission_states,
                                                          reservation=reservation,
                                                          dry_run_level=dry_run_level)

    proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level)

    return proc_table, submits

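## Illustrative sketch (assumption: load_table(tabletype='processing')
## resolves the default processing table for the current night, mirroring the
## write_table(..., tabletype='processing') calls used elsewhere in this
## module). dry_run_level=4 avoids writing files, submitting jobs, or
## querying Slurm.
def _example_update_and_recursively_submit():
    ptable = load_table(tabletype='processing')
    ptable, submits = update_and_recursively_submit(ptable, tileids=[80605],
                                                    dry_run_level=4)
    return ptable, submits
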
def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, max_resubs=100, ptab_name=None,
                            resubmission_states=None, reservation=None, dry_run_level=0):
    """
    Given the index of a row in a processing table and the full processing table, this resubmits the given job.
    Before submitting a job, it checks the dependencies for failures in the processing table. If a dependency needs to
    be resubmitted, it recursively follows dependencies until it finds the first job without a failed dependency and
    resubmits that. Then it resubmits the other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.

    Args:
        rown, int, the index of the processing table row that you want to resubmit.
        proc_table, Table, the processing table with a row per job.
        submits, int, the number of submissions made to the queue. Used for saving files and for not overloading the scheduler.
        id_to_row_map, dict, lookup dictionary where the keys are internal ids (INTID's) and the values are the row position
            in the processing table.
        max_resubs, int, the number of times a job should be resubmitted before giving up. Default is very high at 100.
        ptab_name, str, the full pathname where the processing table should be saved.
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
            possible Slurm scheduler state, where you wish for jobs with that
            outcome to be resubmitted.
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 which runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 Doesn't write or submit anything but queries Slurm normally for job status.
            4 Doesn't write, submit jobs, or query Slurm.
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.

    Returns:
        tuple: A tuple containing:

        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
          been updated for those jobs that needed to be resubmitted.
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
          the number of submissions made from this function call plus the input submits value.

    Note:
        This modifies the inputs of both proc_table and submits and returns them.
    """
    log = get_logger()
    row = proc_table[rown]
    log.info(f"Identified row {row['INTID']} as needing resubmission.")
    log.info(f"\t{row['INTID']}: Tileid={row['TILEID']}, Expid(s)={row['EXPID']}, Jobdesc={row['JOBDESC']}")
    if len(proc_table['ALL_QIDS'][rown]) > max_resubs:
        log.warning(f"Tileid={row['TILEID']}, Expid(s)={row['EXPID']}, "
                    + f"Jobdesc={row['JOBDESC']} has already been submitted "
                    + f"{max_resubs+1} times. Not resubmitting.")
        proc_table['STATUS'][rown] = "MAX_RESUB"
        return proc_table, submits
    if resubmission_states is None:
        resubmission_states = get_resubmission_states()
    ideps = proc_table['INT_DEP_IDS'][rown]
    if ideps is None or len(ideps)==0:
        proc_table['LATEST_DEP_QID'][rown] = np.empty(0, dtype=int)
    else:
        all_valid_states = list(resubmission_states.copy())
        good_states = ['RUNNING','PENDING','SUBMITTED','COMPLETED']
        all_valid_states.extend(good_states)
        othernight_idep_row_lookup = {}
        for idep in np.sort(np.atleast_1d(ideps)):
            if idep not in id_to_row_map:
                if idep // 1000 != row['INTID'] // 1000:
                    log.debug("Internal ID: %d not in id_to_row_map. "
                              + "This is expected since it is from another day. ", idep)
                    reference_night = 20000000 + (idep // 1000)
                    reftab = read_minimal_full_proctab_cols(nights=[reference_night])
                    if reftab is None:
                        msg = f"The dependency is from night={reference_night}" \
                              + f" but read_minimal_full_proctab_cols couldn't" \
                              + f" locate that processing table; this is a" \
                              + f" fatal error."
                        log.critical(msg)
                        raise ValueError(msg)
                    reftab = update_from_queue(reftab, dry_run_level=dry_run_level)
                    entry = reftab[reftab['INTID'] == idep][0]
                    if entry['STATUS'] not in good_states:
                        msg = f"Internal ID: {idep} not in id_to_row_map. " \
                              + f"Since the dependency is from night={reference_night} " \
                              + f"and that job isn't in a good state, this is an " \
                              + f"error we can't overcome."
                        log.error(msg)
                        proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
                        return proc_table, submits
                    else:
                        ## otherwise if incomplete, just update the cache to use this
                        ## in the next stage
                        othernight_idep_row_lookup[idep] = entry
                        update_full_ptab_cache(reftab)
                else:
                    msg = f"Internal ID: {idep} not in id_to_row_map. " \
                          + f"Since the dependency is from the same night" \
                          + f" and we can't find it, this is a fatal error."
                    log.critical(msg)
                    raise ValueError(msg)
            elif proc_table['STATUS'][id_to_row_map[idep]] not in all_valid_states:
                log.error(f"Proc INTID: {proc_table['INTID'][rown]} depended on" +
                          f" INTID {proc_table['INTID'][id_to_row_map[idep]]}" +
                          f" but that exposure has state" +
                          f" {proc_table['STATUS'][id_to_row_map[idep]]} that" +
                          f" isn't in the list of resubmission states." +
                          f" Exiting this job's resubmission attempt.")
                proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
                return proc_table, submits
        qdeps = []
        for idep in np.sort(np.atleast_1d(ideps)):
            if idep in id_to_row_map:
                if proc_table['STATUS'][id_to_row_map[idep]] in resubmission_states:
                    proc_table, submits = recursive_submit_failed(id_to_row_map[idep],
                                                                  proc_table, submits,
                                                                  id_to_row_map,
                                                                  reservation=reservation,
                                                                  dry_run_level=dry_run_level)
                ## Now that we've resubmitted the dependency if necessary,
                ## add the most recent QID to the list assuming it isn't COMPLETED
                if still_a_dependency(proc_table[id_to_row_map[idep]]):
                    qdeps.append(proc_table['LATEST_QID'][id_to_row_map[idep]])
                else:
                    log.info(f"{idep} is COMPLETED. Not submitting as a dependency.")
            else:
                ## Since we verified above that the cross-night QID is still
                ## either pending or successful, add that to the list of QID's
                if still_a_dependency(othernight_idep_row_lookup[idep]):
                    qdeps.append(othernight_idep_row_lookup[idep]['LATEST_QID'])
                else:
                    log.info(f"{idep} is COMPLETED. Not submitting as a dependency.")

        qdeps = np.atleast_1d(qdeps)
        proc_table['LATEST_DEP_QID'][rown] = qdeps
        if len(qdeps) < len(ideps):
            log.warning(f"Number of internal dependencies was {len(ideps)} but number "
                        + f"of queue deps is {len(qdeps)} for Rown {rown}, ideps {ideps}."
                        + " This is expected if the ideps were status=COMPLETED")

    proc_table[rown] = submit_batch_script(proc_table[rown], reservation=reservation,
                                           strictly_successful=True, dry_run=dry_run_level)
    submits += 1

    if dry_run_level < 3:
        if ptab_name is None:
            write_table(proc_table, tabletype='processing', overwrite=True)
        else:
            write_table(proc_table, tablename=ptab_name, overwrite=True)
        sleep_and_report(0.1 + 0.1*(submits % 10 == 0),
                         message_suffix="after submitting job to queue and writing proctable")
    return proc_table, submits


#########################################
########     Joint fit     ##############
#########################################
def joint_fit(ptable, prows, internal_id, queue, reservation, descriptor, z_submit_types=None,
              dry_run=0, strictly_successful=False, check_for_outputs=True, resubmit_partial_complete=True,
              system_name=None):
    """
    DEPRECATED
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    joint fitting job given by descriptor. If the joint fitting job is standard star fitting, the post standard star fits
    for all the individual exposures are also created and submitted. The returned ptable has all of these rows added to the
    table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        descriptor (str): Description of the joint fitting job. Can either be 'science' or 'stdstarfit', 'arc' or 'psfnight',
            or 'flat' or 'nightlyflat'.
        z_submit_types (list of str, optional): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 which runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 Doesn't write or submit anything but queries Slurm normally for job status.
            4 Doesn't write, submit jobs, or query Slurm.
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with added rows for the joint fit job and, in the case
          of a stdstarfit, the poststdstar science exposure jobs.
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    if descriptor is None:
        return ptable, None, internal_id
    elif descriptor == 'arc':
        descriptor = 'psfnight'
    elif descriptor == 'flat':
        descriptor = 'nightlyflat'
    elif descriptor == 'science':
        if z_submit_types is None or len(z_submit_types) == 0:
            descriptor = 'stdstarfit'

    if descriptor not in ['psfnight', 'nightlyflat', 'science', 'stdstarfit']:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")

    if descriptor == 'science':
        joint_prow, internal_id = make_joint_prow(prows, descriptor='stdstarfit', internal_id=internal_id)
    else:
        joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
    ptable.add_row(joint_prow)

    if descriptor in ['science', 'stdstarfit']:
        if descriptor == 'science':
            zprows = []
        log.info(" ")
        log.info("Submitting individual science exposures now that joint fitting of standard stars is submitted.\n")
        for row in prows:
            if row['LASTSTEP'] == 'stdstarfit':
                continue
            row['JOBDESC'] = 'poststdstar'

            # poststdstar job can't process cameras not included in its stdstar joint fit
            stdcamword = joint_prow['PROCCAMWORD']
            thiscamword = row['PROCCAMWORD']
            proccamword = camword_intersection([stdcamword, thiscamword])
            if proccamword != thiscamword:
                dropcams = difference_camwords(thiscamword, proccamword)
                assert dropcams != ''  #- i.e. if they differ, we should be dropping something
                log.warning(f"Dropping exp {row['EXPID']} poststdstar cameras {dropcams} since they weren't included in stdstar fit {stdcamword}")
                row['PROCCAMWORD'] = proccamword

            row['INTID'] = internal_id
            internal_id += 1
            row['ALL_QIDS'] = np.empty(0, dtype=int)
            row = assign_dependency(row, joint_prow)
            row = create_and_submit(row, queue=queue, reservation=reservation, dry_run=dry_run,
                                    strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
            ptable.add_row(row)
            if descriptor == 'science' and row['LASTSTEP'] == 'all':
                zprows.append(row)

    ## Now run redshifts
    if descriptor == 'science' and len(zprows) > 0 and z_submit_types is not None:
        prow_selection = (  (ptable['OBSTYPE'] == 'science')
                          & (ptable['LASTSTEP'] == 'all')
                          & (ptable['JOBDESC'] == 'poststdstar')
                          & (ptable['TILEID'] == int(zprows[0]['TILEID'])) )
        nightly_zprows = []
        if np.sum(prow_selection) == len(zprows):
            nightly_zprows = zprows.copy()
        else:
            for prow in ptable[prow_selection]:
                nightly_zprows.append(table_row_to_dict(prow))

        for zsubtype in z_submit_types:
            if zsubtype == 'perexp':
                for zprow in zprows:
                    log.info(" ")
                    log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
                    joint_prow, internal_id = make_joint_prow([zprow], descriptor=zsubtype, internal_id=internal_id)
                    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                    ptable.add_row(joint_prow)
            else:
                log.info(" ")
                log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {nightly_zprows[0]['TILEID']}.")
                expids = [prow['EXPID'][0] for prow in nightly_zprows]
                log.info(f"Expids: {expids}.\n")
                joint_prow, internal_id = make_joint_prow(nightly_zprows, descriptor=zsubtype, internal_id=internal_id)
                joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(joint_prow)

    if descriptor in ['psfnight', 'nightlyflat']:
        log.info("Setting the calibration exposures as calibrators in the processing table.\n")
        ptable = set_calibrator_flag(prows, ptable)

    return ptable, joint_prow, internal_id

def joint_cal_fit(descriptor, ptable, prows, internal_id, queue, reservation,
                  dry_run=0, strictly_successful=False, check_for_outputs=True,
                  resubmit_partial_complete=True, system_name=None):
    """
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    joint calibration fitting job given by descriptor ('arc'/'psfnight' or 'flat'/'nightlyflat'). The returned ptable
    has this row added to the table given as input.

    Args:
        descriptor (str): Description of the joint fitting job. Can either be 'arc' or 'psfnight',
            or 'flat' or 'nightlyflat'.
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        ptable (Table): The processing table where each row is a processed job.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 which runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 Doesn't write or submit anything but queries Slurm normally for job status.
            4 Doesn't write, submit jobs, or query Slurm.
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with an added row for the joint fit job.
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    if descriptor is None:
        return ptable, None, internal_id
    elif descriptor == 'arc':
        descriptor = 'psfnight'
    elif descriptor == 'flat':
        descriptor = 'nightlyflat'

    if descriptor not in ['psfnight', 'nightlyflat']:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")

    joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
    ptable.add_row(joint_prow)

    if descriptor in ['psfnight', 'nightlyflat']:
        log.info("Setting the calibration exposures as calibrators in the processing table.\n")
        ptable = set_calibrator_flag(prows, ptable)

    return ptable, joint_prow, internal_id


#########################################
########     Redshifts     ##############
#########################################
def submit_redshifts(ptable, prows, tnight, internal_id, queue, reservation,
              dry_run=0, strictly_successful=False,
              check_for_outputs=True, resubmit_partial_complete=True,
              z_submit_types=None, system_name=None):
    """
    Given a set of prows, this generates processing table rows, creates batch scripts, and submits the redshift
    jobs requested in z_submit_types for the tile processed by the given tilenight job. The returned ptable has
    all of these rows added to the table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
        tnight (Table.Row): The processing table row of the tilenight job on which the redshifts depend.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 which runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 Doesn't write or submit anything but queries Slurm normally for job status.
            4 Doesn't write, submit jobs, or query Slurm.
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with added rows for the redshift jobs.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1 or z_submit_types is None:
        return ptable, internal_id

    log.info(" ")
    log.info("Running redshifts.\n")

    ## Now run redshifts
    zprows = []
    for row in prows:
        if row['LASTSTEP'] == 'all':
            zprows.append(row)

    if len(zprows) > 0:
        for zsubtype in z_submit_types:
            log.info(" ")
            log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {zprows[0]['TILEID']}.")
            if zsubtype == 'perexp':
                for zprow in zprows:
                    log.info(f"EXPID: {zprow['EXPID']}.\n")
                    redshift_prow = make_redshift_prow([zprow], tnight, descriptor=zsubtype, internal_id=internal_id)
                    internal_id += 1
                    redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                    ptable.add_row(redshift_prow)
            elif zsubtype == 'cumulative':
                tileids = np.unique([prow['TILEID'] for prow in zprows])
                if len(tileids) > 1:
                    msg = f"Error, more than one tileid provided for cumulative redshift job: {tileids}"
                    log.critical(msg)
                    raise ValueError(msg)
                nights = np.unique([prow['NIGHT'] for prow in zprows])
                if len(nights) > 1:
                    msg = f"Error, more than one night provided for cumulative redshift job: {nights}"
                    log.critical(msg)
                    raise ValueError(msg)
                tileid, night = tileids[0], nights[0]
                ## For cumulative redshifts, get any existing processing rows for tile
                matched_prows = read_minimal_tilenight_proctab_cols(tileids=tileids)
                ## Identify the processing rows that should be assigned as dependencies;
                ## tnight should be first such that the new job inherits the other metadata from it
                tnights = [tnight]
                if matched_prows is not None:
                    matched_prows = matched_prows[matched_prows['NIGHT'] <= night]
                    for prow in matched_prows:
                        if prow['INTID'] != tnight['INTID']:
                            tnights.append(prow)
                log.info(f"Internal Processing IDs: {[prow['INTID'] for prow in tnights]}.\n")
                ## Identify all exposures that should go into the fit
                expids = [prow['EXPID'][0] for prow in zprows]
                ## note we can actually get the full list of exposures, but for now
                ## we'll stay consistent with old processing where we only list exposures
                ## from the current night
                ## For cumulative redshifts, get valid expids from exptables
                #matched_erows = read_minimal_science_exptab_cols(tileids=tileids)
                #matched_erows = matched_erows[matched_erows['NIGHT']<=night]
                #expids = list(set([prow['EXPID'][0] for prow in zprows])+set(matched_erows['EXPID']))
                log.info(f"Expids: {expids}.\n")
                redshift_prow, internal_id = make_joint_prow(tnights, descriptor=zsubtype, internal_id=internal_id)
                redshift_prow['EXPID'] = expids
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(redshift_prow)
            else: # pernight
                expids = [prow['EXPID'][0] for prow in zprows]
                log.info(f"Expids: {expids}.\n")
                redshift_prow = make_redshift_prow(zprows, tnight, descriptor=zsubtype, internal_id=internal_id)
                internal_id += 1
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(redshift_prow)

    return ptable, internal_id


#########################################
########     Tilenight     ##############
#########################################
def submit_tilenight(ptable, prows, calibjobs, internal_id, queue, reservation,
              dry_run=0, strictly_successful=False, resubmit_partial_complete=True,
              system_name=None, use_specter=False, extra_job_args=None):
    """
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the
    tilenight job for those exposures. The returned ptable has this row added to the
    table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The Table.Row values are for the corresponding
            calibration job.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 which runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 Doesn't write or submit anything but queries Slurm normally for job status.
            4 Doesn't write, submit jobs, or query Slurm.
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include
            laststeps for tilenight, etc.

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with an added row for the tilenight job.
        * tnight_prow, dict. Row of a processing table corresponding to the tilenight job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    log.info(" ")
    log.info("Running tilenight.\n")

    tnight_prow = make_tnight_prow(prows, calibjobs, internal_id=internal_id)
    internal_id += 1
    tnight_prow = create_and_submit(tnight_prow, queue=queue, reservation=reservation, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=False,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name,
                                   use_specter=use_specter, extra_job_args=extra_job_args)
    ptable.add_row(tnight_prow)

    return ptable, tnight_prow, internal_id
1917
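# Illustrative usage sketch (hypothetical variables, kept as comments so the
# module is unchanged at runtime): a dry run at level 4 exercises the tilenight
# bookkeeping without writing scripts, submitting jobs, or querying Slurm.
#
#     ptable, tnight_prow, internal_id = submit_tilenight(
#         ptable, prows, calibjobs, internal_id,
#         queue='realtime', reservation=None, dry_run=4,
#         system_name='perlmutter-gpu')
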
## wrapper functions for joint fitting
def science_joint_fit(ptable, sciences, internal_id, queue='realtime', reservation=None,
                      z_submit_types=None, dry_run=0, strictly_successful=False,
                      check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None):
    """
    Wrapper function for desispec.workflow.processing.joint_fit specific to the stdstarfit joint fit and redshift fitting.

    All variables are the same except:

        Arg 'sciences' is mapped to the prows argument of joint_fit.
        The joint_fit argument descriptor is pre-defined as 'science'.
    """
    return joint_fit(ptable=ptable, prows=sciences, internal_id=internal_id, queue=queue, reservation=reservation,
                     descriptor='science', z_submit_types=z_submit_types, dry_run=dry_run,
                     strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                     resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)


def flat_joint_fit(ptable, flats, internal_id, queue='realtime',
                   reservation=None, dry_run=0, strictly_successful=False,
                   check_for_outputs=True, resubmit_partial_complete=True,
                   system_name=None):
    """
    Wrapper function for desispec.workflow.processing.joint_fit specific to the nightlyflat joint fit.

    All variables are the same except:

        Arg 'flats' is mapped to the prows argument of joint_fit.
        The joint_fit argument descriptor is pre-defined as 'nightlyflat'.
    """
    return joint_fit(ptable=ptable, prows=flats, internal_id=internal_id, queue=queue, reservation=reservation,
                     descriptor='nightlyflat', dry_run=dry_run, strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs, resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)


def arc_joint_fit(ptable, arcs, internal_id, queue='realtime',
                  reservation=None, dry_run=0, strictly_successful=False,
                  check_for_outputs=True, resubmit_partial_complete=True,
                  system_name=None):
    """
    Wrapper function for desispec.workflow.processing.joint_fit specific to the psfnight joint fit.

    All variables are the same except:

        Arg 'arcs' is mapped to the prows argument of joint_fit.
        The joint_fit argument descriptor is pre-defined as 'psfnight'.
    """
    return joint_fit(ptable=ptable, prows=arcs, internal_id=internal_id, queue=queue, reservation=reservation,
                     descriptor='psfnight', dry_run=dry_run, strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs, resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)

def make_joint_prow(prows, descriptor, internal_id):
    """
    Given an input list or array of processing table rows and a descriptor, this creates a joint fit processing job row.
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
    input prows).

    Args:
        prows, list or array of dicts. The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        descriptor, str. Description of the joint fitting job. Can either be 'stdstarfit', 'psfnight', or 'nightlyflat'.
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number).

    Returns:
        tuple: A tuple containing:

        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number).
    """
    log = get_logger()
    first_row = table_row_to_dict(prows[0])
    joint_prow = first_row.copy()

    joint_prow['INTID'] = internal_id
    internal_id += 1
    joint_prow['JOBDESC'] = descriptor
    joint_prow['LATEST_QID'] = -99
    joint_prow['ALL_QIDS'] = np.array([], dtype=int)
    joint_prow['SUBMIT_DATE'] = -99
    joint_prow['STATUS'] = 'UNSUBMITTED'
    joint_prow['SCRIPTNAME'] = ''
    joint_prow['EXPID'] = np.unique(np.concatenate([currow['EXPID'] for currow in prows])).astype(int)

    ## Assign the PROCCAMWORD based on the descriptor and the input exposures
    ## UPDATE 2024-04-24: badamps are now included in arc/flat joint fits,
    ## so grab all PROCCAMWORDs instead of filtering out BADAMP cameras
    ## For flats we want any camera that exists in all 12 exposures
    ## For arcs we want any camera that exists in at least 3 exposures
    pcamwords = [prow['PROCCAMWORD'] for prow in prows]
    if descriptor == 'stdstarfit':
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
                                                  full_spectros_only=True)
    elif descriptor in ['pernight', 'cumulative']:
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
                                                  full_spectros_only=False)
    elif descriptor == 'nightlyflat':
        joint_prow['PROCCAMWORD'] = camword_intersection(pcamwords,
                                                         full_spectros_only=False)
    elif descriptor == 'psfnight':
        ## Count the number of exposures each camera is present for
        camcheck = {}
        for camword in pcamwords:
            for cam in decode_camword(camword):
                if cam in camcheck:
                    camcheck[cam] += 1
                else:
                    camcheck[cam] = 1
        ## if a camera exists in 3 or more exposures, then include it
        goodcams = []
        for cam, camcount in camcheck.items():
            if camcount >= 3:
                goodcams.append(cam)
        joint_prow['PROCCAMWORD'] = create_camword(goodcams)
    else:
        log.warning("Asked to produce a joint proc table row for unknown"
                    + f" job description {descriptor}")

    joint_prow = assign_dependency(joint_prow, dependency=prows)
    return joint_prow, internal_id

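# Illustrative sketch (made-up camwords, kept as comments): the psfnight branch
# above keeps any camera present in >= 3 of the input PROCCAMWORDs; the same
# count can be expressed with collections.Counter.
#
#     from collections import Counter
#     from desispec.io.util import decode_camword, create_camword
#
#     pcamwords = ['a01', 'a01', 'a0', 'a01']  # hypothetical per-exposure camwords
#     counts = Counter(cam for cw in pcamwords for cam in decode_camword(cw))
#     proccamword = create_camword([cam for cam, n in counts.items() if n >= 3])
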
def make_exposure_prow(erow, int_id, calibjobs, jobdesc=None):
    """
    Converts an exposure table row into a processing table row: assigns the
    internal id int_id, sets JOBDESC (defaulting to the exposure's OBSTYPE
    unless jobdesc is given), and defines the calibration dependencies from
    calibjobs. Returns the new processing row and the incremented internal id.
    """
    prow = erow_to_prow(erow)
    prow['INTID'] = int_id
    int_id += 1
    if jobdesc is None:
        prow['JOBDESC'] = prow['OBSTYPE']
    else:
        prow['JOBDESC'] = jobdesc
    prow = define_and_assign_dependency(prow, calibjobs)
    return prow, int_id

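# Illustrative usage sketch (hypothetical erow, kept as comments): exposure
# rows are converted one at a time, threading the internal id through.
#
#     prow, internal_id = make_exposure_prow(erow, internal_id, calibjobs)
#     ptable.add_row(prow)
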
def make_tnight_prow(prows, calibjobs, internal_id):
    """
    Given an input list or array of processing table rows, this creates a tilenight processing job row.
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
    input calibjobs).

    Args:
        prows, list or array of dicts. Unsubmitted rows corresponding to the individual prestdstar jobs that are
            the first steps of tilenight.
        calibjobs, dict. Dictionary containing keys that each corresponds to a Table.Row or
            None, with each table.Row() value corresponding to a calibration job
            on which the tilenight job depends.
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number).

    Returns:
        dict: Row of a processing table corresponding to the tilenight job.
    """
    first_row = table_row_to_dict(prows[0])
    joint_prow = first_row.copy()

    joint_prow['INTID'] = internal_id
    joint_prow['JOBDESC'] = 'tilenight'
    joint_prow['LATEST_QID'] = -99
    joint_prow['ALL_QIDS'] = np.array([], dtype=int)
    joint_prow['SUBMIT_DATE'] = -99
    joint_prow['STATUS'] = 'UNSUBMITTED'
    joint_prow['SCRIPTNAME'] = ''
    joint_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

    joint_prow = define_and_assign_dependency(joint_prow, calibjobs, use_tilenight=True)

    return joint_prow

def make_redshift_prow(prows, tnights, descriptor, internal_id):
    """
    Given an input list or array of processing table rows and a descriptor, this creates a redshift processing job row.
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
    input tnights).

    Args:
        prows, list or array of dicts. Rows corresponding to the individual science exposures whose
            redshifts are to be fit.
        tnights, list or array of Table.Row objects. Rows corresponding to the tilenight jobs on which the redshift job depends.
        descriptor, str. Type of redshift job, e.g. 'pernight' or 'cumulative'.
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number).

    Returns:
        dict: Row of a processing table corresponding to the redshift job.
    """
    first_row = table_row_to_dict(prows[0])
    redshift_prow = first_row.copy()

    redshift_prow['INTID'] = internal_id
    redshift_prow['JOBDESC'] = descriptor
    redshift_prow['LATEST_QID'] = -99
    redshift_prow['ALL_QIDS'] = np.array([], dtype=int)
    redshift_prow['SUBMIT_DATE'] = -99
    redshift_prow['STATUS'] = 'UNSUBMITTED'
    redshift_prow['SCRIPTNAME'] = ''
    redshift_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

    redshift_prow = assign_dependency(redshift_prow, dependency=tnights)

    return redshift_prow

def check_darknight_deps_and_update_prow(prow, n_nights_before=None, n_nights_after=None,
                                    proc_table_path=None):
    """
    Update the processing row with the darknight dependencies.

    Args:
        prow (dict): Processing row to be updated.
        n_nights_before (int, optional): Number of nights before the given night to include in the darknight calculation.
        n_nights_after (int, optional): Number of nights after the given night to include in the darknight calculation.
        proc_table_path (str): Path to the processing table files.

    Returns:
        dict: The input processing row, updated with darknight dependencies if any matching
        bias/pdark jobs are found; otherwise returned unchanged.
    """
    log = get_logger()
    refnight = prow['NIGHT']

    compdarkparser = compute_dark_parser()
    options = ['--reference-night', str(refnight), '-c', 'b1', '-o', 'temp',
               '--skip-camera-check', '--dont-search-filesystem']
    if n_nights_before is not None:
        options.extend(['--before', str(n_nights_before)])
    if n_nights_after is not None:
        options.extend(['--after', str(n_nights_after)])
    compdarkargs = compdarkparser.parse_args(options)

    exptab_for_dark_night = get_stacked_dark_exposure_table(compdarkargs)

    ## AK: as of now we always have enough darks, we just don't know if they are viable, so this is wasted computation
    ### First see if we have enough exposures for each camera to make a viable darknight
    #camera_counter = {cam: 0 for cam in decode_camword(prow['PROCCAMWORD'])}
    #for erow in exptab_for_dark_night:
    #    for cam in decode_camword(erow_to_goodcamword(erow, suppress_logging=True, exclude_badamps=True)):
    #        camera_counter[cam] += 1
    #enough_darks = np.all([count >= compdarkargs.min_dark_exposures for count in camera_counter.values()])
    #if not enough_darks:
    #    log.critical("Requested to do darknights, but not all cameras have sufficient darks:"
    #                 + f" {camera_counter=}, min_dark_exposures={compdarkargs.min_dark_exposures}, "
    #                 + f"{n_nights_before=}, {n_nights_after=}. Exiting without submitting ccdcalib job.")
    #    return prow, enough_darks

    ## Since we have enough exposures, update the processing table row with dependencies
    nights = np.unique(np.append(exptab_for_dark_night['NIGHT'].data, [refnight]))
    dep_intids, dep_qids = [], []
    for night in nights:
        nightly_expids = exptab_for_dark_night['EXPID'][exptab_for_dark_night['NIGHT'] == night].data

        ## Load in the files defined above
        proc_table_pathname = findfile('processing_table', night=night, readonly=True)
        if proc_table_path is not None:
            proc_table_pathname = os.path.join(proc_table_path, os.path.basename(proc_table_pathname))

        ptable = load_table(tablename=proc_table_pathname, tabletype='proctable', suppress_logging=True)
        if len(ptable) == 0:
            log.error(f"Expected bias and/or pdark processing on {night=} for expids={nightly_expids}, but didn't find a table. Continuing")
            continue
        else:
            biasdarks = ptable[np.isin(ptable['JOBDESC'].data, np.asarray([b'biasnight', b'biaspdark', b'pdark']))]
            if len(biasdarks) == 0:
                log.error(f"Expected biasnight, biaspdark, or pdark processing on {night=} for expids={nightly_expids}, but didn't find any "
                          + f"entries: jobdescs={ptable['JOBDESC'].data}, expids={ptable['EXPID'].data}, "
                          + f"obstypes={ptable['OBSTYPE'].data}. Continuing anyway")
            for biasdark in biasdarks:
                ## if bias for the reference night or if any darks in the job are used for
                ## the darknight, we should depend on the job
                if np.any(np.isin(biasdark['EXPID'].data, nightly_expids)) or (str(biasdark['JOBDESC']).startswith('bias') and night == refnight):
                    dep_intids.append(biasdark['INTID'])
                    dep_qids.append(biasdark['LATEST_QID'])

    if len(dep_intids) > 0:
        ## Note originally used argsort so that INT_DEP_IDS order would correspond to
        ## LATEST_DEP_QID order, but because we remove LATEST_DEP_QID's if completed,
        ## these are not the same length. So now we just sort INT_DEP_IDS and leave
        ## LATEST_DEP_QID as-is
        prow['INT_DEP_IDS'] = np.sort(np.concatenate([prow['INT_DEP_IDS'], dep_intids]))
        prow['LATEST_DEP_QID'] = np.concatenate([prow['LATEST_DEP_QID'], dep_qids])
        ## LATEST_DEP_QID contains 1's for jobs that already had files completed outside
        ## the normal pipeline. Ignore those otherwise Slurm gets confused
        prow['LATEST_DEP_QID'] = prow['LATEST_DEP_QID'][prow['LATEST_DEP_QID'] > 1]

    return prow  # , enough_darks

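# Illustrative sketch (made-up QIDs, kept as comments): placeholder QIDs of 1
# mark jobs whose outputs already existed outside the normal pipeline, so the
# filter above drops them before the dependency list reaches Slurm.
#
#     import numpy as np
#     latest_dep_qid = np.array([1, 1234567, 1, 1234890])
#     latest_dep_qid[latest_dep_qid > 1]  # -> array([1234567, 1234890])
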
def checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs,
                                  lasttype, internal_id, z_submit_types=None, dry_run=0,
                                  queue='realtime', reservation=None, strictly_successful=False,
                                  check_for_outputs=True, resubmit_partial_complete=True,
                                  system_name=None):
    """
    Takes all the state-ful data from daily processing and determines whether a joint fit needs to be submitted. Places
    the decision criteria into a single function for easier maintainability over time. These are separate from the
    new standard manifest*.json method of indicating a calibration sequence is complete. That is checked independently
    elsewhere and doesn't interact with this.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        arcs (list of dict): list of the individual arc jobs to be used for the psfnight (NOT all
            the arcs, if multiple sets existed). May be empty if none identified yet.
        flats (list of dict): list of the individual flat jobs to be used for the nightlyflat (NOT
            all the flats, if multiple sets existed). May be empty if none identified yet.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The table.Row() values are for the corresponding
            calibration job.
        lasttype (str or None): the obstype of the last individual exposure row to be processed.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 which runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 Doesn't write or submit anything but queries Slurm normally for job status.
            4 Doesn't write, submit jobs, or query Slurm.
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu.

    Returns:
        tuple: A tuple containing:

        * ptable, Table, Processing table of all exposures that have been processed.
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
          and 'nightlyflat'. Each key corresponds to a Table.Row or
          None. The table.Row() values are for the corresponding
          calibration job.
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented upward
          from the input such that it represents the smallest unused ID.
    """
    if lasttype == 'science' and len(sciences) > 0:
        log = get_logger()
        skysubonly = np.array([sci['LASTSTEP'] == 'skysub' for sci in sciences])
        if np.all(skysubonly):
            log.error("Identified all exposures in joint fitting request as skysub-only. Not submitting")
            sciences = []
            return ptable, calibjobs, sciences, internal_id

        if np.any(skysubonly):
            log.error("Identified skysub-only exposures in joint fitting request")
            log.info("Expids: {}".format([row['EXPID'] for row in sciences]))
            log.info("LASTSTEPs: {}".format([row['LASTSTEP'] for row in sciences]))
            sciences = (np.array(sciences, dtype=object)[~skysubonly]).tolist()
            log.info("Removed skysub-only exposures from joint fitting:")
            log.info("Expids: {}".format([row['EXPID'] for row in sciences]))
            log.info("LASTSTEPs: {}".format([row['LASTSTEP'] for row in sciences]))

        from collections import Counter
        tiles = np.array([sci['TILEID'] for sci in sciences])
        counts = Counter(tiles)
        if len(counts.most_common()) > 1:
            log.error("Identified more than one tile in a joint fitting request")
            log.info("Expids: {}".format([row['EXPID'] for row in sciences]))
            log.info("Tileids: {}".format(tiles))
            log.info("Returning without joint fitting any of these exposures.")
            # most_common, nmost_common = counts.most_common()[0]
            # if most_common == -99:
            #     most_common, nmost_common = counts.most_common()[1]
            # log.warning(f"Given multiple tiles to jointly fit: {counts}. "+
            #             "Only processing the most common non-default " +
            #             f"tile: {most_common} with {nmost_common} exposures")
            # sciences = (np.array(sciences,dtype=object)[tiles == most_common]).tolist()
            # log.info("Tiles and exposure id's being submitted for joint fitting:")
            # log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            # log.info("Tileid's: {}".format([row['TILEID'] for row in sciences]))
            sciences = []
            return ptable, calibjobs, sciences, internal_id

        ptable, tilejob, internal_id = science_joint_fit(ptable, sciences, internal_id, z_submit_types=z_submit_types,
                                                         dry_run=dry_run, queue=queue, reservation=reservation,
                                                         strictly_successful=strictly_successful,
                                                         check_for_outputs=check_for_outputs,
                                                         resubmit_partial_complete=resubmit_partial_complete,
                                                         system_name=system_name)
        if tilejob is not None:
            sciences = []

    elif lasttype == 'flat' and calibjobs['nightlyflat'] is None and len(flats) == 12:
        ## Note: this assumes a full calibration sequence of exactly 12 flats
        ptable, calibjobs['nightlyflat'], internal_id \
            = flat_joint_fit(ptable, flats, internal_id, dry_run=dry_run, queue=queue,
                             reservation=reservation, strictly_successful=strictly_successful,
                             check_for_outputs=check_for_outputs,
                             resubmit_partial_complete=resubmit_partial_complete,
                             system_name=system_name
                            )

    elif lasttype == 'arc' and calibjobs['psfnight'] is None and len(arcs) == 5:
        ## Note: this assumes a full calibration sequence of exactly 5 arcs
        ptable, calibjobs['psfnight'], internal_id \
            = arc_joint_fit(ptable, arcs, internal_id, dry_run=dry_run, queue=queue,
                            reservation=reservation, strictly_successful=strictly_successful,
                            check_for_outputs=check_for_outputs,
                            resubmit_partial_complete=resubmit_partial_complete,
                            system_name=system_name
                            )
    return ptable, calibjobs, sciences, internal_id

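# Illustrative sketch (made-up TILEIDs, kept as comments): the Counter check
# above refuses a joint fit whenever more than one distinct tile is present.
#
#     from collections import Counter
#     tiles = [80605, 80605, 80712]
#     len(Counter(tiles).most_common()) > 1  # -> True: mixed tiles, skip
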
def submit_tilenight_and_redshifts(ptable, sciences, calibjobs, internal_id, dry_run=0,
                                  queue='realtime', reservation=None, strictly_successful=False,
                                  check_for_outputs=True, resubmit_partial_complete=True,
                                  system_name=None, use_specter=False, extra_job_args=None):
    """
    Takes all the state-ful data from daily processing and determines whether a tilenight job needs to be submitted.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary of calibration job rows (Table.Row or None) on which
            the tilenight job depends.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 which runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 Doesn't write or submit anything but queries Slurm normally for job status.
            4 Doesn't write, submit jobs, or query Slurm.
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu.
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict, optional): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include
            laststeps for tilenight, z_submit_types for redshifts, etc.

    Returns:
        tuple: A tuple containing:

        * ptable, Table, Processing table of all exposures that have been processed.
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented upward
          from the input such that it represents the smallest unused ID.
    """
    ptable, tnight, internal_id = submit_tilenight(ptable, sciences, calibjobs, internal_id,
                                             queue=queue, reservation=reservation,
                                             dry_run=dry_run, strictly_successful=strictly_successful,
                                             resubmit_partial_complete=resubmit_partial_complete,
                                             system_name=system_name, use_specter=use_specter,
                                             extra_job_args=extra_job_args)

    z_submit_types = None
    if extra_job_args is not None and 'z_submit_types' in extra_job_args:
        z_submit_types = extra_job_args['z_submit_types']

    ptable, internal_id = submit_redshifts(ptable, sciences, tnight, internal_id,
                                    queue=queue, reservation=reservation,
                                    dry_run=dry_run, strictly_successful=strictly_successful,
                                    check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete,
                                    z_submit_types=z_submit_types,
                                    system_name=system_name)

    if tnight is not None:
        sciences = []

    return ptable, sciences, internal_id

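# Illustrative sketch (hypothetical values, kept as comments): extra_job_args
# is a plain dict; z_submit_types is read out of it when present.
#
#     extra_job_args = {'laststeps': ['all', 'skysub'],        # made-up values
#                       'z_submit_types': ['cumulative']}
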
def set_calibrator_flag(prows, ptable):
    """
    Sets the "CALIBRATOR" column of a processing table row to 1 (integer representation of True)
    for all input rows. Used within joint fitting code to flag the exposures that were input
    to the psfnight or nightlyflat for later reference.

    Args:
        prows, list or array of Table.Rows or dicts. The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        ptable, Table. The processing table where each row is a processed job.

    Returns:
        Table: The same processing table as input except with the CALIBRATOR column set to 1
        for the rows matching the input prows.
    """
    for prow in prows:
        ptable['CALIBRATOR'][ptable['INTID'] == prow['INTID']] = 1
    return ptable
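
# Illustrative sketch (toy table, kept as comments): the flag is set with a
# boolean mask on the INTID column, so only matching rows are touched.
#
#     from astropy.table import Table
#     t = Table({'INTID': [26021800, 26021801], 'CALIBRATOR': [0, 0]})
#     t['CALIBRATOR'][t['INTID'] == 26021801] = 1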