
desihub / desispec / 19149943254

06 Nov 2025 09:09PM UTC coverage: 37.713% (+0.7%) from 37.002%

Pull Request #2521: Add redshift QA scripts
Merge c9b4e1570 into 6a90a0547 (github / web-flow)

12988 of 34439 relevant lines covered (37.71%)

0.38 hits per line

Source File

/py/desispec/workflow/processing.py (file coverage: 60.22%)
"""
desispec.workflow.processing
============================

"""
import sys, os, glob
import json
from astropy.io import fits
from astropy.table import Table, join
from desispec.scripts.compute_dark import compute_dark_parser, get_stacked_dark_exposure_table
from desispec.workflow.batch_writer import create_biaspdark_batch_script, create_ccdcalib_batch_script, create_desi_proc_batch_script, create_desi_proc_tilenight_batch_script, create_linkcal_batch_script, get_desi_proc_batch_file_pathname, get_desi_proc_tilenight_batch_file_pathname
import numpy as np

import time, datetime
from collections import OrderedDict
import subprocess

from desispec.scripts.link_calibnight import derive_include_exclude
from desispec.scripts.tile_redshifts import generate_tile_redshift_scripts
from desispec.workflow.redshifts import get_ztile_script_pathname, \
    get_ztile_relpath, \
    get_ztile_script_suffix
from desispec.workflow.exptable import read_minimal_science_exptab_cols
from desispec.workflow.queue import get_resubmission_states, update_from_queue, \
    queue_info_from_qids, get_queue_states_from_qids, update_queue_state_cache, \
    get_non_final_states
from desispec.workflow.timing import what_night_is_it
from desispec.workflow.batch_writer import get_desi_proc_batch_file_path
from desispec.workflow.batch import parse_reservation
from desispec.workflow.utils import pathjoin, sleep_and_report, \
    load_override_file
from desispec.workflow.tableio import write_table, load_table
from desispec.workflow.proctable import get_pdarks_from_ptable, table_row_to_dict, erow_to_prow, \
    read_minimal_tilenight_proctab_cols, read_minimal_full_proctab_cols, \
    update_full_ptab_cache, default_prow, get_default_qid
from desiutil.log import get_logger

from desispec.io import findfile, specprod_root
from desispec.io.util import decode_camword, create_camword, \
    difference_camwords, erow_to_goodcamword, \
    camword_to_spectros, camword_union, camword_intersection, parse_badamps


#################################################
############## Misc Functions ###################
#################################################
def night_to_starting_iid(night=None):
    """
    Creates an internal ID for a given night. The resulting integer is a 9 digit number.
    The digits are YYMMDDxxx, where YY is the years since 2000 and MM and DD are the month and day.
    xxx starts at 000 and is incremented for up to 1000 unique job IDs for a given night.

    Args:
        night (str or int): YYYYMMDD of the night to get the starting internal ID for.

    Returns:
        int: 9 digit number consisting of YYMMDD000. YY is years after 2000, MMDD is month and day.
        000 being the starting job number (0).
    """
    if night is None:
        night = what_night_is_it()
    night = int(night)
    internal_id = (night - 20000000) * 1000
    return internal_id
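
# Illustrative example (not part of the original module): for a hypothetical
# night of 20251106,
#   night_to_starting_iid(20251106)
# returns (20251106 - 20000000) * 1000 = 251106000, leaving room for internal
# job IDs 251106000 through 251106999 on that night.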


class ProcessingParams():
    def __init__(self, dry_run_level=0, queue='realtime',
                 reservation=None, strictly_successful=True,
                 check_for_outputs=True,
                 resubmit_partial_complete=True,
                 system_name='perlmutter', use_specter=True):

        self.dry_run_level = dry_run_level
        self.system_name = system_name
        self.queue = queue
        self.reservation = reservation
        self.strictly_successful = strictly_successful
        self.check_for_outputs = check_for_outputs
        self.resubmit_partial_complete = resubmit_partial_complete
        self.use_specter = use_specter


#################################################
############ Script Functions ###################
#################################################
def batch_script_pathname(prow):
    """
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, JOBDESC, PROCCAMWORD defined)
    and determines the script file pathname as defined by desi_proc's helper functions.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.

    Returns:
        str: The complete pathname to the script file, as it is defined within the desi_proc ecosystem.
    """
    expids = prow['EXPID']
    if len(expids) == 0:
        expids = None
    if prow['JOBDESC'] == 'tilenight':
        pathname = get_desi_proc_tilenight_batch_file_pathname(night=prow['NIGHT'], tileid=prow['TILEID'])
    else:
        pathname = get_desi_proc_batch_file_pathname(night=prow['NIGHT'], exp=expids, \
                                             jobdesc=prow['JOBDESC'], cameras=prow['PROCCAMWORD'])
    scriptfile = pathname + '.slurm'
    return scriptfile
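
# Illustrative usage (hypothetical values, not from the original module):
#   prow = {'NIGHT': 20251106, 'EXPID': [123456], 'JOBDESC': 'arc',
#           'PROCCAMWORD': 'a0123456789'}
#   batch_script_pathname(prow)
# returns the pathname chosen by get_desi_proc_batch_file_pathname with
# '.slurm' appended; the exact filename pattern is determined by that helper,
# so the dict values here are only a sketch.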


def get_jobdesc_to_file_map():
    """
    Returns a mapping of job descriptions to the filenames of the output files

    Args:
        None

    Returns:
        dict: Dictionary mapping lowercase job descriptions to the filename
            of their expected outputs.

    """
    return {'biasnight': 'biasnight',
            'biaspdark': 'preproc_for_dark',
            'pdark': 'preproc_for_dark',
            'ccdcalib': 'ctecorrnight',
            'arc': 'fitpsf',
            'psfnight': 'psfnight',
            'flat': 'fiberflat',
            'nightlyflat': 'fiberflatnight',
            'prestdstar': 'sframe',
            'stdstarfit': 'stdstars',
            'poststdstar': 'cframe',
            'spectra': 'spectra_tile',
            'coadds': 'coadds_tile',
            'redshift': 'redrock_tile'}
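
# Illustrative lookup (hypothetical night/expid/camera values, not from the
# original module): the returned values are findfile filetypes, so the expected
# output of an 'arc' job for one camera could be located with something like
#   findfile(filetype=get_jobdesc_to_file_map()['arc'],
#            night=20251106, expid=123456, camera='b0')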


def check_for_outputs_on_disk(prow, resubmit_partial_complete=True):
    """
    Checks whether the expected output files for the job described by prow
    already exist on disk, optionally pruning PROCCAMWORD to the missing cameras.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
        the change in job status after creating and submitting the job for processing.
    """
    prow['STATUS'] = 'UNKNOWN'
    log = get_logger()

    if prow['JOBDESC'] in ['linkcal', 'ccdcalib']:
        log.info(f"jobdesc={prow['JOBDESC']} has indeterminate outputs, so "
                + "not checking for files on disk.")
        return prow

    job_to_file_map = get_jobdesc_to_file_map()

    night = prow['NIGHT']
    if prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
        filetype = 'redrock_tile'
    else:
        filetype = job_to_file_map[prow['JOBDESC']]
    orig_camword = prow['PROCCAMWORD']

    ## if spectro based, look for spectros, else look for cameras
    if prow['JOBDESC'] in ['stdstarfit','spectra','coadds','redshift']:
        ## Spectrograph based
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        ## Suppress outputs about using tile based files in findfile if only looking for stdstarfits
        if prow['JOBDESC'] == 'stdstarfit':
            tileid = None
        else:
            tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(findfile(filetype=filetype, night=night, expid=expid, spectrograph=spectro, tile=tileid)):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'], existing_camword)
    elif prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
        ## Spectrograph based
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        redux_dir = specprod_root()
        outdir = os.path.join(redux_dir, get_ztile_relpath(tileid, group=prow['JOBDESC'], night=night, expid=expid))
        suffix = get_ztile_script_suffix(tileid, group=prow['JOBDESC'], night=night, expid=expid)
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(os.path.join(outdir, f"redrock-{spectro}-{suffix}.fits")):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'], existing_camword)
    elif str(prow['JOBDESC']).endswith('pdark') and len(prow['EXPID']) > 1:
        ## camera based, multi exposure
        cameras = decode_camword(prow['PROCCAMWORD'])
        nexps = len(prow['EXPID'])
        n_desired = len(cameras)*nexps

        missing_cameras = []
        for cam in cameras:
            for expid in prow['EXPID']:
                if not os.path.exists(findfile(filetype=filetype, night=night, expid=expid, camera=cam)):
                    missing_cameras.append(cam)
        completed = (len(missing_cameras) == 0)
        if not completed and resubmit_partial_complete and len(missing_cameras) < n_desired:
            prow['PROCCAMWORD'] = create_camword(np.unique(missing_cameras))
    else:
        ## Otherwise camera based
        cameras = decode_camword(prow['PROCCAMWORD'])
        n_desired = len(cameras)
        if len(prow['EXPID']) > 0:
            expid = prow['EXPID'][0]
        else:
            expid = None
        if len(prow['EXPID']) > 1 and prow['JOBDESC'] not in ['psfnight','nightlyflat']:
            log.warning(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']}. This job type only makes " +
                     f"sense with a single exposure. Proceeding with {expid}.")
        missing_cameras = []
        for cam in cameras:
            if not os.path.exists(findfile(filetype=filetype, night=night, expid=expid, camera=cam)):
                missing_cameras.append(cam)
        completed = (len(missing_cameras) == 0)
        if not completed and resubmit_partial_complete and len(missing_cameras) < n_desired:
            prow['PROCCAMWORD'] = create_camword(missing_cameras)

    if completed:
        prow['STATUS'] = 'COMPLETED'
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"the desired {n_desired} {filetype}'s. Not submitting this job.")
    elif resubmit_partial_complete and orig_camword != prow['PROCCAMWORD']:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"some {filetype}'s. Submitting smaller camword={prow['PROCCAMWORD']}.")
    elif not resubmit_partial_complete:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} doesn't have all " +
                 f"{filetype}'s and resubmit_partial_complete=False. "+
                 f"Submitting full camword={prow['PROCCAMWORD']}.")
    else:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} has no " +
                 f"existing {filetype}'s. Submitting full camword={prow['PROCCAMWORD']}.")
    return prow
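
# Illustrative camword pruning (hypothetical values, not from the original
# module): if prow['PROCCAMWORD'] is 'a0123456789' and redrock files already
# exist for spectrographs 0 and 1, the existing work is encoded as 'a01' and
#   difference_camwords('a0123456789', 'a01')  # -> 'a23456789'
# so only the eight remaining spectrographs are resubmitted.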


def create_and_submit(prow, queue='realtime', reservation=None, dry_run=0,
                      joint=False, strictly_successful=False,
                      check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None, use_specter=False,
                      extra_job_args=None):
    """
    Wrapper script that takes a processing table row and three modifier keywords, creates a submission script for the
    compute nodes, and then submits that script to the Slurm scheduler with appropriate dependencies.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 doesn't write or submit anything but queries Slurm normally for job status.
            4 doesn't write, submit jobs, or query Slurm.
            5 doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        joint (bool, optional): Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
            run with desi_proc_joint_fit. Default is False.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include refnight
            and include/exclude lists for linkcal, laststeps for tilenight, etc.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
        the change in job status after creating and submitting the job for processing.

    Note:
        This modifies the input. Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not persist. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    orig_prow = prow.copy()
    if check_for_outputs:
        prow = check_for_outputs_on_disk(prow, resubmit_partial_complete)
        if prow['STATUS'].upper() == 'COMPLETED':
            return prow

    prow = create_batch_script(prow, queue=queue, dry_run=dry_run, joint=joint,
                               system_name=system_name, use_specter=use_specter,
                               extra_job_args=extra_job_args)
    prow = submit_batch_script(prow, reservation=reservation, dry_run=dry_run,
                               strictly_successful=strictly_successful)

    ## If resubmitted partial, the PROCCAMWORD and SCRIPTNAME will correspond
    ## to the pruned values. But we want to
    ## retain the full job's value, so get those from the old job.
    if resubmit_partial_complete:
        prow['PROCCAMWORD'] = orig_prow['PROCCAMWORD']
        prow['SCRIPTNAME'] = orig_prow['SCRIPTNAME']
    return prow
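
# Illustrative call (hypothetical prow, not from the original module): submit a
# flat job on the regular queue with no dry run, letting the function prune any
# cameras whose outputs already exist:
#   prow = create_and_submit(prow, queue='regular', dry_run=0,
#                            strictly_successful=True, system_name='perlmutter')
#   prow['STATUS']  # -> 'SUBMITTED' (or 'COMPLETED' if all outputs existed)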


def desi_link_calibnight_command(prow, refnight, include=None):
    """
    Wrapper script that takes a processing table row (or dictionary with
    REFNIGHT, NIGHT, PROCCAMWORD defined) and determines the proper command
    line call to link data defined by the input row/dict.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions
            for 'NIGHT', 'REFNIGHT', and 'PROCCAMWORD'.
        refnight (str or int): The night with a valid set of calibrations
            from which the links will be created.
        include (list): The filetypes to include in the linking.

    Returns:
        str: The proper command to be submitted to desi_link_calibnight
            to process the job defined by the prow values.
    """
    cmd = 'desi_link_calibnight'
    cmd += f' --refnight={refnight}'
    cmd += f' --newnight={prow["NIGHT"]}'
    cmd += f' --cameras={prow["PROCCAMWORD"]}'
    if include is not None:
        cmd += ' --include=' + ','.join(list(include))
    return cmd
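
# Illustrative result (hypothetical values, not from the original module):
#   desi_link_calibnight_command({'NIGHT': 20251106, 'PROCCAMWORD': 'a0123456789'},
#                                refnight=20251105,
#                                include=['psfnight', 'fiberflatnight'])
# returns the string
#   'desi_link_calibnight --refnight=20251105 --newnight=20251106'
#   ' --cameras=a0123456789 --include=psfnight,fiberflatnight'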


def desi_proc_command(prow, system_name, use_specter=False, queue=None):
    """
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, OBSTYPE, JOBDESC, PROCCAMWORD defined)
    and determines the proper command line call to process the data defined by the input row/dict.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).

    Returns:
        str: The proper command to be submitted to desi_proc to process the job defined by the prow values.
    """
    cmd = 'desi_proc'
    cmd += ' --batch'
    cmd += ' --nosubmit'
    if queue is not None:
        cmd += f' -q {queue}'
    if prow['OBSTYPE'].lower() == 'science':
        if prow['JOBDESC'] == 'prestdstar':
            cmd += ' --nostdstarfit --nofluxcalib'
        elif prow['JOBDESC'] == 'poststdstar':
            cmd += ' --noprestdstarfit --nostdstarfit'

        if use_specter:
            cmd += ' --use-specter'
    elif prow['JOBDESC'] in ['flat', 'prestdstar'] and use_specter:
        cmd += ' --use-specter'
    pcamw = str(prow['PROCCAMWORD'])
    cmd += f" --cameras={pcamw} -n {prow['NIGHT']}"
    if len(prow['EXPID']) > 0:
        ## If ccdcalib job without a dark exposure, don't assign the flat expid
        ## since it would incorrectly process the flat using desi_proc
        if prow['OBSTYPE'].lower() != 'flat' or prow['JOBDESC'] != 'ccdcalib':
            cmd += f" -e {prow['EXPID'][0]}"
    if prow['BADAMPS'] != '':
        cmd += ' --badamps={}'.format(prow['BADAMPS'])
    return cmd
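
# Illustrative result (hypothetical values, not from the original module): for
# an arc exposure
#   prow = {'OBSTYPE': 'ARC', 'JOBDESC': 'arc', 'NIGHT': 20251106,
#           'EXPID': [123456], 'PROCCAMWORD': 'a0123456789', 'BADAMPS': ''}
#   desi_proc_command(prow, 'perlmutter-gpu', queue='realtime')
# returns
#   'desi_proc --batch --nosubmit -q realtime --cameras=a0123456789 -n 20251106 -e 123456'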


def desi_proc_joint_fit_command(prow, queue=None):
    """
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, OBSTYPE, PROCCAMWORD defined)
    and determines the proper command line call to process the data defined by the input row/dict.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
        queue (str): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).

    Returns:
        str: The proper command to be submitted to desi_proc_joint_fit
            to process the job defined by the prow values.
    """
    cmd = 'desi_proc_joint_fit'
    cmd += ' --batch'
    cmd += ' --nosubmit'
    if queue is not None:
        cmd += f' -q {queue}'

    descriptor = prow['OBSTYPE'].lower()

    night = prow['NIGHT']
    specs = str(prow['PROCCAMWORD'])
    expid_str = ','.join([str(eid) for eid in prow['EXPID']])

    cmd += f' --obstype {descriptor}'
    cmd += f' --cameras={specs} -n {night}'
    if len(expid_str) > 0:
        cmd += f' -e {expid_str}'
    return cmd
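
# Illustrative result (hypothetical values, not from the original module): a
# nightlyflat joint fit over several flat exposures
#   prow = {'OBSTYPE': 'FLAT', 'NIGHT': 20251106, 'EXPID': [1001, 1002, 1003],
#           'PROCCAMWORD': 'a0123456789'}
#   desi_proc_joint_fit_command(prow, queue='realtime')
# returns
#   'desi_proc_joint_fit --batch --nosubmit -q realtime --obstype flat'
#   ' --cameras=a0123456789 -n 20251106 -e 1001,1002,1003'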


def create_batch_script(prow, queue='realtime', dry_run=0, joint=False,
                        system_name=None, use_specter=False, extra_job_args=None):
    """
    Wrapper script that takes a processing table row and three modifier keywords and creates a submission script for the
    compute nodes.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        queue (str): The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 runs the code normally.
            1 writes all scripts but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 doesn't write or submit anything but queries Slurm normally for job status.
            4 doesn't write, submit jobs, or query Slurm.
            5 doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        joint (bool): Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
            run with desi_proc_joint_fit when not using tilenight. Default is False.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include refnight
            and include/exclude lists for linkcal, laststeps for tilenight, etc.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with updated values for
        scriptname.

    Note:
        This modifies the input. Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not persist. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()

    if extra_job_args is None:
        extra_job_args = {}

    if prow['JOBDESC'] == 'linkcal':
        refnight, include, exclude = -99, None, None
        if 'refnight' in extra_job_args:
            refnight = extra_job_args['refnight']
        if 'include' in extra_job_args:
            include = extra_job_args['include']
        if 'exclude' in extra_job_args:
            exclude = extra_job_args['exclude']
        include, exclude = derive_include_exclude(include, exclude)
        ## Fiberflatnights shouldn't be generated with psfs from the same time, so
        ## we shouldn't link psfs without also linking fiberflatnight.
        ## However, this should be checked at a higher level. If set here,
        ## go ahead and do it
        # if 'psfnight' in include and not 'fiberflatnight' in include:
        #     err = "Must link fiberflatnight if linking psfnight"
        #     log.error(err)
        #     raise ValueError(err)
        if dry_run > 1:
            scriptpathname = batch_script_pathname(prow)
            log.info("Output file would have been: {}".format(scriptpathname))
            cmd = desi_link_calibnight_command(prow, refnight, include)
            log.info("Command to be run: {}".format(cmd.split()))
        else:
            if refnight == -99:
                err = f'For {prow=} asked to link calibration but not given' \
                      + ' a valid refnight'
                log.error(err)
                raise ValueError(err)

            cmd = desi_link_calibnight_command(prow, refnight, include)
            log.info(f"Running: {cmd.split()}")
            scriptpathname = create_linkcal_batch_script(newnight=prow['NIGHT'],
                                                         cameras=prow['PROCCAMWORD'],
                                                         queue=queue,
                                                         cmd=cmd,
                                                         system_name=system_name)
    elif prow['JOBDESC'] in ['biasnight','pdark','biaspdark']:
        if dry_run > 1:
            scriptpathname = get_desi_proc_batch_file_pathname(night=prow['NIGHT'], exp=prow['EXPID'],
                                                   jobdesc=prow['JOBDESC'], cameras=prow['PROCCAMWORD'])
        else:
            log.info(f"Creating biaspdark script for: {prow}, {extra_job_args}")
            do_biasnight, do_pdark = False, False
            if 'steps' in extra_job_args:
                do_biasnight = 'biasnight' in extra_job_args['steps']
                do_pdark = 'pdark' in extra_job_args['steps']
            scriptpathname = create_biaspdark_batch_script(night=prow['NIGHT'], expids=prow['EXPID'],
                                                   jobdesc=prow['JOBDESC'], camword=prow['PROCCAMWORD'],
                                                   do_biasnight=do_biasnight, do_pdark=do_pdark,
                                                   queue=queue, system_name=system_name)
    elif prow['JOBDESC'] in ['ccdcalib']:
        if dry_run > 1:
            scriptpathname = get_desi_proc_batch_file_pathname(night=prow['NIGHT'], exp=prow['EXPID'],
                                                   jobdesc=prow['JOBDESC'], cameras=prow['PROCCAMWORD'])
        else:
            log.info(f"Creating ccdcalib script for: {prow}, {extra_job_args}")
            #  do_ctecorr=False, n_nights_before=None, n_nights_after=None,
            #  dark_expid=None, cte_expids=None
            do_darknight, do_badcolumn, do_ctecorr = False, False, False
            if 'steps' in extra_job_args:
                do_darknight = 'darknight' in extra_job_args['steps']
                do_badcolumn = 'badcolumn' in extra_job_args['steps']
                do_ctecorr = 'ctecorr' in extra_job_args['steps']
            n_nights_before, n_nights_after = None, None
            if 'n_nights_before' in extra_job_args:
                n_nights_before = extra_job_args['n_nights_before']
            if 'n_nights_after' in extra_job_args:
                n_nights_after = extra_job_args['n_nights_after']
            dark_expid, cte_expids = None, None
            if 'dark_expid' in extra_job_args:
                dark_expid = extra_job_args['dark_expid']
            if 'cte_expids' in extra_job_args:
                cte_expids = extra_job_args['cte_expids']
            scriptpathname = create_ccdcalib_batch_script(night=prow['NIGHT'], expids=prow['EXPID'],
                                                   camword=prow['PROCCAMWORD'],
                                                   do_darknight=do_darknight, do_badcolumn=do_badcolumn,
                                                   do_ctecorr=do_ctecorr, n_nights_before=n_nights_before,
                                                   n_nights_after=n_nights_after,
                                                   dark_expid=dark_expid, cte_expids=cte_expids,
                                                   queue=queue, system_name=system_name)
    elif prow['JOBDESC'] in ['perexp','pernight','pernight-v0','cumulative']:
        if dry_run > 1:
            scriptpathname = get_ztile_script_pathname(tileid=prow['TILEID'], group=prow['JOBDESC'],
                                                               night=prow['NIGHT'], expid=prow['EXPID'][0])
            log.info("Output file would have been: {}".format(scriptpathname))
        else:
            #- run zmtl for cumulative redshifts but not others
            run_zmtl = (prow['JOBDESC'] == 'cumulative')
            no_afterburners = False
            log.info(f"Creating tile redshift script for: {prow}")
            scripts, failed_scripts = generate_tile_redshift_scripts(tileid=prow['TILEID'], group=prow['JOBDESC'],
                                                                     nights=[prow['NIGHT']], expids=prow['EXPID'],
                                                                     batch_queue=queue, system_name=system_name,
                                                                     run_zmtl=run_zmtl,
                                                                     no_afterburners=no_afterburners,
                                                                     nosubmit=True)
            if len(failed_scripts) > 0:
                log.error(f"Redshifts failed for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned failed scriptname is {failed_scripts}")
            elif len(scripts) > 1:
                log.error(f"More than one redshift script returned for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned scriptnames were {scripts}")
            elif len(scripts) == 0:
                msg = f'No scripts were generated for {prow=}'
                log.critical(prow)
                raise ValueError(msg)
            else:
                scriptpathname = scripts[0]
    else:
        if prow['JOBDESC'] != 'tilenight':
            ## run known joint jobs as joint even if unspecified
            ## in the future we can eliminate the need for "joint"
            if joint or prow['JOBDESC'].lower() in ['psfnight', 'nightlyflat']:
                cmd = desi_proc_joint_fit_command(prow, queue=queue)
                ## For consistency with how we edit the other commands, do them
                ## here, but a future TODO would be to move these into the command
                ## generation itself
                if 'extra_cmd_args' in extra_job_args:
                    cmd += ' ' + ' '.join(np.atleast_1d(extra_job_args['extra_cmd_args']))
            else:
                ## individual arcs and flats
                cmd = desi_proc_command(prow, system_name, use_specter, queue=queue)

        if dry_run > 1:
            scriptpathname = batch_script_pathname(prow)
            log.info("Output file would have been: {}".format(scriptpathname))
            if prow['JOBDESC'] != 'tilenight':
                log.info("Command to be run: {}".format(cmd.split()))
        else:
            expids = prow['EXPID']
            if len(expids) == 0:
                expids = None

            if prow['JOBDESC'] == 'tilenight':
                log.info("Creating tilenight script for tile {}".format(prow['TILEID']))
                if 'laststeps' in extra_job_args:
                    laststeps = extra_job_args['laststeps']
                else:
                    err = f'{prow=} job did not specify last steps to tilenight'
                    log.error(err)
                    raise ValueError(err)
                ncameras = len(decode_camword(prow['PROCCAMWORD']))
                scriptpathname = create_desi_proc_tilenight_batch_script(
                                                               night=prow['NIGHT'], exp=expids,
                                                               tileid=prow['TILEID'],
                                                               ncameras=ncameras,
                                                               queue=queue,
                                                               mpistdstars=True,
                                                               use_specter=use_specter,
                                                               system_name=system_name,
                                                               laststeps=laststeps)
            else:
                if expids is not None and len(expids) > 1:
                    expids = expids[:1]
                log.info("Running: {}".format(cmd.split()))
                scriptpathname = create_desi_proc_batch_script(night=prow['NIGHT'], exp=expids,
                                                               cameras=prow['PROCCAMWORD'],
                                                               jobdesc=prow['JOBDESC'],
                                                               queue=queue, cmdline=cmd,
                                                               use_specter=use_specter,
                                                               system_name=system_name)
    log.info("Outfile is: {}".format(scriptpathname))
    prow['SCRIPTNAME'] = os.path.basename(scriptpathname)
    return prow
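
# Illustrative extra_job_args shapes (hypothetical values, not from the original
# module), one per job type that consumes them above:
#   linkcal:   {'refnight': 20251105, 'include': ['psfnight', 'fiberflatnight']}
#   biaspdark: {'steps': ['biasnight', 'pdark']}
#   ccdcalib:  {'steps': ['darknight', 'badcolumn', 'ctecorr'], 'dark_expid': 123450}
#   tilenight: {'laststeps': ['skysub', 'fluxcalib']}
# The keys match what create_batch_script looks up; the values shown are only
# plausible examples.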


_fake_qid = int(time.time() - 1.7e9)
def _get_fake_qid():
    """
    Return fake slurm queue jobid to use for dry-run testing
    """
    # Note: not implemented as a yield generator so that this returns a
    # genuine int, not a generator object
    global _fake_qid
    _fake_qid += 1
    return _fake_qid


def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=False):
    """
    Wrapper script that takes a processing table row and three modifier keywords and submits the scripts to the Slurm
    scheduler.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 doesn't write or submit anything but queries Slurm normally for job status.
            4 doesn't write, submit jobs, or query Slurm.
            5 doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with updated values for
        the queue id and status.

    Note:
        This modifies the input. Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not persist. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()
    dep_qids = prow['LATEST_DEP_QID']
    dep_list, dep_str = '', ''

    ## With desi_proc_night we now either resubmit failed jobs or exit, so this
    ## should no longer be necessary in the normal workflow.
    # workaround for sbatch --dependency bug not tracking jobs correctly
    # see NERSC TICKET INC0203024
    failed_dependency = False
    if len(dep_qids) > 0 and not dry_run:
        non_final_states = get_non_final_states()
        state_dict = get_queue_states_from_qids(dep_qids, dry_run_level=dry_run, use_cache=True)
        still_depids = []
        for depid in dep_qids:
            if depid in state_dict.keys():
                if state_dict[int(depid)] == 'COMPLETED':
                    log.info(f"removing completed jobid {depid}")
                elif state_dict[int(depid)] not in non_final_states:
                    failed_dependency = True
                    log.info("Found a dependency in a bad final state="
                             + f"{state_dict[int(depid)]} for depjobid={depid},"
                             + " not submitting this job.")
                    still_depids.append(depid)
                else:
                    still_depids.append(depid)
            else:
                still_depids.append(depid)
        dep_qids = np.array(still_depids)

    if len(dep_qids) > 0:
        jobtype = prow['JOBDESC']
        if strictly_successful:
            depcond = 'afterok'
        elif jobtype in ['arc', 'psfnight', 'prestdstar', 'stdstarfit']:
            ## (though psfnight and stdstarfit will require some inputs otherwise they'll go up in flames)
            depcond = 'afterany'
        else:
            ## if 'flat','nightlyflat','poststdstar', or any type of redshift, require strict success of inputs
            depcond = 'afterok'

        dep_str = f'--dependency={depcond}:'

        if np.isscalar(dep_qids):
            dep_list = str(dep_qids).strip(' \t')
            if dep_list == '':
                dep_str = ''
            else:
                dep_str += dep_list
        else:
            if len(dep_qids) > 1:
                dep_list = ':'.join(np.array(dep_qids).astype(str))
                dep_str += dep_list
            elif len(dep_qids) == 1 and dep_qids[0] not in [None, 0]:
                dep_str += str(dep_qids[0])
            else:
                dep_str = ''

    # script = f'{jobname}.slurm'
    # script_pathname = pathjoin(batchdir, script)
    if prow['JOBDESC'] in ['pernight-v0','pernight','perexp','cumulative']:
        script_pathname = get_ztile_script_pathname(tileid=prow['TILEID'], group=prow['JOBDESC'],
                                                        night=prow['NIGHT'], expid=np.min(prow['EXPID']))
        jobname = os.path.basename(script_pathname)
    else:
        script_pathname = batch_script_pathname(prow)
        jobname = os.path.basename(script_pathname)

    batch_params = ['sbatch', '--parsable']
    if dep_str != '':
        batch_params.append(f'{dep_str}')

    reservation = parse_reservation(reservation, prow['JOBDESC'])
    if reservation is not None:
        batch_params.append(f'--reservation={reservation}')

    batch_params.append(f'{script_pathname}')
    submitted = True
    ## If dry_run give it a fake QID
    ## if a dependency has failed don't even try to submit the job because
    ## Slurm will refuse, instead just mark as unsubmitted.
    if dry_run:
        current_qid = _get_fake_qid()
    elif not failed_dependency:
        #- sbatch sometimes fails; try several times before giving up
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                current_qid = subprocess.check_output(batch_params, stderr=subprocess.STDOUT, text=True)
                current_qid = int(current_qid.strip(' \t\n'))
                break
            except subprocess.CalledProcessError as err:
                log.error(f'{jobname} submission failure at {datetime.datetime.now()}')
                log.error(f'{jobname}   {batch_params}')
                log.error(f'{jobname}   {err.output=}')
                if attempt < max_attempts - 1:
                    log.info('Sleeping 60 seconds then retrying')
                    time.sleep(60)
        else:  #- for/else happens if loop doesn't succeed
            msg = f'{jobname} submission failed {max_attempts} times.' \
                  + ' setting as unsubmitted and moving on'
            log.error(msg)
            current_qid = get_default_qid()
            submitted = False
    else:
        current_qid = get_default_qid()
        submitted = False

    ## Update prow with new information
    prow['LATEST_QID'] = current_qid

    ## If we didn't submit, don't say we did and don't add to ALL_QIDS
    if submitted:
        log.info(batch_params)
        log.info(f'Submitted {jobname} with dependencies {dep_str} and '
                 + f'reservation={reservation}. Returned qid: {current_qid}')

        ## Update prow with new information
        prow['ALL_QIDS'] = np.append(prow['ALL_QIDS'], current_qid)
        prow['STATUS'] = 'SUBMITTED'
        prow['SUBMIT_DATE'] = int(time.time())
    else:
        log.info(f"Would have submitted: {batch_params}")
        prow['STATUS'] = 'UNSUBMITTED'

        ## Update the Slurm jobid cache of job states
        update_queue_state_cache(qid=prow['LATEST_QID'], state=prow['STATUS'])

    return prow
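
# Illustrative dependency string (hypothetical queue ids, not from the original
# module): with strictly_successful=True and dep_qids of [1234567, 1234568],
# the sbatch invocation becomes
#   sbatch --parsable --dependency=afterok:1234567:1234568 <script_pathname>
# so Slurm only starts this job once both parent jobs have exited successfully;
# with 'afterany' it would start once they finish regardless of exit status.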


#############################################
##########   Row Manipulations   ############
#############################################
def define_and_assign_dependency(prow, calibjobs, use_tilenight=False,
                                 refnight=None, include_files=None):
    """
    Given input processing row and possible calibjobs, this defines the
    JOBDESC keyword and assigns the dependency appropriate for the job type of
    prow.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for
            'OBSTYPE'. A row must have column names for
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.
        calibjobs (Table or dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight',
            'nightlyflat', 'biaspdark', and 'linkcal'. Each key corresponds to a Table.Row or
            None. The table.Row() values are for the corresponding
            calibration job. Each value that isn't None must contain
            'INTID', and 'LATEST_QID'. If None, it assumes the
            dependency doesn't exist and no dependency is assigned.
        use_tilenight (bool): Default is False. If True, use desi_proc_tilenight
            for prestdstar, stdstar, and poststdstar steps for
            science exposures.
        refnight (int): The reference night for linking jobs.
        include_files (list): List of filetypes to include in the linking.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except
        with updated values for 'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.

    Note:
        This modifies the input. Table.Row objects are generally copied
        on modification, so the change to the input object in memory may or may
        not persist. As of writing, a row from a table given to this function
        will not change during the execution of this function (but can be
        overwritten explicitly with the returned row if desired).
    """
    log = get_logger()

    if isinstance(calibjobs, Table):
        calibjobs = generate_calibration_dict(calibjobs, files_to_link=include_files)
    elif not isinstance(calibjobs, dict):
        log.error("calibjobs must be a Table or a dict")
        raise TypeError("calibjobs must be a Table or a dict")

    if prow['OBSTYPE'] in ['science', 'twiflat']:
        if calibjobs['nightlyflat'] is not None:
            dependency = calibjobs['nightlyflat']
        elif calibjobs['psfnight'] is not None:
            dependency = calibjobs['psfnight']
        elif calibjobs['ccdcalib'] is not None:
            dependency = calibjobs['ccdcalib']
        elif calibjobs['biaspdark'] is not None:
            dependency = calibjobs['biaspdark']
        else:
            dependency = calibjobs['linkcal']
        if not use_tilenight:
            prow['JOBDESC'] = 'prestdstar'
    elif prow['OBSTYPE'] == 'flat':
        if calibjobs['psfnight'] is not None:
            dependency = calibjobs['psfnight']
        elif calibjobs['ccdcalib'] is not None:
            dependency = calibjobs['ccdcalib']
        elif calibjobs['biaspdark'] is not None:
            dependency = calibjobs['biaspdark']
        else:
            dependency = calibjobs['linkcal']
    elif prow['OBSTYPE'] == 'arc':
        if calibjobs['ccdcalib'] is not None:
            dependency = calibjobs['ccdcalib']
        elif calibjobs['biaspdark'] is not None:
            dependency = calibjobs['biaspdark']
        else:
            dependency = calibjobs['linkcal']
    elif prow['JOBDESC'] in ['badcol', 'nightlybias', 'ccdcalib', 'pdark']:
        if calibjobs['biaspdark'] is not None:
            dependency = calibjobs['biaspdark']
        else:
            dependency = calibjobs['linkcal']
    elif prow['JOBDESC'] == 'biaspdark':
        dependency = calibjobs['linkcal']
    elif prow['JOBDESC'] == 'linkcal' and refnight is not None:
        dependency = None
        ## For link cals only, enable cross-night dependencies if available
        refproctable = findfile('proctable', night=refnight)
        if os.path.exists(refproctable):
            ptab = load_table(tablename=refproctable, tabletype='proctable')
            ## This isn't perfect because we may depend on jobs that aren't
            ## actually being linked
            ## Also allows us to proceed even if jobs don't exist yet
            deps, proccamwords = [], []
            #for job in ['nightlybias', 'ccdcalib', 'psfnight', 'nightlyflat']:
            if include_files is not None:
                for filename in include_files:
                    job = filename_to_jobname(filename)
                    if job in ptab['JOBDESC']:
                        ## add prow to dependencies
                        deprow = ptab[ptab['JOBDESC']==job][0]
                        deps.append(deprow)
                        proccamwords.append(deprow['PROCCAMWORD'])
                    elif 'linkcal' in ptab['JOBDESC']:
                        linkcalprow = ptab[ptab['JOBDESC']=='linkcal'][0]
                        deps.append(linkcalprow)
                        proccamwords.append(linkcalprow['PROCCAMWORD'])
            if len(deps) > 0:
                dependency = np.unique(deps)
            ## The proccamword for the linking job is the largest set available from the reference night
            ## but restricting back to those requested for the current night, if fewer cameras are available
            if len(proccamwords) > 0:
                prow['PROCCAMWORD'] = camword_intersection([prow['PROCCAMWORD'], camword_union(proccamwords)])
    else:
        dependency = None

    prow = assign_dependency(prow, dependency)

    return prow
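
# Illustrative fallback chain (hypothetical rows, not from the original module):
# for a science exposure the dependency is the most-processed calibration job
# available that night, checked in order
#   nightlyflat -> psfnight -> ccdcalib -> biaspdark -> linkcal
# e.g. if calibjobs = {'nightlyflat': None, 'psfnight': <prow>, ...}, the
# science job is made dependent on the psfnight job's INTID/LATEST_QID.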


def filename_to_jobname(filename):
    """
    Convert a filename to the job name it corresponds to.
    Example filenames include: 'biasnight', 'darknight', 'badcolumns', 'ctecorrnight', 'psfnight', 'fiberflatnight'.

    Args:
        filename (str): The name of the file to convert.

    Returns:
        str: The job name corresponding to the input filename.
    """
    if filename.startswith('biasnight'):
        return 'biaspdark'
    elif filename.startswith('darknight'):
        return 'ccdcalib'
    elif filename.startswith('badcolumns'):
        return 'ccdcalib'
    elif filename.startswith('ctecorrnight'):
        return 'ccdcalib'
    elif filename.startswith('psfnight'):
        return 'psfnight'
    elif filename.startswith('fiberflatnight'):
        return 'nightlyflat'
    else:
        return 'tilenight'
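
# Illustrative mappings (derived directly from the branches above):
#   filename_to_jobname('biasnight')      # -> 'biaspdark'
#   filename_to_jobname('ctecorrnight')   # -> 'ccdcalib'
#   filename_to_jobname('fiberflatnight') # -> 'nightlyflat'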


def assign_dependency(prow, dependency):
    """
    Given an input processing row and a dependency (a single processing row/dict
    or a list/array of them), this assigns the dependency's identifiers to prow.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'OBSTYPE'. A row must have column names for
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.
        dependency (NoneType or scalar/list/array of Table.Row or dict): Processing row corresponding to the required input
            for the job in prow. This must contain keyword
            accessible values for 'INTID', and 'LATEST_QID'.
            If None, it assumes the dependency doesn't exist
            and no dependency is assigned.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with updated values for
        'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.

    Note:
        This modifies the input. Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not persist. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    prow['INT_DEP_IDS'] = np.ndarray(shape=0).astype(int)
    prow['LATEST_DEP_QID'] = np.ndarray(shape=0).astype(int)
    if dependency is not None:
        ## note: np.ndarray (the type), not np.array (the factory function),
        ## so that arrays of dependencies are matched correctly
        if type(dependency) in [list, np.ndarray]:
            ids, qids = [], []
            for curdep in dependency:
                ids.append(curdep['INTID'])
                if still_a_dependency(curdep):
                    # ids.append(curdep['INTID'])
                    qids.append(curdep['LATEST_QID'])
            prow['INT_DEP_IDS'] = np.array(ids, dtype=int)
            prow['LATEST_DEP_QID'] = np.array(qids, dtype=int)
        elif type(dependency) in [dict, OrderedDict, Table.Row]:
            prow['INT_DEP_IDS'] = np.array([dependency['INTID']], dtype=int)
            if still_a_dependency(dependency):
                prow['LATEST_DEP_QID'] = np.array([dependency['LATEST_QID']], dtype=int)
    return prow


def still_a_dependency(dependency):
    """
    Defines the criteria for which a dependency is deemed complete (and therefore no longer a dependency).

    Args:
        dependency (Table.Row or dict): Processing row corresponding to the required input for the job in prow.
            This must contain keyword accessible values for 'STATUS', and 'LATEST_QID'.

    Returns:
        bool: False if the criteria indicate that the dependency is completed and no longer a blocking factor (ie no longer
        a genuine dependency). Returns True if the dependency is still a blocking factor such that the slurm
        scheduler needs to be aware of the pending job.

    """
    return dependency['LATEST_QID'] > 0 and dependency['STATUS'] != 'COMPLETED'


def get_type_and_tile(erow):
    """
    Trivial function to return the OBSTYPE and the TILEID from an exposure table row

    Args:
        erow (Table.Row or dict): Must contain 'OBSTYPE' and 'TILEID' as keywords.

    Returns:
        tuple (str, str): corresponding to the OBSTYPE and TILEID values of the input erow.
    """
    return str(erow['OBSTYPE']).lower(), erow['TILEID']
1003

1004

1005
#############################################
#########   Table manipulators   ############
#############################################
def parse_previous_tables(etable, ptable, night):
    """
    This takes in the exposure and processing tables and regenerates all the working memory variables needed for the
    daily processing script.

    Used by the daily processing to define most of its stateful variables into working memory.
    If the processing table is empty, these are simply declared and returned for use.
    If the code had previously run and exited (or crashed), however, this allows the code to
    re-establish itself by redefining these values.

    Args:
        etable, Table, Exposure table of all exposures that have been dealt with thus far.
        ptable, Table, Processing table of all exposures that have been processed.
        night, str or int, the night the data was taken.

    Returns:
        tuple: A tuple containing:

        * arcs, list of dicts, list of the individual arc jobs used for the psfnight (NOT all
          the arcs, if multiple sets existed)
        * flats, list of dicts, list of the individual flat jobs used for the nightlyflat (NOT
          all the flats, if multiple sets existed)
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile)
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'badcol', 'psfnight',
          'nightlyflat', 'linkcal', and 'accounted_for'. Each job key corresponds to a Table.Row or
          None. The Table.Row values are for the corresponding
          calibration job.
        * curtype, None, the obstype of the current job being run. Always None as the first new job will define this.
        * lasttype, str or None, the obstype of the last individual exposure row to be processed.
        * curtile, None, the tileid of the current job (if science). Otherwise None. Always None as the first
          new job will define this.
        * lasttile, str or None, the tileid of the last job (if science). Otherwise None.
        * internal_id, int, an internal identifier unique to each job. Increments with each new job. This
          is the latest unassigned value.
    """
    log = get_logger()
    arcs, flats, sciences = [], [], []
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None,
                 'accounted_for': dict()}
    curtype, lasttype = None, None
    curtile, lasttile = None, None

    if len(ptable) > 0:
        prow = ptable[-1]
        internal_id = int(prow['INTID']) + 1
        lasttype, lasttile = get_type_and_tile(ptable[-1])
        jobtypes = ptable['JOBDESC']

        if 'nightlybias' in jobtypes:
            calibjobs['nightlybias'] = table_row_to_dict(ptable[jobtypes=='nightlybias'][0])
            log.info("Located nightlybias job in processing table: {}".format(calibjobs['nightlybias']))

        if 'ccdcalib' in jobtypes:
            calibjobs['ccdcalib'] = table_row_to_dict(ptable[jobtypes=='ccdcalib'][0])
            log.info("Located ccdcalib job in processing table: {}".format(calibjobs['ccdcalib']))

        if 'psfnight' in jobtypes:
            calibjobs['psfnight'] = table_row_to_dict(ptable[jobtypes=='psfnight'][0])
            log.info("Located joint fit psfnight job in processing table: {}".format(calibjobs['psfnight']))
        elif lasttype == 'arc':
            seqnum = 10
            for row in ptable[::-1]:
                erow = etable[etable['EXPID']==row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'arc' and int(erow['SEQNUM']) < seqnum:
                    arcs.append(table_row_to_dict(row))
                    seqnum = int(erow['SEQNUM'])
                else:
                    break
            ## Because we work backward to fill in, we need to reverse them to get chronological order back
            arcs = arcs[::-1]

        if 'nightlyflat' in jobtypes:
            calibjobs['nightlyflat'] = table_row_to_dict(ptable[jobtypes=='nightlyflat'][0])
            log.info("Located joint fit nightlyflat job in processing table: {}".format(calibjobs['nightlyflat']))
        elif lasttype == 'flat':
            for row in ptable[::-1]:
                erow = etable[etable['EXPID']==row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'flat' and int(erow['SEQTOT']) < 5:
                    if float(erow['EXPTIME']) > 100.:
                        flats.append(table_row_to_dict(row))
                else:
                    break
            flats = flats[::-1]

        if lasttype.lower() == 'science':
            for row in ptable[::-1]:
                if row['OBSTYPE'].lower() == 'science' and row['TILEID'] == lasttile and \
                   row['JOBDESC'] == 'prestdstar' and row['LASTSTEP'] != 'skysub':
                    sciences.append(table_row_to_dict(row))
                else:
                    break
            sciences = sciences[::-1]
    else:
        internal_id = night_to_starting_iid(night)

    return arcs, flats, sciences, \
           calibjobs, \
           curtype, lasttype, \
           curtile, lasttile, \
           internal_id

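## Illustrative usage sketch (assumption: the tables were already loaded
## elsewhere, e.g. with desispec.workflow.tableio.load_table, for this night).
def _example_restore_daily_state(etable, ptable, night=20240101):
    (arcs, flats, sciences, calibjobs,
     curtype, lasttype, curtile, lasttile, internal_id) = \
        parse_previous_tables(etable, ptable, night)
    # With an empty ptable, internal_id is night_to_starting_iid(night),
    # i.e. (20240101 - 20000000) * 1000 = 240101000.
    return internal_id
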
def generate_calibration_dict(ptable, files_to_link=None):
    """
    This takes in a processing table and regenerates the working memory calibration
    dictionary for dependency tracking. Used by the daily processing to define
    most of its stateful variables into working memory.
    If the processing table is empty, these are simply declared and returned for use.
    If the code had previously run and exited (or crashed), however, this allows the code to
    re-establish itself by redefining these values.

    Args:
        ptable, Table, Processing table of all exposures that have been processed.
        files_to_link, set, Set of filenames that the linkcal job will link.

    Returns:
        calibjobs, dict. Dictionary containing 'biasnight', 'biaspdark', 'ccdcalib',
            'psfnight', 'nightlyflat', 'linkcal', and 'accounted_for'. Each job key
            corresponds to a Table.Row or None. The Table.Row values are for the
            corresponding calibration job.
    """
    log = get_logger()
    job_to_file_map = get_jobdesc_to_file_map()
    accounted_for = {'biasnight': False, 'darknight': False, 'badcolumns': False,
                     'ctecorrnight': False, 'psfnight': False,
                     'fiberflatnight': False}
    calibjobs = {'biasnight': None, 'biaspdark': None, 'ccdcalib': None, #'badcol': None,
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None}

    ptable_jobtypes = ptable['JOBDESC']

    for jobtype in calibjobs.keys():
        if jobtype in ptable_jobtypes:
            calibjobs[jobtype] = table_row_to_dict(ptable[ptable_jobtypes==jobtype][0])
            log.info(f"Located {jobtype} job in processing table: {calibjobs[jobtype]}")
            if jobtype in ['linkcal', 'biaspdark', 'ccdcalib'] and files_to_link is None:
                night = int(ptable['NIGHT'][0]) ## jobtypes not empty, so has 1 or more rows
                override_pathname = findfile('override', night=night, readonly=True)
                overrides = load_override_file(filepathname=override_pathname)
                cal_override = {}
                if 'calibration' in overrides:
                    cal_override = overrides['calibration']
                orig_files_to_link = files_to_link
                ## Determine calibrations that will be linked
                if 'linkcal' in cal_override:
                    files_to_link, files_not_linked = None, None
                    if 'include' in cal_override['linkcal']:
                        files_to_link = cal_override['linkcal']['include']
                    if 'exclude' in cal_override['linkcal']:
                        files_not_linked = cal_override['linkcal']['exclude']
                    files_to_link, files_not_linked = derive_include_exclude(files_to_link,
                                                                 files_not_linked)
                    warn = f"linkcal job exists but no files given: {orig_files_to_link=}. " \
                           + f"Used {override_pathname} to identify the following " \
                           + f"linked files: {files_to_link}"
                    log.warning(warn)

            if jobtype == 'linkcal':
                if files_to_link is not None and len(files_to_link) > 0:
                    log.info(f"Assuming existing linkcal job processed "
                             + f"{files_to_link} since given in override file.")
                    accounted_for = update_accounted_for_with_linking(accounted_for,
                                                                  files_to_link)
                else:
                    err = f"linkcal job exists but no files given: {files_to_link=}"
                    log.error(err)
                    raise ValueError(err)
            elif jobtype in ['biaspdark', 'ccdcalib']:
                ## These are multi-step jobs, so assume what was processed
                ## based on linking
                if jobtype == 'biaspdark':
                    ## we don't care about preproc_for_dark since we can still
                    ## make a darknight without darks specifically from tonight
                    possible_files = set(['biasnight'])
                else:
                    possible_files = set(['darknight', 'badcolumns', 'ctecorrnight'])

                if files_to_link is None:
                    files_accounted_for = possible_files
                else:
                    files_accounted_for = possible_files.difference(files_to_link)
                    files_linked = possible_files.intersection(files_to_link)
                    log.info(f"Assuming existing {jobtype} job processed "
                             + f"{files_accounted_for} since {files_linked} "
                             + f"are linked.")
                for fil in files_accounted_for:
                    accounted_for[fil] = True
            else:
                accounted_for[job_to_file_map[jobtype]] = True

    calibjobs['accounted_for'] = accounted_for
    return calibjobs

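## Sketch of the override-file structure consulted above; the 'calibration' ->
## 'linkcal' -> 'include'/'exclude' nesting is taken from this function, while
## the listed filenames are illustrative assumptions.
def _example_calibration_override():
    overrides = {'calibration':
                     {'linkcal': {'include': ['psfnight', 'fiberflatnight']}}}
    cal_override = overrides.get('calibration', {})
    return cal_override['linkcal'].get('include', [])
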
def update_accounted_for_with_linking(accounted_for, files_to_link):
    """
    This takes in a dictionary summarizing the calibration files accounted for
    and updates it based on the files_to_link, which are assumed to have
    already been linked such that those files already exist on disk and
    don't need to be generated.

    Parameters
    ----------
        accounted_for: dict
            Dictionary containing 'biasnight', 'darknight', 'badcolumns',
            'ctecorrnight', 'psfnight', and 'fiberflatnight'. Each value is
            True if the file is accounted for and False if it is not.
        files_to_link: set
            Set of filenames that the linkcal job will link.

    Returns
    -------
        accounted_for: dict
            Dictionary containing 'biasnight', 'darknight', 'badcolumns',
            'ctecorrnight', 'psfnight', and 'fiberflatnight'. Each value is
            True if the file is accounted for and False if it is not.
    """
    log = get_logger()

    for fil in files_to_link:
        if fil in accounted_for:
            accounted_for[fil] = True
        else:
            err = f"{fil} doesn't match an expected filetype: "
            err += f"{accounted_for.keys()}"
            log.error(err)
            raise ValueError(err)

    return accounted_for

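## Minimal sketch (illustrative values): marking linked calibrations as done.
def _example_update_accounted_for():
    accounted_for = {'biasnight': False, 'darknight': False, 'badcolumns': False,
                     'ctecorrnight': False, 'psfnight': False, 'fiberflatnight': False}
    accounted_for = update_accounted_for_with_linking(
        accounted_for, {'psfnight', 'fiberflatnight'})
    assert accounted_for['psfnight'] and accounted_for['fiberflatnight']
    assert not accounted_for['biasnight']
    return accounted_for
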
def all_calibs_submitted(accounted_for, do_cte_flats):
    """
    Function that returns the boolean logic to determine whether the necessary
    calibration jobs have been submitted or otherwise accounted for.

    Args:
        accounted_for, dict, Dictionary with keys corresponding to the calibration
            filenames and values of True or False.
        do_cte_flats, bool, whether ctecorrnight files are expected or not.

    Returns:
        bool, True if all necessary calibrations have been submitted or handled, False otherwise.
    """
    test_dict = accounted_for.copy()
    if not do_cte_flats:
        test_dict.pop('ctecorrnight')

    return np.all(list(test_dict.values()))

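## Minimal sketch: when CTE flats are not expected, 'ctecorrnight' is ignored.
def _example_all_calibs_submitted():
    accounted_for = {'biasnight': True, 'darknight': True, 'badcolumns': True,
                     'ctecorrnight': False, 'psfnight': True, 'fiberflatnight': True}
    assert all_calibs_submitted(accounted_for, do_cte_flats=False)
    assert not all_calibs_submitted(accounted_for, do_cte_flats=True)
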
def update_and_recursively_submit(proc_table, submits=0, max_resubs=100,
                                  resubmission_states=None,
                                  no_resub_failed=False, ptab_name=None,
                                  dry_run_level=0, reservation=None,
                                  expids=None, tileids=None):
    """
    Given a processing table, this loops over job rows and resubmits failed jobs (as defined by resubmission_states).
    Before submitting a job, it checks the dependencies for failures. If a dependency needs to be resubmitted, it recursively
    follows dependencies until it finds the first job without a failed dependency and resubmits that. Then it resubmits the
    other jobs with the new Slurm jobIDs for proper dependency coordination within Slurm.

    Args:
        proc_table, Table, the processing table with a row per job.
        submits, int, the number of submissions made to the queue. Used for saving files and to avoid overloading the scheduler.
        max_resubs, int, the number of times a job should be resubmitted before giving up. Default is very high at 100.
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
            possible Slurm scheduler state, where you wish for jobs with that
            outcome to be resubmitted.
        no_resub_failed: bool. Set to True if you do NOT want to resubmit
            jobs with Slurm status 'FAILED' by default. Default is False.
        ptab_name, str, the full pathname where the processing table should be saved.
        dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 which runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 Doesn't write or submit anything but queries Slurm normally for job status.
            4 Doesn't write, submit jobs, or query Slurm.
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        expids: list of ints. The exposure ids to resubmit (along with the jobs they depend on).
        tileids: list of ints. The tile ids to resubmit (along with the jobs they depend on).

    Returns:
        tuple: A tuple containing:

        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
          been updated for those jobs that needed to be resubmitted.
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
          the number of submissions made from this function call plus the input submits value.

    Note:
        This modifies the inputs of both proc_table and submits and returns them.
    """
    log = get_logger()
    if tileids is not None and expids is not None:
        msg = "Provided both expids and tileids. Please only provide one."
        log.critical(msg)
        raise AssertionError(msg)
    elif tileids is not None:
        msg = f"Only resubmitting the following tileids and the jobs they depend on: {tileids=}"
        log.info(msg)
    elif expids is not None:
        msg = f"Only resubmitting the following expids and the jobs they depend on: {expids=}"
        log.info(msg)

    if resubmission_states is None:
        resubmission_states = get_resubmission_states(no_resub_failed=no_resub_failed)

    log.info(f"Resubmitting jobs with current states in the following: {resubmission_states}")
    proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level)

    log.info("Updated processing table queue information:")
    cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID',
            'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS']
    log.info(np.array(cols))
    for row in proc_table:
        log.info(np.array(row[cols]))

    ## If expids or tileids are given, subselect to the processing table rows
    ## that included those exposures or tiles; otherwise just list all indices.
    ## NOTE: Other rows can still be submitted if the selected rows depend on them:
    ## we hand the entire table to recursive_submit_failed(), which will walk the
    ## entire dependency tree as necessary.
    if expids is not None:
        select_ptab_rows = np.where([np.any(np.isin(prow_eids, expids)) for prow_eids in proc_table['EXPID']])[0]
    elif tileids is not None:
        select_ptab_rows = np.where(np.isin(proc_table['TILEID'], tileids))[0]
    else:
        select_ptab_rows = np.arange(len(proc_table))

    log.info("\n")
    id_to_row_map = {row['INTID']: rown for rown, row in enumerate(proc_table)}
    ## Loop over all requested rows and resubmit those that have failed
    for rown in select_ptab_rows:
        if proc_table['STATUS'][rown] in resubmission_states:
            proc_table, submits = recursive_submit_failed(rown=rown, proc_table=proc_table,
                                                          submits=submits, max_resubs=max_resubs,
                                                          id_to_row_map=id_to_row_map,
                                                          ptab_name=ptab_name,
                                                          resubmission_states=resubmission_states,
                                                          reservation=reservation,
                                                          dry_run_level=dry_run_level)

    proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level)

    return proc_table, submits

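## Illustrative (dry-run) usage sketch; the tile ID is an assumption and the
## processing table is assumed to have been loaded with load_table().
def _example_resubmit_one_tile(proc_table):
    proc_table, nsubmits = update_and_recursively_submit(
        proc_table, dry_run_level=4, tileids=[1234])
    return proc_table, nsubmits
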
def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, max_resubs=100, ptab_name=None,
                            resubmission_states=None, reservation=None, dry_run_level=0):
    """
    Given the index of a row in a processing table and the full processing table, this resubmits the given job.
    Before submitting a job, it checks the dependencies for failures in the processing table. If a dependency needs to
    be resubmitted, it recursively follows dependencies until it finds the first job without a failed dependency and
    resubmits that. Then it resubmits the other jobs with the new Slurm jobIDs for proper dependency coordination within Slurm.

    Args:
        rown, int, the index of the processing table row that you want to resubmit.
        proc_table, Table, the processing table with a row per job.
        submits, int, the number of submissions made to the queue. Used for saving files and to avoid overloading the scheduler.
        id_to_row_map, dict, lookup dictionary where the keys are internal ids (INTID's) and the values are the row position
            in the processing table.
        max_resubs, int, the number of times a job should be resubmitted before giving up. Default is very high at 100.
        ptab_name, str, the full pathname where the processing table should be saved.
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
            possible Slurm scheduler state, where you wish for jobs with that
            outcome to be resubmitted.
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 which runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 Doesn't write or submit anything but queries Slurm normally for job status.
            4 Doesn't write, submit jobs, or query Slurm.
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.

    Returns:
        tuple: A tuple containing:

        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
          been updated for those jobs that needed to be resubmitted.
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
          the number of submissions made from this function call plus the input submits value.

    Note:
        This modifies the inputs of both proc_table and submits and returns them.
    """
    log = get_logger()
    row = proc_table[rown]
    log.info(f"Identified row {row['INTID']} as needing resubmission.")
    log.info(f"\t{row['INTID']}: Tileid={row['TILEID']}, Expid(s)={row['EXPID']}, Jobdesc={row['JOBDESC']}")
    if len(proc_table['ALL_QIDS'][rown]) > max_resubs:
        log.warning(f"Tileid={row['TILEID']}, Expid(s)={row['EXPID']}, "
                    + f"Jobdesc={row['JOBDESC']} has already been submitted "
                    + f"{max_resubs+1} times. Not resubmitting.")
        proc_table['STATUS'][rown] = "MAX_RESUB"
        return proc_table, submits
    if resubmission_states is None:
        resubmission_states = get_resubmission_states()
    ideps = proc_table['INT_DEP_IDS'][rown]
    if ideps is None or len(ideps) == 0:
        proc_table['LATEST_DEP_QID'][rown] = np.ndarray(shape=0).astype(int)
    else:
        all_valid_states = list(resubmission_states.copy())
        good_states = ['RUNNING', 'PENDING', 'SUBMITTED', 'COMPLETED']
        all_valid_states.extend(good_states)
        othernight_idep_row_lookup = {}
        for idep in np.sort(np.atleast_1d(ideps)):
            if idep not in id_to_row_map:
                if idep // 1000 != row['INTID'] // 1000:
                    log.debug("Internal ID: %d not in id_to_row_map. "
                              "This is expected since it is from another day.", idep)
                    reference_night = 20000000 + (idep // 1000)
                    reftab = read_minimal_full_proctab_cols(nights=[reference_night])
                    if reftab is None:
                        msg = f"The dependency is from night={reference_night}" \
                              + f" but read_minimal_full_proctab_cols couldn't" \
                              + f" locate that processing table; this is a " \
                              + f"fatal error."
                        log.critical(msg)
                        raise ValueError(msg)
                    reftab = update_from_queue(reftab, dry_run_level=dry_run_level)
                    entry = reftab[reftab['INTID'] == idep][0]
                    if entry['STATUS'] not in good_states:
                        msg = f"Internal ID: {idep} not in id_to_row_map. " \
                              + f"Since the dependency is from night={reference_night} " \
                              + f"and that job isn't in a good state, this is an " \
                              + f"error we can't overcome."
                        log.error(msg)
                        proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
                        return proc_table, submits
                    else:
                        ## otherwise if incomplete, just update the cache to use this
                        ## in the next stage
                        othernight_idep_row_lookup[idep] = entry
                        update_full_ptab_cache(reftab)
                else:
                    msg = f"Internal ID: {idep} not in id_to_row_map. " \
                          + f"Since the dependency is from the same night" \
                          + f" and we can't find it, this is a fatal error."
                    log.critical(msg)
                    raise ValueError(msg)
            elif proc_table['STATUS'][id_to_row_map[idep]] not in all_valid_states:
                log.error(f"Proc INTID: {proc_table['INTID'][rown]} depended on" +
                          f" INTID {proc_table['INTID'][id_to_row_map[idep]]}" +
                          f" but that exposure has state" +
                          f" {proc_table['STATUS'][id_to_row_map[idep]]} that" +
                          f" isn't in the list of resubmission states." +
                          f" Exiting this job's resubmission attempt.")
                proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
                return proc_table, submits
        qdeps = []
        for idep in np.sort(np.atleast_1d(ideps)):
            if idep in id_to_row_map:
                if proc_table['STATUS'][id_to_row_map[idep]] in resubmission_states:
                    proc_table, submits = recursive_submit_failed(id_to_row_map[idep],
                                                                  proc_table, submits,
                                                                  id_to_row_map,
                                                                  reservation=reservation,
                                                                  dry_run_level=dry_run_level)
                ## Now that we've resubmitted the dependency if necessary,
                ## add the most recent QID to the list assuming it isn't COMPLETED
                if still_a_dependency(proc_table[id_to_row_map[idep]]):
                    qdeps.append(proc_table['LATEST_QID'][id_to_row_map[idep]])
                else:
                    log.info(f"{idep} is COMPLETED. Not submitting as a dependency.")
            else:
                ## Since we verified above that the cross night QID is still
                ## either pending or successful, add that to the list of QID's
                if still_a_dependency(othernight_idep_row_lookup[idep]):
                    qdeps.append(othernight_idep_row_lookup[idep]['LATEST_QID'])
                else:
                    log.info(f"{idep} is COMPLETED. Not submitting as a dependency.")

        qdeps = np.atleast_1d(qdeps)
        proc_table['LATEST_DEP_QID'][rown] = qdeps
        if len(qdeps) < len(ideps):
            log.warning(f"Number of internal dependencies was {len(ideps)} but number "
                        + f"of queue deps is {len(qdeps)} for Rown {rown}, ideps {ideps}."
                        + " This is expected if the ideps were status=COMPLETED")

    proc_table[rown] = submit_batch_script(proc_table[rown], reservation=reservation,
                                           strictly_successful=True, dry_run=dry_run_level)
    submits += 1

    if dry_run_level < 3:
        if ptab_name is None:
            write_table(proc_table, tabletype='processing', overwrite=True)
        else:
            write_table(proc_table, tablename=ptab_name, overwrite=True)
        sleep_and_report(0.1 + 0.1*(submits % 10 == 0),
                         message_suffix="after submitting job to queue and writing proctable")
    return proc_table, submits
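
## Sketch of the INTID convention relied on above for cross-night dependencies
## (see night_to_starting_iid): the night is recoverable as 20000000 + INTID//1000.
def _example_intid_night_round_trip():
    night = 20240101
    internal_id = (night - 20000000) * 1000  # first job of that night
    assert 20000000 + (internal_id // 1000) == night
    return internal_id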

#########################################
########     Joint fit     ##############
#########################################
def joint_fit(ptable, prows, internal_id, queue, reservation, descriptor, z_submit_types=None,
              dry_run=0, strictly_successful=False, check_for_outputs=True, resubmit_partial_complete=True,
              system_name=None):
    """
    DEPRECATED
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    joint fitting job given by descriptor. If the joint fitting job is standard star fitting, the post standard star fits
    for all the individual exposures are also created and submitted. The returned ptable has all of these rows added to the
    table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        descriptor (str): Description of the joint fitting job. Can either be 'science' or 'stdstarfit', 'arc' or 'psfnight',
            or 'flat' or 'nightlyflat'.
        z_submit_types (list of str, optional): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 which runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 Doesn't write or submit anything but queries Slurm normally for job status.
            4 Doesn't write, submit jobs, or query Slurm.
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with added rows for the joint fit job and, in the case
          of a stdstarfit, the poststdstar science exposure jobs.
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    if descriptor is None:
        return ptable, None, internal_id
    elif descriptor == 'arc':
        descriptor = 'psfnight'
    elif descriptor == 'flat':
        descriptor = 'nightlyflat'
    elif descriptor == 'science':
        if z_submit_types is None or len(z_submit_types) == 0:
            descriptor = 'stdstarfit'

    if descriptor not in ['psfnight', 'nightlyflat', 'science', 'stdstarfit']:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")

    if descriptor == 'science':
        joint_prow, internal_id = make_joint_prow(prows, descriptor='stdstarfit', internal_id=internal_id)
    else:
        joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
    ptable.add_row(joint_prow)

    if descriptor in ['science', 'stdstarfit']:
        if descriptor == 'science':
            zprows = []
        log.info(" ")
        log.info("Submitting individual science exposures now that joint fitting of standard stars is submitted.\n")
        for row in prows:
            if row['LASTSTEP'] == 'stdstarfit':
                continue
            row['JOBDESC'] = 'poststdstar'

            # poststdstar job can't process cameras not included in its stdstar joint fit
            stdcamword = joint_prow['PROCCAMWORD']
            thiscamword = row['PROCCAMWORD']
            proccamword = camword_intersection([stdcamword, thiscamword])
            if proccamword != thiscamword:
                dropcams = difference_camwords(thiscamword, proccamword)
                assert dropcams != ''  #- i.e. if they differ, we should be dropping something
                log.warning(f"Dropping exp {row['EXPID']} poststdstar cameras {dropcams} since they weren't included in stdstar fit {stdcamword}")
                row['PROCCAMWORD'] = proccamword

            row['INTID'] = internal_id
            internal_id += 1
            row['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
            row = assign_dependency(row, joint_prow)
            row = create_and_submit(row, queue=queue, reservation=reservation, dry_run=dry_run,
                                    strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
            ptable.add_row(row)
            if descriptor == 'science' and row['LASTSTEP'] == 'all':
                zprows.append(row)

    ## Now run redshifts
    if descriptor == 'science' and len(zprows) > 0 and z_submit_types is not None:
        prow_selection = (  (ptable['OBSTYPE'] == 'science')
                          & (ptable['LASTSTEP'] == 'all')
                          & (ptable['JOBDESC'] == 'poststdstar')
                          & (ptable['TILEID'] == int(zprows[0]['TILEID'])) )
        nightly_zprows = []
        if np.sum(prow_selection) == len(zprows):
            nightly_zprows = zprows.copy()
        else:
            for prow in ptable[prow_selection]:
                nightly_zprows.append(table_row_to_dict(prow))

        for zsubtype in z_submit_types:
            if zsubtype == 'perexp':
                for zprow in zprows:
                    log.info(" ")
                    log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
                    joint_prow, internal_id = make_joint_prow([zprow], descriptor=zsubtype, internal_id=internal_id)
                    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                    ptable.add_row(joint_prow)
            else:
                log.info(" ")
                log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {nightly_zprows[0]['TILEID']}.")
                expids = [prow['EXPID'][0] for prow in nightly_zprows]
                log.info(f"Expids: {expids}.\n")
                joint_prow, internal_id = make_joint_prow(nightly_zprows, descriptor=zsubtype, internal_id=internal_id)
                joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(joint_prow)

    if descriptor in ['psfnight', 'nightlyflat']:
        log.info("Setting the calibration exposures as calibrators in the processing table.\n")
        ptable = set_calibrator_flag(prows, ptable)

    return ptable, joint_prow, internal_id

def joint_cal_fit(descriptor, ptable, prows, internal_id, queue, reservation,
                  dry_run=0, strictly_successful=False, check_for_outputs=True,
                  resubmit_partial_complete=True, system_name=None):
    """
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    joint calibration fitting job given by descriptor ('arc'/'psfnight' or 'flat'/'nightlyflat'). The returned ptable
    has the joint fit row added to the table given as input.

    Args:
        descriptor (str): Description of the joint fitting job. Can either be 'arc' or 'psfnight',
            or 'flat' or 'nightlyflat'.
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        ptable (Table): The processing table where each row is a processed job.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 which runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 Doesn't write or submit anything but queries Slurm normally for job status.
            4 Doesn't write, submit jobs, or query Slurm.
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with an added row for the joint fit job.
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    if descriptor is None:
        return ptable, None, internal_id
    elif descriptor == 'arc':
        descriptor = 'psfnight'
    elif descriptor == 'flat':
        descriptor = 'nightlyflat'

    if descriptor not in ['psfnight', 'nightlyflat']:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")

    joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
    ptable.add_row(joint_prow)

    if descriptor in ['psfnight', 'nightlyflat']:
        log.info("Setting the calibration exposures as calibrators in the processing table.\n")
        ptable = set_calibrator_flag(prows, ptable)

    return ptable, joint_prow, internal_id

#########################################
########     Redshifts     ##############
#########################################
def submit_redshifts(ptable, prows, tnight, internal_id, queue, reservation,
              dry_run=0, strictly_successful=False,
              check_for_outputs=True, resubmit_partial_complete=True,
              z_submit_types=None, system_name=None):
    """
    Given a set of prows, this generates processing table rows, creates batch scripts, and submits the requested
    redshift jobs that depend on the given tilenight job. The returned ptable has all of these rows added to the
    table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Processing rows for the tile's science exposures, used to select the
            LASTSTEP=='all' exposures whose redshifts should be fit.
        tnight (Table.Row): The processing table row of the tilenight job on which the redshifts depend.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 which runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 Doesn't write or submit anything but queries Slurm normally for job status.
            4 Doesn't write, submit jobs, or query Slurm.
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with added rows for the redshift jobs.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1 or z_submit_types is None:
        return ptable, internal_id

    log.info(" ")
    log.info("Running redshifts.\n")

    ## Now run redshifts
    zprows = []
    for row in prows:
        if row['LASTSTEP'] == 'all':
            zprows.append(row)

    if len(zprows) > 0:
        for zsubtype in z_submit_types:
            log.info(" ")
            log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {zprows[0]['TILEID']}.")
            if zsubtype == 'perexp':
                for zprow in zprows:
                    log.info(f"EXPID: {zprow['EXPID']}.\n")
                    redshift_prow = make_redshift_prow([zprow], tnight, descriptor=zsubtype, internal_id=internal_id)
                    internal_id += 1
                    redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                    ptable.add_row(redshift_prow)
            elif zsubtype == 'cumulative':
                tileids = np.unique([prow['TILEID'] for prow in zprows])
                if len(tileids) > 1:
                    msg = f"Error, more than one tileid provided for cumulative redshift job: {tileids}"
                    log.critical(msg)
                    raise ValueError(msg)
                nights = np.unique([prow['NIGHT'] for prow in zprows])
                if len(nights) > 1:
                    msg = f"Error, more than one night provided for cumulative redshift job: {nights}"
                    log.critical(msg)
                    raise ValueError(msg)
                tileid, night = tileids[0], nights[0]
                ## For cumulative redshifts, get any existing processing rows for the tile
                matched_prows = read_minimal_tilenight_proctab_cols(tileids=tileids)
                ## Identify the processing rows that should be assigned as dependencies.
                ## tnight should be first such that the new job inherits the other metadata from it
                tnights = [tnight]
                if matched_prows is not None:
                    matched_prows = matched_prows[matched_prows['NIGHT'] <= night]
                    for prow in matched_prows:
                        if prow['INTID'] != tnight['INTID']:
                            tnights.append(prow)
                log.info(f"Internal Processing IDs: {[prow['INTID'] for prow in tnights]}.\n")
                ## Identify all exposures that should go into the fit
                expids = [prow['EXPID'][0] for prow in zprows]
                ## note we can actually get the full list of exposures, but for now
                ## we'll stay consistent with old processing where we only list exposures
                ## from the current night
                ## For cumulative redshifts, get valid expids from exptables
                #matched_erows = read_minimal_science_exptab_cols(tileids=tileids)
                #matched_erows = matched_erows[matched_erows['NIGHT']<=night]
                #expids = list(set([prow['EXPID'][0] for prow in zprows])+set(matched_erows['EXPID']))
                log.info(f"Expids: {expids}.\n")
                redshift_prow, internal_id = make_joint_prow(tnights, descriptor=zsubtype, internal_id=internal_id)
                redshift_prow['EXPID'] = expids
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(redshift_prow)
            else: # pernight
                expids = [prow['EXPID'][0] for prow in zprows]
                log.info(f"Expids: {expids}.\n")
                redshift_prow = make_redshift_prow(zprows, tnight, descriptor=zsubtype, internal_id=internal_id)
                internal_id += 1
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(redshift_prow)

    return ptable, internal_id
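
## Illustrative (dry-run) usage sketch; `prows` and `tnight` are assumed to
## come from an earlier submit_tilenight call for the same tile and night.
def _example_submit_cumulative_redshifts(ptable, prows, tnight, internal_id):
    return submit_redshifts(ptable, prows, tnight, internal_id,
                            queue='realtime', reservation=None, dry_run=4,
                            z_submit_types=['cumulative'])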

1850
#########################################
1851
########     Tilenight     ##############
1852
#########################################
1853
def submit_tilenight(ptable, prows, calibjobs, internal_id, queue, reservation,
              dry_run=0, strictly_successful=False, resubmit_partial_complete=True,
              system_name=None, use_specter=False, extra_job_args=None):
    """
    Given a set of prows, this generates a tilenight processing table row, creates a batch script, and submits the
    tilenight job. The returned ptable has this row added to the table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The table.Row() values are for the corresponding
            calibration job.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 doesn't write or submit anything but queries Slurm normally for job status.
            4 doesn't write, submit jobs, or query Slurm.
            5 doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu.
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include
            laststeps for tilenight, etc.

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with an added row for the tilenight job.
        * tnight_prow, dict. Row of a processing table corresponding to the tilenight job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    log.info(" ")
    log.info("Running tilenight.\n")

    tnight_prow = make_tnight_prow(prows, calibjobs, internal_id=internal_id)
    internal_id += 1
    tnight_prow = create_and_submit(tnight_prow, queue=queue, reservation=reservation, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=False,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name,
                                   use_specter=use_specter, extra_job_args=extra_job_args)
    ptable.add_row(tnight_prow)

    return ptable, tnight_prow, internal_id
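
# A minimal usage sketch (illustrative only; `sci_prows` and `calibjobs` are
# assumed to come from the nightly submission loop, and the queue value is
# hypothetical):
#
#     ptable, tnight_prow, internal_id = submit_tilenight(
#         ptable, sci_prows, calibjobs, internal_id,
#         queue='realtime', reservation=None, dry_run=4)
#
# With dry_run=4 nothing is written, submitted, or queried, which makes this
# a safe way to exercise the bookkeeping logic.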

## wrapper functions for joint fitting
def science_joint_fit(ptable, sciences, internal_id, queue='realtime', reservation=None,
                      z_submit_types=None, dry_run=0, strictly_successful=False,
                      check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None):
    """
    Wrapper function for desispec.workflow.processing.joint_fit specific to the stdstarfit joint fit and redshift fitting.

    All variables are the same except:

        Arg 'sciences' is mapped to the prows argument of joint_fit.
        The joint_fit argument descriptor is pre-defined as 'science'.
    """
    return joint_fit(ptable=ptable, prows=sciences, internal_id=internal_id, queue=queue, reservation=reservation,
                     descriptor='science', z_submit_types=z_submit_types, dry_run=dry_run,
                     strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                     resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)


def flat_joint_fit(ptable, flats, internal_id, queue='realtime',
                   reservation=None, dry_run=0, strictly_successful=False,
                   check_for_outputs=True, resubmit_partial_complete=True,
                   system_name=None):
    """
    Wrapper function for desispec.workflow.processing.joint_fit specific to the nightlyflat joint fit.

    All variables are the same except:

        Arg 'flats' is mapped to the prows argument of joint_fit.
        The joint_fit argument descriptor is pre-defined as 'nightlyflat'.
    """
    return joint_fit(ptable=ptable, prows=flats, internal_id=internal_id, queue=queue, reservation=reservation,
                     descriptor='nightlyflat', dry_run=dry_run, strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs, resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)


def arc_joint_fit(ptable, arcs, internal_id, queue='realtime',
                  reservation=None, dry_run=0, strictly_successful=False,
                  check_for_outputs=True, resubmit_partial_complete=True,
                  system_name=None):
    """
    Wrapper function for desispec.workflow.processing.joint_fit specific to the psfnight joint fit.

    All variables are the same except:

        Arg 'arcs' is mapped to the prows argument of joint_fit.
        The joint_fit argument descriptor is pre-defined as 'psfnight'.
    """
    return joint_fit(ptable=ptable, prows=arcs, internal_id=internal_id, queue=queue, reservation=reservation,
                     descriptor='psfnight', dry_run=dry_run, strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs, resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)

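# Each of the three wrappers above simply pins joint_fit's `descriptor`
# argument, so e.g. (illustrative, with hypothetical `arc_prows`):
#
#     arc_joint_fit(ptable, arc_prows, internal_id)
#
# is equivalent to:
#
#     joint_fit(ptable=ptable, prows=arc_prows, internal_id=internal_id,
#               descriptor='psfnight')
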
def make_joint_prow(prows, descriptor, internal_id):
    """
    Given an input list or array of processing table rows and a descriptor, this creates a joint fit processing job row.
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
    input prows).

    Args:
        prows, list or array of dicts. The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        descriptor, str. Description of the joint fitting job. Can be 'stdstarfit', 'pernight',
            'cumulative', 'psfnight', or 'nightlyflat'.
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last id number used).

    Returns:
        tuple: A tuple containing:

        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last id number used).
    """
    log = get_logger()
    first_row = table_row_to_dict(prows[0])
    joint_prow = first_row.copy()

    joint_prow['INTID'] = internal_id
    internal_id += 1
    joint_prow['JOBDESC'] = descriptor
    joint_prow['LATEST_QID'] = -99
    joint_prow['ALL_QIDS'] = np.array([], dtype=int)
    joint_prow['SUBMIT_DATE'] = -99
    joint_prow['STATUS'] = 'UNSUBMITTED'
    joint_prow['SCRIPTNAME'] = ''
    joint_prow['EXPID'] = np.unique(np.concatenate([currow['EXPID'] for currow in prows])).astype(int)

    ## Assign the PROCCAMWORD based on the descriptor and the input exposures
    ## UPDATE 2024-04-24: badamps are now included in arc/flat joint fits,
    ## so grab all PROCCAMWORDs instead of filtering out BADAMP cameras
    ## For flats we want any camera that exists in all 12 exposures
    ## For arcs we want any camera that exists in at least 3 exposures
    pcamwords = [prow['PROCCAMWORD'] for prow in prows]
    if descriptor == 'stdstarfit':
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
                                                  full_spectros_only=True)
    elif descriptor in ['pernight', 'cumulative']:
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
                                                  full_spectros_only=False)
    elif descriptor == 'nightlyflat':
        joint_prow['PROCCAMWORD'] = camword_intersection(pcamwords,
                                                         full_spectros_only=False)
    elif descriptor == 'psfnight':
        ## Count number of exposures each camera is present for
        camcheck = {}
        for camword in pcamwords:
            for cam in decode_camword(camword):
                if cam in camcheck:
                    camcheck[cam] += 1
                else:
                    camcheck[cam] = 1
        ## if a camera exists in 3 or more exposures, then include it
        goodcams = []
        for cam, camcount in camcheck.items():
            if camcount >= 3:
                goodcams.append(cam)
        joint_prow['PROCCAMWORD'] = create_camword(goodcams)
    else:
        log.warning("Asked to produce joint proc table row for unknown"
                    + f" job description {descriptor}")

    joint_prow = assign_dependency(joint_prow, dependency=prows)
    return joint_prow, internal_id
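
# A minimal sketch of the psfnight camera rule above (illustrative values;
# camwords follow the desispec conventions where 'a0' expands to the b0, r0,
# and z0 cameras of spectrograph 0):
#
#     pcamwords = ['a0', 'a0', 'a0', 'b0', 'b0']
#     counts = {}
#     for camword in pcamwords:
#         for cam in decode_camword(camword):
#             counts[cam] = counts.get(cam, 0) + 1
#     create_camword([cam for cam, n in counts.items() if n >= 3])
#     # -> 'a0': b0 is in 5 exposures and r0/z0 in 3, so all three are kept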

def make_exposure_prow(erow, int_id, calibjobs, jobdesc=None):
    """
    Converts an exposure table row into a processing table row, assigns it the
    given internal id and job description (defaulting to the exposure's
    OBSTYPE), and defines its dependencies on the calibration jobs.

    Returns:
        tuple: The new processing row (dict) and the incremented internal id (int).
    """
    prow = erow_to_prow(erow)
    prow['INTID'] = int_id
    int_id += 1
    if jobdesc is None:
        prow['JOBDESC'] = prow['OBSTYPE']
    else:
        prow['JOBDESC'] = jobdesc
    prow = define_and_assign_dependency(prow, calibjobs)
    return prow, int_id

def make_tnight_prow(prows, calibjobs, internal_id):
    """
    Given an input list or array of processing table rows, this creates a tilenight processing job row.
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
    input calibjobs).

    Args:
        prows, list or array of dicts. Unsubmitted rows corresponding to the individual prestdstar jobs that are
            the first steps of tilenight.
        calibjobs, dict. Dictionary containing keys that each correspond to a Table.Row or
            None, with each table.Row() value corresponding to a calibration job
            on which the tilenight job depends.
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last id number used).

    Returns:
        dict: Row of a processing table corresponding to the tilenight job.
    """
    first_row = table_row_to_dict(prows[0])
    joint_prow = first_row.copy()

    joint_prow['INTID'] = internal_id
    joint_prow['JOBDESC'] = 'tilenight'
    joint_prow['LATEST_QID'] = -99
    joint_prow['ALL_QIDS'] = np.array([], dtype=int)
    joint_prow['SUBMIT_DATE'] = -99
    joint_prow['STATUS'] = 'UNSUBMITTED'
    joint_prow['SCRIPTNAME'] = ''
    joint_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

    joint_prow = define_and_assign_dependency(joint_prow, calibjobs, use_tilenight=True)

    return joint_prow

def make_redshift_prow(prows, tnights, descriptor, internal_id):
    """
    Given an input list or array of processing table rows and a descriptor, this creates a redshift processing job row.
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
    input tilenight jobs).

    Args:
        prows, list or array of dicts. Unsubmitted rows corresponding to the individual prestdstar jobs that are
            the first steps of tilenight.
        tnights, list or array of Table.Row objects. Rows corresponding to the tilenight jobs on which the redshift job depends.
        descriptor, str. Description of the redshift job, e.g. 'pernight' or 'cumulative'.
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last id number used).

    Returns:
        dict: Row of a processing table corresponding to the redshift job.
    """
    first_row = table_row_to_dict(prows[0])
    redshift_prow = first_row.copy()

    redshift_prow['INTID'] = internal_id
    redshift_prow['JOBDESC'] = descriptor
    redshift_prow['LATEST_QID'] = -99
    redshift_prow['ALL_QIDS'] = np.array([], dtype=int)
    redshift_prow['SUBMIT_DATE'] = -99
    redshift_prow['STATUS'] = 'UNSUBMITTED'
    redshift_prow['SCRIPTNAME'] = ''
    redshift_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

    redshift_prow = assign_dependency(redshift_prow, dependency=tnights)

    return redshift_prow
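
# A minimal sketch of how the row builders compose (illustrative; `sci_prows`
# and `calibjobs` are assumed to come from the nightly loop, and passing the
# tilenight row dict directly as the dependency is a simplification):
#
#     tnight_prow = make_tnight_prow(sci_prows, calibjobs, internal_id)
#     zprow = make_redshift_prow(sci_prows, [tnight_prow], 'cumulative',
#                                internal_id + 1)
#
# The redshift row inherits tile/night metadata from the first science prow
# and depends on the tilenight job rather than on the individual exposures.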

def check_darknight_deps_and_update_prow(prow, n_nights_before=None, n_nights_after=None,
                                    proc_table_path=None):
    """
    Update the processing row with the darknight dependencies.

    Args:
        prow (dict): Processing row to be updated.
        n_nights_before (int, optional): Number of nights before the given night to include in the darknight calculation.
        n_nights_after (int, optional): Number of nights after the given night to include in the darknight calculation.
        proc_table_path (str): Path to the processing table files.

    Returns:
        dict: The input processing row, updated with darknight dependencies
        (INT_DEP_IDS and LATEST_DEP_QID) if matching bias/pdark jobs are found,
        otherwise returned unchanged.
    """
    log = get_logger()
    refnight = prow['NIGHT']

    compdarkparser = compute_dark_parser()
    options = ['--reference-night', str(refnight), '-c', 'b1', '-o', 'temp',
               '--skip-camera-check', '--dont-search-filesystem']
    if n_nights_before is not None:
        options.extend(['--before', str(n_nights_before)])
    if n_nights_after is not None:
        options.extend(['--after', str(n_nights_after)])
    compdarkargs = compdarkparser.parse_args(options)

    exptab_for_dark_night = get_stacked_dark_exposure_table(compdarkargs)

    ## AK: as of now we always have enough darks, we just don't know if they are viable, so this is wasted computation
    ### First see if we have enough exposures for each camera to make a viable darknight
    #camera_counter = {cam: 0 for cam in decode_camword(prow['PROCCAMWORD'])}
    #for erow in exptab_for_dark_night:
    #    for cam in decode_camword(erow_to_goodcamword(erow, suppress_logging=True, exclude_badamps=True)):
    #        camera_counter[cam] += 1
    #enough_darks = np.all([count >= compdarkargs.min_dark_exposures for count in camera_counter.values()])
    #if not enough_darks:
    #    log.critical("Requested to do darknights, but not all cameras have sufficient darks:"
    #                 + f" {camera_counter=}, min_dark_exposures={compdarkargs.min_dark_exposures}, "
    #                 + f"{n_nights_before=}, {n_nights_after=}. Exiting without submitting ccdcalib job.")
    #    return prow, enough_darks

    ## Since we have enough exposures, update the processing table row with dependencies
    nights = np.unique(np.append(exptab_for_dark_night['NIGHT'].data, [refnight]))
    dep_intids, dep_qids = [], []
    for night in nights:
        nightly_expids = exptab_for_dark_night['EXPID'][exptab_for_dark_night['NIGHT'] == night].data

        ## Load in the files defined above
        proc_table_pathname = findfile('processing_table', night=night, readonly=True)
        if proc_table_path is not None:
            proc_table_pathname = os.path.join(proc_table_path, os.path.basename(proc_table_pathname))

        ptable = load_table(tablename=proc_table_pathname, tabletype='proctable', suppress_logging=True)
        if len(ptable) == 0:
            log.error(f"Expected bias and/or pdark processing on {night=} for expids={nightly_expids}, but didn't find a table. Continuing")
            continue
        else:
            biasdarks = ptable[np.isin(ptable['JOBDESC'].data, np.asarray([b'biasnight', b'biaspdark', b'pdark']))]
            if len(biasdarks) == 0:
                log.error(f"Expected biasnight, biaspdark, or pdark processing on {night=} for expids={nightly_expids}, but didn't find any "
                          + f"entries: jobdescs={ptable['JOBDESC'].data}, expids={ptable['EXPID'].data}, "
                          + f"obstypes={ptable['OBSTYPE'].data}. Continuing anyway")
            for biasdark in biasdarks:
                ## if bias for the reference night or if any darks in the job are used for
                ## the darknight, we should depend on the job
                if np.any(np.isin(biasdark['EXPID'].data, nightly_expids)) or (str(biasdark['JOBDESC']).startswith('bias') and night == refnight):
                    dep_intids.append(biasdark['INTID'])
                    dep_qids.append(biasdark['LATEST_QID'])

    if len(dep_intids) > 0:
        ## Note originally used argsort so that INT_DEP_IDS order would correspond to
        ## LATEST_DEP_QID order, but because we remove LATEST_DEP_QID's if completed,
        ## these are not the same length. So now we just sort INT_DEP_IDS and leave
        ## LATEST_DEP_QID as-is
        prow['INT_DEP_IDS'] = np.sort(np.concatenate([prow['INT_DEP_IDS'], dep_intids]))
        prow['LATEST_DEP_QID'] = np.concatenate([prow['LATEST_DEP_QID'], dep_qids])
        ## LATEST_DEP_QID contains 1's for jobs that already had files completed outside
        ## the normal pipeline. Ignore those, otherwise Slurm gets confused
        prow['LATEST_DEP_QID'] = prow['LATEST_DEP_QID'][prow['LATEST_DEP_QID'] > 1]

    return prow  #, enough_darks
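
# A minimal sketch of the QID filtering above (illustrative values):
#
#     latest_dep_qid = np.array([1, 1, 8765432, 8765440])
#     latest_dep_qid[latest_dep_qid > 1]   # -> array([8765432, 8765440])
#
# Placeholder QIDs of 1 mark jobs whose outputs were produced outside the
# normal pipeline, so they must not be handed to Slurm as dependencies.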

def checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs,
                                  lasttype, internal_id, z_submit_types=None, dry_run=0,
                                  queue='realtime', reservation=None, strictly_successful=False,
                                  check_for_outputs=True, resubmit_partial_complete=True,
                                  system_name=None):
    """
    Takes all the stateful data from daily processing and determines whether a joint fit needs to be submitted. Places
    the decision criteria into a single function for easier maintainability over time. These are separate from the
    new standard manifest*.json method of indicating a calibration sequence is complete. That is checked independently
    elsewhere and doesn't interact with this.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        arcs (list of dict): list of the individual arc jobs to be used for the psfnight (NOT all
            the arcs, if multiple sets existed). May be empty if none identified yet.
        flats (list of dict): list of the individual flat jobs to be used for the nightlyflat (NOT
            all the flats, if multiple sets existed). May be empty if none identified yet.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The table.Row() values are for the corresponding
            calibration job.
        lasttype (str or None): the obstype of the last individual exposure row to be processed.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 doesn't write or submit anything but queries Slurm normally for job status.
            4 doesn't write, submit jobs, or query Slurm.
            5 doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table, Processing table of all exposures that have been processed.
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
          and 'nightlyflat'. Each key corresponds to a Table.Row or
          None. The table.Row() values are for the corresponding
          calibration job.
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is
          incremented upward from the input such that it represents the smallest unused ID.
    """
    if lasttype == 'science' and len(sciences) > 0:
        log = get_logger()
        skysubonly = np.array([sci['LASTSTEP'] == 'skysub' for sci in sciences])
        if np.all(skysubonly):
            log.error("Identified all exposures in joint fitting request as skysub-only. Not submitting")
            sciences = []
            return ptable, calibjobs, sciences, internal_id

        if np.any(skysubonly):
            log.error("Identified skysub-only exposures in joint fitting request")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))
            sciences = (np.array(sciences, dtype=object)[~skysubonly]).tolist()
            log.info("Removed skysub only exposures in joint fitting:")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))

        from collections import Counter
        tiles = np.array([sci['TILEID'] for sci in sciences])
        counts = Counter(tiles)
        if len(counts.most_common()) > 1:
            log.error("Identified more than one tile in a joint fitting request")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("Tileid's: {}".format(tiles))
            log.info("Returning without joint fitting any of these exposures.")
            # most_common, nmost_common = counts.most_common()[0]
            # if most_common == -99:
            #     most_common, nmost_common = counts.most_common()[1]
            # log.warning(f"Given multiple tiles to jointly fit: {counts}. "+
            #             "Only processing the most common non-default " +
            #             f"tile: {most_common} with {nmost_common} exposures")
            # sciences = (np.array(sciences,dtype=object)[tiles == most_common]).tolist()
            # log.info("Tiles and exposure id's being submitted for joint fitting:")
            # log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            # log.info("Tileid's: {}".format([row['TILEID'] for row in sciences]))
            sciences = []
            return ptable, calibjobs, sciences, internal_id

        ptable, tilejob, internal_id = science_joint_fit(ptable, sciences, internal_id, z_submit_types=z_submit_types,
                                                         dry_run=dry_run, queue=queue, reservation=reservation,
                                                         strictly_successful=strictly_successful,
                                                         check_for_outputs=check_for_outputs,
                                                         resubmit_partial_complete=resubmit_partial_complete,
                                                         system_name=system_name)
        if tilejob is not None:
            sciences = []

    elif lasttype == 'flat' and calibjobs['nightlyflat'] is None and len(flats) == 12:
        ## Note here we assume a complete set of exactly 12 flat exposures
        ptable, calibjobs['nightlyflat'], internal_id \
            = flat_joint_fit(ptable, flats, internal_id, dry_run=dry_run, queue=queue,
                             reservation=reservation, strictly_successful=strictly_successful,
                             check_for_outputs=check_for_outputs,
                             resubmit_partial_complete=resubmit_partial_complete,
                             system_name=system_name
                            )

    elif lasttype == 'arc' and calibjobs['psfnight'] is None and len(arcs) == 5:
        ## Note here we assume a complete set of exactly 5 arc exposures
        ptable, calibjobs['psfnight'], internal_id \
            = arc_joint_fit(ptable, arcs, internal_id, dry_run=dry_run, queue=queue,
                            reservation=reservation, strictly_successful=strictly_successful,
                            check_for_outputs=check_for_outputs,
                            resubmit_partial_complete=resubmit_partial_complete,
                            system_name=system_name
                            )
    return ptable, calibjobs, sciences, internal_id
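
# A minimal usage sketch (illustrative; the inputs are assumed to come from
# the nightly submission loop):
#
#     ptable, calibjobs, sciences, internal_id = checkfor_and_submit_joint_job(
#         ptable, arcs, flats, sciences, calibjobs,
#         lasttype='arc', internal_id=internal_id, dry_run=4)
#
# With lasttype='arc', a psfnight joint fit is submitted only once exactly
# 5 arc jobs have accumulated and calibjobs['psfnight'] is still None.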

def submit_tilenight_and_redshifts(ptable, sciences, calibjobs, internal_id, dry_run=0,
                                  queue='realtime', reservation=None, strictly_successful=False,
                                  check_for_outputs=True, resubmit_partial_complete=True,
                                  system_name=None, use_specter=False, extra_job_args=None):
    """
    Takes all the stateful data from daily processing and determines whether a tilenight job needs to be submitted.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The table.Row() values are for the corresponding
            calibration job.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 doesn't write or submit anything but queries Slurm normally for job status.
            4 doesn't write, submit jobs, or query Slurm.
            5 doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict, optional): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include
            laststeps for tilenight, z_submit_types for redshifts, etc.

    Returns:
        tuple: A tuple containing:

        * ptable, Table, Processing table of all exposures that have been processed.
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is
          incremented upward from the input such that it represents the smallest unused ID.
    """
    ptable, tnight, internal_id = submit_tilenight(ptable, sciences, calibjobs, internal_id,
                                             queue=queue, reservation=reservation,
                                             dry_run=dry_run, strictly_successful=strictly_successful,
                                             resubmit_partial_complete=resubmit_partial_complete,
                                             system_name=system_name, use_specter=use_specter,
                                             extra_job_args=extra_job_args)

    z_submit_types = None
    ## extra_job_args defaults to None, so guard the membership test
    if extra_job_args is not None and 'z_submit_types' in extra_job_args:
        z_submit_types = extra_job_args['z_submit_types']

    ptable, internal_id = submit_redshifts(ptable, sciences, tnight, internal_id,
                                    queue=queue, reservation=reservation,
                                    dry_run=dry_run, strictly_successful=strictly_successful,
                                    check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete,
                                    z_submit_types=z_submit_types,
                                    system_name=system_name)

    if tnight is not None:
        sciences = []

    return ptable, sciences, internal_id
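
# A minimal usage sketch (illustrative; `sci_prows` and `calibjobs` are
# assumed from the nightly loop, and the extra_job_args values are
# hypothetical):
#
#     extra_job_args = {'z_submit_types': ['cumulative']}
#     ptable, sciences, internal_id = submit_tilenight_and_redshifts(
#         ptable, sci_prows, calibjobs, internal_id,
#         dry_run=4, extra_job_args=extra_job_args)
#
# z_submit_types is pulled out of extra_job_args and forwarded to
# submit_redshifts; the full dict also rides along to submit_tilenight.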

def set_calibrator_flag(prows, ptable):
    """
    Sets the "CALIBRATOR" column of a processing table row to 1 (integer representation of True)
    for all input rows. Used within joint fitting code to flag the exposures that were input
    to the psfnight or nightlyflat for later reference.

    Args:
        prows, list or array of Table.Rows or dicts. The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        ptable, Table. The processing table where each row is a processed job.

    Returns:
        Table: The same processing table as input except with the CALIBRATOR column
        set to 1 for the rows matching the input prows.
    """
    for prow in prows:
        ptable['CALIBRATOR'][ptable['INTID'] == prow['INTID']] = 1
    return ptable
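
# A minimal usage sketch (illustrative; `arc_prows` are the rows that were
# input to a psfnight joint fit):
#
#     ptable = set_calibrator_flag(arc_prows, ptable)
#
# Matching is done by INTID, so the prows only need that column present.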