# /py/desispec/workflow/processing.py
"""
desispec.workflow.processing
============================

"""
import sys, os, glob
import json
from astropy.io import fits
from astropy.table import Table, join
import numpy as np

import time, datetime
from collections import OrderedDict
import subprocess

from desispec.scripts.link_calibnight import derive_include_exclude
from desispec.scripts.tile_redshifts import generate_tile_redshift_scripts
from desispec.workflow.redshifts import get_ztile_script_pathname, \
    get_ztile_relpath, \
    get_ztile_script_suffix
from desispec.workflow.exptable import read_minimal_science_exptab_cols
from desispec.workflow.queue import get_resubmission_states, update_from_queue, \
    queue_info_from_qids, get_queue_states_from_qids, update_queue_state_cache, \
    get_non_final_states
from desispec.workflow.timing import what_night_is_it
from desispec.workflow.desi_proc_funcs import get_desi_proc_batch_file_pathname, \
    create_desi_proc_batch_script, \
    get_desi_proc_batch_file_path, \
    get_desi_proc_tilenight_batch_file_pathname, \
    create_desi_proc_tilenight_batch_script, create_linkcal_batch_script
from desispec.workflow.batch import parse_reservation
from desispec.workflow.utils import pathjoin, sleep_and_report, \
    load_override_file
from desispec.workflow.tableio import write_table, load_table
from desispec.workflow.proctable import table_row_to_dict, erow_to_prow, \
    read_minimal_tilenight_proctab_cols, read_minimal_full_proctab_cols, \
    update_full_ptab_cache, default_prow, get_default_qid
from desiutil.log import get_logger

from desispec.io import findfile, specprod_root
from desispec.io.util import decode_camword, create_camword, \
    difference_camwords, \
    camword_to_spectros, camword_union, camword_intersection, parse_badamps


#################################################
############## Misc Functions ###################
#################################################
def night_to_starting_iid(night=None):
    """
    Creates an internal ID for a given night. The resulting integer is a 9 digit number.
    The digits are YYMMDDxxx where YY is the years since 2000, MM and DD are the month and day. xxx are 000,
    and are incremented for up to 1000 unique job ID's for a given night.

    Args:
        night (str or int): YYYYMMDD of the night to get the starting internal ID for.

    Returns:
        int: 9 digit number consisting of YYMMDD000. YY is years after 2000, MMDD is month and day.
        000 being the starting job number (0).
    """
    if night is None:
        night = what_night_is_it()
    night = int(night)
    internal_id = (night - 20000000) * 1000
    return internal_id

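# Example (illustrative): for the night 20250310 the starting internal ID is
# (20250310 - 20000000) * 1000 = 250310000, leaving room for internal job IDs
# 250310000 through 250310999 on that night.
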
class ProcessingParams():
    """Simple container for the common job-submission options used by the processing workflow."""
    def __init__(self, dry_run_level=0, queue='realtime',
                 reservation=None, strictly_successful=True,
                 check_for_outputs=True,
                 resubmit_partial_complete=True,
                 system_name='perlmutter', use_specter=True):

        self.dry_run_level = dry_run_level
        self.system_name = system_name
        self.queue = queue
        self.reservation = reservation
        self.strictly_successful = strictly_successful
        self.check_for_outputs = check_for_outputs
        self.resubmit_partial_complete = resubmit_partial_complete
        self.use_specter = use_specter

#################################################
############ Script Functions ###################
#################################################
def batch_script_name(prow):
    """
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, JOBDESC, PROCCAMWORD defined)
    and determines the script file pathname as defined by desi_proc's helper functions.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.

    Returns:
        str: The complete pathname to the script file, as it is defined within the desi_proc ecosystem.
    """
    expids = prow['EXPID']
    if len(expids) == 0:
        expids = None
    if prow['JOBDESC'] == 'tilenight':
        pathname = get_desi_proc_tilenight_batch_file_pathname(night=prow['NIGHT'], tileid=prow['TILEID'])
    else:
        pathname = get_desi_proc_batch_file_pathname(night=prow['NIGHT'], exp=expids,
                                                     jobdesc=prow['JOBDESC'], cameras=prow['PROCCAMWORD'])
    scriptfile = pathname + '.slurm'
    return scriptfile

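# Example (hypothetical values; the directory comes from the desi_proc helper functions):
# prow = {'NIGHT': 20250310, 'EXPID': [123456], 'JOBDESC': 'arc', 'PROCCAMWORD': 'a0123456789'}
# batch_script_name(prow)  # -> '<batch dir for 20250310>/<arc job name>.slurm'
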
def get_jobdesc_to_file_map():
    """
    Returns a mapping of job descriptions to the filenames of the output files

    Args:
        None

    Returns:
        dict: Dictionary mapping lowercase job descriptions to the filename
            of their expected outputs.
    """
    return {'prestdstar': 'sframe',
            'stdstarfit': 'stdstars',
            'poststdstar': 'cframe',
            'nightlybias': 'biasnight',
            # 'ccdcalib': 'badcolumns',
            'badcol': 'badcolumns',
            'arc': 'fitpsf',
            'flat': 'fiberflat',
            'psfnight': 'psfnight',
            'nightlyflat': 'fiberflatnight',
            'spectra': 'spectra_tile',
            'coadds': 'coadds_tile',
            'redshift': 'redrock_tile'}

def get_file_to_jobdesc_map():
    """
    Returns a mapping of output filenames to job descriptions

    Args:
        None

    Returns:
        dict: Dictionary mapping the filenames of the expected outputs to
            the lowercase job descriptions.
    """
    job_to_file_map = get_jobdesc_to_file_map()
    job_to_file_map.pop('badcol')  # these files can also be in a ccdcalib job
    job_to_file_map.pop('nightlybias')  # these files can also be in a ccdcalib job
    return {value: key for key, value in job_to_file_map.items()}

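# Example: the two maps are inverses of each other for the unambiguous entries:
# get_jobdesc_to_file_map()['arc'] == 'fitpsf'
# get_file_to_jobdesc_map()['fitpsf'] == 'arc'
# ('badcol' and 'nightlybias' are dropped from the inverse map since their
# outputs can also come from a ccdcalib job).
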
def check_for_outputs_on_disk(prow, resubmit_partial_complete=True):
    """
    Checks whether the expected output files for the job described by prow already exist on disk.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with values updated to reflect
        whether the expected outputs already exist ('STATUS', and possibly a pruned 'PROCCAMWORD').
    """
    prow['STATUS'] = 'UNKNOWN'
    log = get_logger()

    if prow['JOBDESC'] in ['linkcal', 'ccdcalib']:
        log.info(f"jobdesc={prow['JOBDESC']} has indeterminate outputs, so "
                + "not checking for files on disk.")
        return prow

    job_to_file_map = get_jobdesc_to_file_map()

    night = prow['NIGHT']
    if prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
        filetype = 'redrock_tile'
    else:
        filetype = job_to_file_map[prow['JOBDESC']]
    orig_camword = prow['PROCCAMWORD']

    ## if spectro based, look for spectros, else look for cameras
    if prow['JOBDESC'] in ['stdstarfit','spectra','coadds','redshift']:
        ## Spectrograph based
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        ## Suppress outputs about using tile based files in findfile if only looking for stdstarfits
        if prow['JOBDESC'] == 'stdstarfit':
            tileid = None
        else:
            tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(findfile(filetype=filetype, night=night, expid=expid, spectrograph=spectro, tile=tileid)):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'],existing_camword)
    elif prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
        ## Spectrograph based
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        redux_dir = specprod_root()
        outdir = os.path.join(redux_dir,get_ztile_relpath(tileid,group=prow['JOBDESC'],night=night,expid=expid))
        suffix = get_ztile_script_suffix(tileid, group=prow['JOBDESC'], night=night, expid=expid)
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(os.path.join(outdir, f"redrock-{spectro}-{suffix}.fits")):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'],existing_camword)
    else:
        ## Otherwise camera based
        cameras = decode_camword(prow['PROCCAMWORD'])
        n_desired = len(cameras)
        if len(prow['EXPID']) > 0:
            expid = prow['EXPID'][0]
        else:
            expid = None
        if len(prow['EXPID']) > 1 and prow['JOBDESC'] not in ['psfnight','nightlyflat']:
            log.warning(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']}. This job type only makes " +
                     f"sense with a single exposure. Proceeding with {expid}.")
        missing_cameras = []
        for cam in cameras:
            if not os.path.exists(findfile(filetype=filetype, night=night, expid=expid, camera=cam)):
                missing_cameras.append(cam)
        completed = (len(missing_cameras) == 0)
        if not completed and resubmit_partial_complete and len(missing_cameras) < n_desired:
            prow['PROCCAMWORD'] = create_camword(missing_cameras)

    if completed:
        prow['STATUS'] = 'COMPLETED'
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"the desired {n_desired} {filetype}'s. Not submitting this job.")
    elif resubmit_partial_complete and orig_camword != prow['PROCCAMWORD']:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"some {filetype}'s. Submitting smaller camword={prow['PROCCAMWORD']}.")
    elif not resubmit_partial_complete:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} doesn't have all " +
                 f"{filetype}'s and resubmit_partial_complete=False. "+
                 f"Submitting full camword={prow['PROCCAMWORD']}.")
    else:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} has no " +
                 f"existing {filetype}'s. Submitting full camword={prow['PROCCAMWORD']}.")
    return prow

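# Example (hypothetical): for an 'arc' prow with PROCCAMWORD='a01' where the fitpsf
# files for petal 0 already exist, check_for_outputs_on_disk() prunes PROCCAMWORD to
# the petal-1 cameras (when resubmit_partial_complete=True); if every file exists it
# instead sets STATUS='COMPLETED' so the job is skipped.
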
def create_and_submit(prow, queue='realtime', reservation=None, dry_run=0,
                      joint=False, strictly_successful=False,
                      check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None, use_specter=False,
                      extra_job_args=None):
    """
    Wrapper script that takes a processing table row and a set of modifier keywords, creates a submission script for the
    compute nodes, and then submits that script to the Slurm scheduler with appropriate dependencies.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 doesn't write or submit anything but queries Slurm normally for job status.
            4 doesn't write, submit jobs, or query Slurm.
            5 doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        joint (bool, optional): Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
            run with desi_proc_joint_fit. Default is False.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include refnight
            and include/exclude lists for linkcal, laststeps for tilenight, etc.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
        the change in job status after creating and submitting the job for processing.

    Note:
        This may modify the input. Table.Row objects are generally copied on modification, so the
        input object in memory may or may not change. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    orig_prow = prow.copy()
    if check_for_outputs:
        prow = check_for_outputs_on_disk(prow, resubmit_partial_complete)
        if prow['STATUS'].upper() == 'COMPLETED':
            return prow

    prow = create_batch_script(prow, queue=queue, dry_run=dry_run, joint=joint,
                               system_name=system_name, use_specter=use_specter,
                               extra_job_args=extra_job_args)
    prow = submit_batch_script(prow, reservation=reservation, dry_run=dry_run,
                               strictly_successful=strictly_successful)

    ## If resubmitted partial, the PROCCAMWORD and SCRIPTNAME will correspond
    ## to the pruned values. But we want to retain the full job's values,
    ## so get those from the old job.
    if resubmit_partial_complete:
        prow['PROCCAMWORD'] = orig_prow['PROCCAMWORD']
        prow['SCRIPTNAME'] = orig_prow['SCRIPTNAME']
    return prow

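# Example (hypothetical): create and "submit" a row without touching Slurm; with
# dry_run=3 no scripts or jobs are written or submitted, but Slurm is still
# queried for the status of any dependencies:
# prow = create_and_submit(prow, queue='realtime', dry_run=3, system_name='perlmutter')
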
def desi_link_calibnight_command(prow, refnight, include=None):
    """
    Wrapper script that takes a processing table row (or dictionary with
    REFNIGHT, NIGHT, PROCCAMWORD defined) and determines the proper command
    line call to link data defined by the input row/dict.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions
            for 'NIGHT', 'REFNIGHT', and 'PROCCAMWORD'.
        refnight (str or int): The night with a valid set of calibrations
            from which the links will be created.
        include (list): The filetypes to include in the linking.

    Returns:
        str: The proper command to be submitted to desi_link_calibnight
            to process the job defined by the prow values.
    """
    cmd = 'desi_link_calibnight'
    cmd += f' --refnight={refnight}'
    cmd += f' --newnight={prow["NIGHT"]}'
    cmd += f' --cameras={prow["PROCCAMWORD"]}'
    if include is not None:
        cmd += ' --include=' + ','.join(list(include))
    return cmd

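# Example: with prow = {'NIGHT': 20250311, 'PROCCAMWORD': 'a0123456789'} (hypothetical),
# desi_link_calibnight_command(prow, refnight=20250310, include=['psfnight', 'fiberflatnight'])
# returns 'desi_link_calibnight --refnight=20250310 --newnight=20250311'
#         ' --cameras=a0123456789 --include=psfnight,fiberflatnight'
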
def desi_proc_command(prow, system_name, use_specter=False, queue=None):
    """
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, OBSTYPE, JOBDESC, PROCCAMWORD defined)
    and determines the proper command line call to process the data defined by the input row/dict.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).

    Returns:
        str: The proper command to be submitted to desi_proc to process the job defined by the prow values.
    """
    cmd = 'desi_proc'
    cmd += ' --batch'
    cmd += ' --nosubmit'
    if queue is not None:
        cmd += f' -q {queue}'
    if prow['OBSTYPE'].lower() == 'science':
        if prow['JOBDESC'] == 'prestdstar':
            cmd += ' --nostdstarfit --nofluxcalib'
        elif prow['JOBDESC'] == 'poststdstar':
            cmd += ' --noprestdstarfit --nostdstarfit'

        if use_specter:
            cmd += ' --use-specter'
    elif prow['JOBDESC'] in ['flat', 'prestdstar'] and use_specter:
        cmd += ' --use-specter'
    pcamw = str(prow['PROCCAMWORD'])
    cmd += f" --cameras={pcamw} -n {prow['NIGHT']}"
    if len(prow['EXPID']) > 0:
        ## If ccdcalib job without a dark exposure, don't assign the flat expid
        ## since it would incorrectly process the flat using desi_proc
        if prow['OBSTYPE'].lower() != 'flat' or prow['JOBDESC'] != 'ccdcalib':
            cmd += f" -e {prow['EXPID'][0]}"
    if prow['BADAMPS'] != '':
        cmd += ' --badamps={}'.format(prow['BADAMPS'])
    return cmd

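# Example: for a hypothetical arc row
# prow = {'OBSTYPE': 'ARC', 'JOBDESC': 'arc', 'PROCCAMWORD': 'a01',
#         'NIGHT': 20250310, 'EXPID': [111], 'BADAMPS': ''}
# desi_proc_command(prow, 'perlmutter-gpu', queue='realtime') returns
# 'desi_proc --batch --nosubmit -q realtime --cameras=a01 -n 20250310 -e 111'
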
def desi_proc_joint_fit_command(prow, queue=None):
    """
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, OBSTYPE, PROCCAMWORD defined)
    and determines the proper command line call to process the data defined by the input row/dict.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
        queue (str): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).

    Returns:
        str: The proper command to be submitted to desi_proc_joint_fit
            to process the job defined by the prow values.
    """
    cmd = 'desi_proc_joint_fit'
    cmd += ' --batch'
    cmd += ' --nosubmit'
    if queue is not None:
        cmd += f' -q {queue}'

    descriptor = prow['OBSTYPE'].lower()

    night = prow['NIGHT']
    specs = str(prow['PROCCAMWORD'])
    expid_str = ','.join([str(eid) for eid in prow['EXPID']])

    cmd += f' --obstype {descriptor}'
    cmd += f' --cameras={specs} -n {night}'
    if len(expid_str) > 0:
        cmd += f' -e {expid_str}'
    return cmd

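# Example: for a hypothetical flat-field set
# prow = {'OBSTYPE': 'FLAT', 'PROCCAMWORD': 'a01', 'NIGHT': 20250310, 'EXPID': [100, 101]}
# desi_proc_joint_fit_command(prow) returns
# 'desi_proc_joint_fit --batch --nosubmit --obstype flat --cameras=a01 -n 20250310 -e 100,101'
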
def create_batch_script(prow, queue='realtime', dry_run=0, joint=False,
                        system_name=None, use_specter=False, extra_job_args=None):
    """
    Wrapper script that takes a processing table row and a set of modifier keywords and creates a submission script for the
    compute nodes.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        queue (str): The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 doesn't write or submit anything but queries Slurm normally for job status.
            4 doesn't write, submit jobs, or query Slurm.
            5 doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        joint (bool): Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
            run with desi_proc_joint_fit when not using tilenight. Default is False.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include refnight
            and include/exclude lists for linkcal, laststeps for tilenight, etc.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with an updated value for
        'SCRIPTNAME'.

    Note:
        This may modify the input. Table.Row objects are generally copied on modification, so the
        input object in memory may or may not change. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()

    if extra_job_args is None:
        extra_job_args = {}

    if prow['JOBDESC'] in ['perexp','pernight','pernight-v0','cumulative']:
        if dry_run > 1:
            scriptpathname = get_ztile_script_pathname(tileid=prow['TILEID'],group=prow['JOBDESC'],
                                                       night=prow['NIGHT'], expid=prow['EXPID'][0])

            log.info("Output file would have been: {}".format(scriptpathname))
        else:
            #- run zmtl for cumulative redshifts but not others
            run_zmtl = (prow['JOBDESC'] == 'cumulative')
            no_afterburners = False
            print(f"entering tileredshiftscript: {prow}")
            scripts, failed_scripts = generate_tile_redshift_scripts(tileid=prow['TILEID'], group=prow['JOBDESC'],
                                                                     nights=[prow['NIGHT']], expids=prow['EXPID'],
                                                                     batch_queue=queue, system_name=system_name,
                                                                     run_zmtl=run_zmtl,
                                                                     no_afterburners=no_afterburners,
                                                                     nosubmit=True)
            if len(failed_scripts) > 0:
                log.error(f"Redshifts failed for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned failed scriptname is {failed_scripts}")
            elif len(scripts) > 1:
                log.error(f"More than one redshift script returned for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned scriptnames were {scripts}")
            elif len(scripts) == 0:
                msg = f'No scripts were generated for {prow=}'
                log.critical(prow)
                raise ValueError(msg)
            else:
                scriptpathname = scripts[0]

    elif prow['JOBDESC'] == 'linkcal':
        refnight, include, exclude = -99, None, None
        if 'refnight' in extra_job_args:
            refnight = extra_job_args['refnight']
        if 'include' in extra_job_args:
            include = extra_job_args['include']
        if 'exclude' in extra_job_args:
            exclude = extra_job_args['exclude']
        include, exclude = derive_include_exclude(include, exclude)
        ## Fiberflatnights should be generated with psfs from the same night, so
        ## we shouldn't link psfs without also linking fiberflatnight.
        ## However, this should be checked at a higher level. If set here,
        ## go ahead and do it
        # if 'psfnight' in include and not 'fiberflatnight' in include:
        #     err = "Must link fiberflatnight if linking psfnight"
        #     log.error(err)
        #     raise ValueError(err)
        if dry_run > 1:
            scriptpathname = batch_script_name(prow)
            log.info("Output file would have been: {}".format(scriptpathname))
            cmd = desi_link_calibnight_command(prow, refnight, include)
            log.info("Command to be run: {}".format(cmd.split()))
        else:
            if refnight == -99:
                err = f'For {prow=} asked to link calibration but not given' \
                      + ' a valid refnight'
                log.error(err)
                raise ValueError(err)

            cmd = desi_link_calibnight_command(prow, refnight, include)
            log.info(f"Running: {cmd.split()}")
            scriptpathname = create_linkcal_batch_script(newnight=prow['NIGHT'],
                                                         cameras=prow['PROCCAMWORD'],
                                                         queue=queue,
                                                         cmd=cmd,
                                                         system_name=system_name)
    else:
        if prow['JOBDESC'] != 'tilenight':
            nightlybias, nightlycte, cte_expids = False, False, None
            if 'nightlybias' in extra_job_args:
                nightlybias = extra_job_args['nightlybias']
            elif prow['JOBDESC'].lower() == 'nightlybias':
                nightlybias = True
            if 'nightlycte' in extra_job_args:
                nightlycte = extra_job_args['nightlycte']
            if 'cte_expids' in extra_job_args:
                cte_expids = extra_job_args['cte_expids']
            ## run known joint jobs as joint even if unspecified
            ## in the future we can eliminate the need for "joint"
            if joint or prow['JOBDESC'].lower() in ['psfnight', 'nightlyflat']:
                cmd = desi_proc_joint_fit_command(prow, queue=queue)
                ## For consistency with how we edit the other commands, do them
                ## here, but a future TODO would be to move these into the command
                ## generation itself
                if 'extra_cmd_args' in extra_job_args:
                    cmd += ' ' + ' '.join(np.atleast_1d(extra_job_args['extra_cmd_args']))
            else:
                cmd = desi_proc_command(prow, system_name, use_specter, queue=queue)
                if nightlybias:
                    cmd += ' --nightlybias'
                if nightlycte:
                    cmd += ' --nightlycte'
                    if cte_expids is not None:
                        cmd += ' --cte-expids '
                        cmd += ','.join(np.atleast_1d(cte_expids).astype(str))
        if dry_run > 1:
            scriptpathname = batch_script_name(prow)
            log.info("Output file would have been: {}".format(scriptpathname))
            if prow['JOBDESC'] != 'tilenight':
                log.info("Command to be run: {}".format(cmd.split()))
        else:
            expids = prow['EXPID']
            if len(expids) == 0:
                expids = None

            if prow['JOBDESC'] == 'tilenight':
                log.info("Creating tilenight script for tile {}".format(prow['TILEID']))
                if 'laststeps' in extra_job_args:
                    laststeps = extra_job_args['laststeps']
                else:
                    err = f'{prow=} job did not specify last steps to tilenight'
                    log.error(err)
                    raise ValueError(err)
                ncameras = len(decode_camword(prow['PROCCAMWORD']))
                scriptpathname = create_desi_proc_tilenight_batch_script(
                                                               night=prow['NIGHT'], exp=expids,
                                                               tileid=prow['TILEID'],
                                                               ncameras=ncameras,
                                                               queue=queue,
                                                               mpistdstars=True,
                                                               use_specter=use_specter,
                                                               system_name=system_name,
                                                               laststeps=laststeps)
            else:
                if expids is not None and len(expids) > 1:
                    expids = expids[:1]
                log.info("Running: {}".format(cmd.split()))
                scriptpathname = create_desi_proc_batch_script(night=prow['NIGHT'], exp=expids,
                                                               cameras=prow['PROCCAMWORD'],
                                                               jobdesc=prow['JOBDESC'],
                                                               queue=queue, cmdline=cmd,
                                                               use_specter=use_specter,
                                                               system_name=system_name,
                                                               nightlybias=nightlybias,
                                                               nightlycte=nightlycte,
                                                               cte_expids=cte_expids)
    log.info("Outfile is: {}".format(scriptpathname))
    prow['SCRIPTNAME'] = os.path.basename(scriptpathname)
    return prow

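# Example (hypothetical): a linkcal row is driven entirely by extra_job_args; with
# dry_run=2 the script is not written, only logged:
# prow = create_batch_script(prow, queue='realtime', dry_run=2,
#                            extra_job_args={'refnight': 20250309,
#                                            'include': ['psfnight', 'fiberflatnight']})
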
_fake_qid = int(time.time() - 1.7e9)
def _get_fake_qid():
    """
    Return fake slurm queue jobid to use for dry-run testing
    """
    # Note: not implemented as a yield generator so that this returns a
    # genuine int, not a generator object
    global _fake_qid
    _fake_qid += 1
    return _fake_qid

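# Example: successive dry-run submissions get distinct, increasing fake qids:
# qid1 = _get_fake_qid(); qid2 = _get_fake_qid()  # qid2 == qid1 + 1
# Seeding from int(time.time() - 1.7e9) keeps the fake qids small while still
# differing between separate runs.
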
def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=False):
    """
    Wrapper script that takes a processing table row and a set of modifier keywords and submits the script to the Slurm
    scheduler.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 doesn't write or submit anything but queries Slurm normally for job status.
            4 doesn't write, submit jobs, or query Slurm.
            5 doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with values updated to
        reflect the submission (e.g. 'LATEST_QID', 'STATUS', 'SUBMIT_DATE').

    Note:
        This may modify the input. Table.Row objects are generally copied on modification, so the
        input object in memory may or may not change. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()
    dep_qids = prow['LATEST_DEP_QID']
    dep_list, dep_str = '', ''

    ## With desi_proc_night we now either resubmit failed jobs or exit, so this
    ## should no longer be necessary in the normal workflow.
    # workaround for sbatch --dependency bug not tracking jobs correctly
    # see NERSC TICKET INC0203024
    failed_dependency = False
    if len(dep_qids) > 0 and not dry_run:
        non_final_states = get_non_final_states()
        state_dict = get_queue_states_from_qids(dep_qids, dry_run_level=dry_run, use_cache=True)
        still_depids = []
        for depid in dep_qids:
            if depid in state_dict.keys():
                if state_dict[int(depid)] == 'COMPLETED':
                    log.info(f"removing completed jobid {depid}")
                elif state_dict[int(depid)] not in non_final_states:
                    failed_dependency = True
                    log.info("Found a dependency in a bad final state="
                             + f"{state_dict[int(depid)]} for depjobid={depid},"
                             + " not submitting this job.")
                    still_depids.append(depid)
                else:
                    still_depids.append(depid)
            else:
                still_depids.append(depid)
        dep_qids = np.array(still_depids)

    if len(dep_qids) > 0:
        jobtype = prow['JOBDESC']
        if strictly_successful:
            depcond = 'afterok'
        elif jobtype in ['arc', 'psfnight', 'prestdstar', 'stdstarfit']:
            ## (though psfnight and stdstarfit will require some inputs otherwise they'll go up in flames)
            depcond = 'afterany'
        else:
            ## if 'flat','nightlyflat','poststdstar', or any type of redshift, require strict success of inputs
            depcond = 'afterok'

        dep_str = f'--dependency={depcond}:'

        if np.isscalar(dep_qids):
            dep_list = str(dep_qids).strip(' \t')
            if dep_list == '':
                dep_str = ''
            else:
                dep_str += dep_list
        else:
            if len(dep_qids) > 1:
                dep_list = ':'.join(np.array(dep_qids).astype(str))
                dep_str += dep_list
            elif len(dep_qids) == 1 and dep_qids[0] not in [None, 0]:
                dep_str += str(dep_qids[0])
            else:
                dep_str = ''

    # script = f'{jobname}.slurm'
    # script_path = pathjoin(batchdir, script)
    if prow['JOBDESC'] in ['pernight-v0','pernight','perexp','cumulative']:
        script_path = get_ztile_script_pathname(tileid=prow['TILEID'],group=prow['JOBDESC'],
                                                night=prow['NIGHT'], expid=np.min(prow['EXPID']))
        jobname = os.path.basename(script_path)
    else:
        batchdir = get_desi_proc_batch_file_path(night=prow['NIGHT'])
        jobname = batch_script_name(prow)
        script_path = pathjoin(batchdir, jobname)

    batch_params = ['sbatch', '--parsable']
    if dep_str != '':
        batch_params.append(f'{dep_str}')

    reservation = parse_reservation(reservation, prow['JOBDESC'])
    if reservation is not None:
        batch_params.append(f'--reservation={reservation}')

    batch_params.append(f'{script_path}')
    submitted = True
    ## If dry_run, give it a fake QID.
    ## If a dependency has failed, don't even try to submit the job because
    ## Slurm will refuse; instead just mark it as unsubmitted.
    if dry_run:
        current_qid = _get_fake_qid()
    elif not failed_dependency:
        #- sbatch sometimes fails; try several times before giving up
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                current_qid = subprocess.check_output(batch_params, stderr=subprocess.STDOUT, text=True)
                current_qid = int(current_qid.strip(' \t\n'))
                break
            except subprocess.CalledProcessError as err:
                log.error(f'{jobname} submission failure at {datetime.datetime.now()}')
                log.error(f'{jobname}   {batch_params}')
                log.error(f'{jobname}   {err.output=}')
                if attempt < max_attempts - 1:
                    log.info('Sleeping 60 seconds then retrying')
                    time.sleep(60)
        else:  #- for/else runs if the loop finished without a successful submission
            msg = f'{jobname} submission failed {max_attempts} times.' \
                  + ' setting as unsubmitted and moving on'
            log.error(msg)
            current_qid = get_default_qid()
            submitted = False
    else:
        current_qid = get_default_qid()
        submitted = False

    ## Update prow with new information
    prow['LATEST_QID'] = current_qid

    ## If we didn't submit, don't say we did and don't add to ALL_QIDS
    if submitted:
        log.info(batch_params)
        log.info(f'Submitted {jobname} with dependencies {dep_str} and '
                 + f'reservation={reservation}. Returned qid: {current_qid}')

        ## Update prow with new information
        prow['ALL_QIDS'] = np.append(prow['ALL_QIDS'],current_qid)
        prow['STATUS'] = 'SUBMITTED'
        prow['SUBMIT_DATE'] = int(time.time())
    else:
        log.info(f"Would have submitted: {batch_params}")
        prow['STATUS'] = 'UNSUBMITTED'

        ## Update the Slurm jobid cache of job states
        update_queue_state_cache(qid=prow['LATEST_QID'], state=prow['STATUS'])

    return prow

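# Example: with strictly_successful=True and two pending dependency qids [123, 456],
# the sbatch invocation assembled above looks like (hypothetical script path):
# ['sbatch', '--parsable', '--dependency=afterok:123:456', '/path/to/script.slurm']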

#############################################
##########   Row Manipulations   ############
#############################################
def define_and_assign_dependency(prow, calibjobs, use_tilenight=False,
                                 refnight=None):
    """
    Given an input processing row and possible calibjobs, this defines the
    JOBDESC keyword and assigns the dependency appropriate for the job type of
    prow.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for
            'OBSTYPE'. A row must have column names for
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_QID'.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'badcol',
            'psfnight', 'nightlyflat', and 'linkcal'. Each key corresponds
            to a Table.Row or None. The table.Row() values are for the
            corresponding calibration job. Each value that isn't None
            must contain 'INTID', and 'LATEST_QID'. If None, it assumes
            the dependency doesn't exist and no dependency is assigned.
        use_tilenight (bool): Default is False. If True, use desi_proc_tilenight
            for prestdstar, stdstar, and poststdstar steps for
            science exposures.
        refnight (int): The reference night for linking jobs.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except
        with updated values for 'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_QID'.

    Note:
        This may modify the input. Table.Row objects are generally copied
        on modification, so the input object in memory may or may not
        change. As of writing, a row from a table given to this function
        will not change during the execution of this function (but can be
        overwritten explicitly with the returned row if desired).
    """
    if prow['OBSTYPE'] in ['science', 'twiflat']:
        if calibjobs['nightlyflat'] is not None:
            dependency = calibjobs['nightlyflat']
        elif calibjobs['psfnight'] is not None:
            dependency = calibjobs['psfnight']
        elif calibjobs['ccdcalib'] is not None:
            dependency = calibjobs['ccdcalib']
        elif calibjobs['nightlybias'] is not None:
            dependency = calibjobs['nightlybias']
        elif calibjobs['badcol'] is not None:
            dependency = calibjobs['badcol']
        else:
            dependency = calibjobs['linkcal']
        if not use_tilenight:
            prow['JOBDESC'] = 'prestdstar'
    elif prow['OBSTYPE'] == 'flat':
        if calibjobs['psfnight'] is not None:
            dependency = calibjobs['psfnight']
        elif calibjobs['ccdcalib'] is not None:
            dependency = calibjobs['ccdcalib']
        elif calibjobs['nightlybias'] is not None:
            dependency = calibjobs['nightlybias']
        elif calibjobs['badcol'] is not None:
            dependency = calibjobs['badcol']
        else:
            dependency = calibjobs['linkcal']
    elif prow['OBSTYPE'] == 'arc':
        if calibjobs['ccdcalib'] is not None:
            dependency = calibjobs['ccdcalib']
        elif calibjobs['nightlybias'] is not None:
            dependency = calibjobs['nightlybias']
        elif calibjobs['badcol'] is not None:
            dependency = calibjobs['badcol']
        else:
            dependency = calibjobs['linkcal']
    elif prow['JOBDESC'] in ['badcol', 'nightlybias', 'ccdcalib']:
        dependency = calibjobs['linkcal']
    elif prow['OBSTYPE'] == 'dark':
        if calibjobs['ccdcalib'] is not None:
            dependency = calibjobs['ccdcalib']
        elif calibjobs['nightlybias'] is not None:
            dependency = calibjobs['nightlybias']
        elif calibjobs['badcol'] is not None:
            dependency = calibjobs['badcol']
        else:
            dependency = calibjobs['linkcal']
    elif prow['JOBDESC'] == 'linkcal' and refnight is not None:
        dependency = None
        ## For link cals only, enable cross-night dependencies if available
        refproctable = findfile('proctable', night=refnight)
        if os.path.exists(refproctable):
            ptab = load_table(tablename=refproctable, tabletype='proctable')
            ## This isn't perfect because we may depend on jobs that aren't
            ## actually being linked
            ## Also allows us to proceed even if jobs don't exist yet
            deps = []
            for job in ['nightlybias', 'ccdcalib', 'psfnight', 'nightlyflat']:
                if job in ptab['JOBDESC']:
                    ## add the reference-night row to the dependencies
                    deps.append(ptab[ptab['JOBDESC']==job][0])
            if len(deps) > 0:
                dependency = deps
    else:
        dependency = None

    prow = assign_dependency(prow, dependency)

    return prow

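# Example (hypothetical): for a science row on a night whose nightlyflat job exists,
# define_and_assign_dependency() points the row at that job and relabels it:
# prow = define_and_assign_dependency(prow, calibjobs)  # prow['JOBDESC'] -> 'prestdstar'
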
def assign_dependency(prow, dependency):
    """
    Given an input processing row and a dependency (one or more processing rows,
    e.g. the psfnight or nightlyflat job), this assigns the dependency
    appropriate for the job type of prow.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'OBSTYPE'. A row must have column names for
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_QID'.
        dependency (NoneType or scalar/list/array of Table.Row or dict): Processing row(s) corresponding to the required input
            for the job in prow. This must contain keyword
            accessible values for 'INTID', and 'LATEST_QID'.
            If None, it assumes the dependency doesn't exist
            and no dependency is assigned.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with updated values for
        'INT_DEP_IDS' and 'LATEST_DEP_QID'.

    Note:
        This may modify the input. Table.Row objects are generally copied on modification, so the
        input object in memory may or may not change. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    prow['INT_DEP_IDS'] = np.ndarray(shape=0).astype(int)
    prow['LATEST_DEP_QID'] = np.ndarray(shape=0).astype(int)
    if dependency is not None:
        ## note: np.ndarray (the type), not np.array (a function), so that
        ## array-valued dependencies are actually matched here
        if type(dependency) in [list, np.ndarray]:
            ids, qids = [], []
            for curdep in dependency:
                ids.append(curdep['INTID'])
                if still_a_dependency(curdep):
                    qids.append(curdep['LATEST_QID'])
            prow['INT_DEP_IDS'] = np.array(ids, dtype=int)
            prow['LATEST_DEP_QID'] = np.array(qids, dtype=int)
        elif type(dependency) in [dict, OrderedDict, Table.Row]:
            prow['INT_DEP_IDS'] = np.array([dependency['INTID']], dtype=int)
            if still_a_dependency(dependency):
                prow['LATEST_DEP_QID'] = np.array([dependency['LATEST_QID']], dtype=int)
    return prow

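# Example (hypothetical rows): with dep = {'INTID': 250310001, 'LATEST_QID': 4242, 'STATUS': 'PENDING'},
# assign_dependency(prow, dep) sets prow['INT_DEP_IDS'] to [250310001] and, because the
# dependency is still pending, prow['LATEST_DEP_QID'] to [4242]; a COMPLETED dependency
# would leave LATEST_DEP_QID empty.
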
def still_a_dependency(dependency):
    """
    Defines the criteria for which a dependency is deemed complete (and therefore no longer a dependency).

    Args:
        dependency (Table.Row or dict): Processing row corresponding to the required input for the job in prow.
            This must contain keyword accessible values for 'STATUS', and 'LATEST_QID'.

    Returns:
        bool: False if the criteria indicate that the dependency is completed and no longer a blocking factor (i.e. no longer
        a genuine dependency). Returns True if the dependency is still a blocking factor such that the Slurm
        scheduler needs to be aware of the pending job.
    """
    return dependency['LATEST_QID'] > 0 and dependency['STATUS'] != 'COMPLETED'

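# Example: a dependency that already finished is no longer blocking:
# still_a_dependency({'LATEST_QID': 12345, 'STATUS': 'COMPLETED'})  # -> False
# still_a_dependency({'LATEST_QID': 12345, 'STATUS': 'PENDING'})    # -> True
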
def get_type_and_tile(erow):
    """
    Trivial function to return the OBSTYPE and the TILEID from an exposure table row

    Args:
        erow (Table.Row or dict): Must contain 'OBSTYPE' and 'TILEID' as keywords.

    Returns:
        tuple (str, int): corresponding to the lowercased OBSTYPE and the TILEID value of the input erow.
    """
    return str(erow['OBSTYPE']).lower(), erow['TILEID']


#############################################
#########   Table manipulators   ############
#############################################
def parse_previous_tables(etable, ptable, night):
    """
    This takes in the exposure and processing tables and regenerates all the working memory variables needed for the
    daily processing script.

    Used by the daily processing to define most of its stateful variables into working memory.
    If the processing table is empty, these are simply declared and returned for use.
    If the code had previously run and exited (or crashed), however, this allows the code to
    re-establish itself by redefining these values.

    Args:
        etable (Table): Exposure table of all exposures that have been dealt with thus far.
        ptable (Table): Processing table of all exposures that have been processed.
        night (str or int): The night the data was taken.

    Returns:
        tuple: A tuple containing:

        * arcs, list of dicts, list of the individual arc jobs used for the psfnight (NOT all
          the arcs, if multiple sets existed)
        * flats, list of dicts, list of the individual flat jobs used for the nightlyflat (NOT
          all the flats, if multiple sets existed)
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile)
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'badcol', 'psfnight'
          and 'nightlyflat'. Each key corresponds to a Table.Row or
          None. The table.Row() values are for the corresponding
          calibration job.
        * curtype, None, the obstype of the current job being run. Always None as first new job will define this.
        * lasttype, str or None, the obstype of the last individual exposure row to be processed.
        * curtile, None, the tileid of the current job (if science). Otherwise None. Always None as first
          new job will define this.
        * lasttile, str or None, the tileid of the last job (if science). Otherwise None.
        * internal_id, int, an internal identifier unique to each job. Increments with each new job. This
          is the latest unassigned value.
    """
    log = get_logger()
    arcs, flats, sciences = [], [], []
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None,
                 'accounted_for': dict()}
    curtype,lasttype = None,None
    curtile,lasttile = None,None

    if len(ptable) > 0:
        prow = ptable[-1]
        internal_id = int(prow['INTID'])+1
        lasttype,lasttile = get_type_and_tile(ptable[-1])
        jobtypes = ptable['JOBDESC']

        if 'nightlybias' in jobtypes:
            calibjobs['nightlybias'] = table_row_to_dict(ptable[jobtypes=='nightlybias'][0])
            log.info("Located nightlybias job in exposure table: {}".format(calibjobs['nightlybias']))

        if 'ccdcalib' in jobtypes:
            calibjobs['ccdcalib'] = table_row_to_dict(ptable[jobtypes=='ccdcalib'][0])
            log.info("Located ccdcalib job in exposure table: {}".format(calibjobs['ccdcalib']))

        if 'psfnight' in jobtypes:
            calibjobs['psfnight'] = table_row_to_dict(ptable[jobtypes=='psfnight'][0])
            log.info("Located joint fit psfnight job in exposure table: {}".format(calibjobs['psfnight']))
        elif lasttype == 'arc':
            seqnum = 10
            for row in ptable[::-1]:
                erow = etable[etable['EXPID']==row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'arc' and int(erow['SEQNUM'])<seqnum:
                    arcs.append(table_row_to_dict(row))
                    seqnum = int(erow['SEQNUM'])
                else:
                    break
            ## Because we work backward to fill in, we need to reverse them to get chronological order back
            arcs = arcs[::-1]

        if 'nightlyflat' in jobtypes:
            calibjobs['nightlyflat'] = table_row_to_dict(ptable[jobtypes=='nightlyflat'][0])
            log.info("Located joint fit nightlyflat job in exposure table: {}".format(calibjobs['nightlyflat']))
        elif lasttype == 'flat':
            for row in ptable[::-1]:
                erow = etable[etable['EXPID']==row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'flat' and int(erow['SEQTOT']) < 5:
                    if float(erow['EXPTIME']) > 100.:
                        flats.append(table_row_to_dict(row))
                else:
                    break
            flats = flats[::-1]

        if lasttype.lower() == 'science':
            for row in ptable[::-1]:
                if row['OBSTYPE'].lower() == 'science' and row['TILEID'] == lasttile and \
                   row['JOBDESC'] == 'prestdstar' and row['LASTSTEP'] != 'skysub':
                    sciences.append(table_row_to_dict(row))
                else:
                    break
            sciences = sciences[::-1]
    else:
        internal_id = night_to_starting_iid(night)

    return arcs, flats, sciences, \
           calibjobs, \
           curtype, lasttype, \
           curtile, lasttile, \
           internal_id

def generate_calibration_dict(ptable, files_to_link=None):
1✔
1054
    """
1055
    This takes in a processing table and regenerates the working memory calibration
1056
    dictionary for dependency tracking. Used by the daily processing to define
1057
    most of its stateful variables in working memory.
1058
    If the processing table is empty, these are simply declared and returned for use.
1059
    If the code had previously run and exited (or crashed), however, this allows the code to
1060
    re-establish itself by redefining these values.
1061

1062
    Args:
1063
        ptable, Table, Processing table of all exposures that have been processed.
1064
        files_to_link, set, Set of filenames that the linkcal job will link.
1065

1066
    Returns:
1067
        calibjobs, dict. Dictionary containing 'nightlybias', 'badcol', 'ccdcalib',
1068
            'psfnight', 'nightlyflat', 'linkcal', and 'accounted_for'. Each job key corresponds to a
1069
            Table.Row or None. The Table.Row() values are for the corresponding
1070
            calibration job.
1071
    """
1072
    log = get_logger()
1✔
1073
    job_to_file_map = get_jobdesc_to_file_map()
1✔
1074
    accounted_for = {'biasnight': False, 'badcolumns': False,
1✔
1075
                     'ctecorrnight': False, 'psfnight': False,
1076
                     'fiberflatnight': False}
1077
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
1✔
1078
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None}
1079

1080
    ptable_jobtypes = ptable['JOBDESC']
1✔
1081

1082
    for jobtype in calibjobs.keys():
1✔
1083
        if jobtype in ptable_jobtypes:
1✔
1084
            calibjobs[jobtype] = table_row_to_dict(ptable[ptable_jobtypes==jobtype][0])
×
1085
            log.info(f"Located {jobtype} job in exposure table: {calibjobs[jobtype]}")
×
1086
            if jobtype == 'linkcal':
×
1087
                if files_to_link is not None and len(files_to_link) > 0:
×
1088
                    log.info(f"Assuming existing linkcal job processed "
×
1089
                             + f"{files_to_link} since given in override file.")
1090
                    accounted_for = update_accounted_for_with_linking(accounted_for,
×
1091
                                                                  files_to_link)
1092
                else:
1093
                    err = f"linkcal job exists but no files given: {files_to_link=}"
×
1094
                    log.error(err)
×
1095
                    raise ValueError(err)
×
1096
            elif jobtype == 'ccdcalib':
×
1097
                possible_ccd_files = set(['biasnight', 'badcolumns', 'ctecorrnight'])
×
1098
                if files_to_link is None:
×
1099
                    files_accounted_for = possible_ccd_files
×
1100
                else:
1101
                    files_accounted_for = possible_ccd_files.difference(files_to_link)
×
1102
                    ccd_files_linked = possible_ccd_files.intersection(files_to_link)
×
1103
                    log.info(f"Assuming existing ccdcalib job processed "
×
1104
                             + f"{files_accounted_for} since {ccd_files_linked} "
1105
                             + f"are linked.")
1106
                for fil in files_accounted_for:
×
1107
                    accounted_for[fil] = True
×
1108
            else:
1109
                accounted_for[job_to_file_map[jobtype]] = True
×
1110

1111
    calibjobs['accounted_for'] = accounted_for
1✔
1112
    return calibjobs
1✔
1113

1114
def update_accounted_for_with_linking(accounted_for, files_to_link):
1✔
1115
    """
1116
    This takes in a dictionary summarizing the calibration files accounted for
1117
     and updates it based on the files_to_link, which are assumed to have
1118
     already been linked such that those files already exist on disk and
1119
     don't need to be generated.
1120

1121
    Parameters
1122
    ----------
1123
        accounted_for: dict
1124
            Dictionary containing 'biasnight', 'badcolumns', 'ctecorrnight',
1125
            'psfnight', and 'fiberflatnight'. Each value is True if file is
1126
            accounted for and False if it is not.
1127
        files_to_link: set
1128
            Set of filenames that the linkcal job will link.
1129

1130
    Returns
1131
    -------
1132
        accounted_for: dict
1133
            Dictionary containing 'biasnight', 'badcolumns', 'ctecorrnight',
1134
            'psfnight', and 'fiberflatnight'. Each value is True if file is
1135
            accounted for and False if it is not.
1136
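
    Example
    -------
    A minimal sketch (hypothetical inputs; any valid subset of the keys works)::

        accounted_for = {'biasnight': False, 'badcolumns': False,
                         'ctecorrnight': False, 'psfnight': False,
                         'fiberflatnight': False}
        accounted_for = update_accounted_for_with_linking(
            accounted_for, files_to_link={'biasnight', 'psfnight'})
        # now accounted_for['biasnight'] and accounted_for['psfnight'] are True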
    """
1137
    log = get_logger()
1✔
1138
    
1139
    for fil in files_to_link:
1✔
1140
        if fil in accounted_for:
1✔
1141
            accounted_for[fil] = True
1✔
1142
        else:
1143
            err = f"{fil} doesn't match an expected filetype: "
×
1144
            err += f"{accounted_for.keys()}"
×
1145
            log.error(err)
×
1146
            raise ValueError(err)
×
1147

1148
    return accounted_for
1✔
1149

1150
def all_calibs_submitted(accounted_for, do_cte_flats):
1✔
1151
    """
1152
    Function that returns the boolean logic to determine if the necessary
1153
    calibration jobs have been submitted for calibration.
1154

1155
    Args:
1156
        accounted_for, dict, Dictionary with keys corresponding to the calibration
1157
            filenames and values of True or False.
1158
        do_cte_flats, bool, whether ctecorrnight files are expected or not.
1159

1160
    Returns:
1161
        bool, True if all necessary calibrations have been submitted or handled, False otherwise.
1162
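
    Example:
        A minimal sketch (hypothetical values)::

            accounted_for = {'biasnight': True, 'badcolumns': True,
                             'ctecorrnight': False, 'psfnight': True,
                             'fiberflatnight': True}
            all_calibs_submitted(accounted_for, do_cte_flats=False)  # -> True
            all_calibs_submitted(accounted_for, do_cte_flats=True)   # -> False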
    """
1163
    test_dict = accounted_for.copy()
1✔
1164
    if not do_cte_flats:
1✔
1165
        test_dict.pop('ctecorrnight')
1✔
1166

1167
    return np.all(list(test_dict.values()))
1✔
1168

1169
def update_and_recursively_submit(proc_table, submits=0, max_resubs=100,
1✔
1170
                                  resubmission_states=None,
1171
                                  no_resub_failed=False, ptab_name=None,
1172
                                  dry_run_level=0, reservation=None,
1173
                                  expids=None, tileids=None):
1174
    """
1175
    Given a processing table, this loops over job rows and resubmits failed jobs (as defined by resubmission_states).
1176
    Before submitting a job, it checks the dependencies for failures. If a dependency needs to be resubmitted, it recursively
1177
    follows dependencies until it finds the first job without a failed dependency and resubmits that. Then resubmits the
1178
    other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.
1179

1180
    Args:
1181
        proc_table, Table, the processing table with a row per job.
1182
        submits, int, the number of submissions made to the queue. Used for saving files and in not overloading the scheduler.
1183
        max_resubs, int, the number of times a job should be resubmitted before giving up. Default is very high at 100.
1184
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
1185
            possible Slurm scheduler state, where you wish for jobs with that
1186
            outcome to be resubmitted
1187
        no_resub_failed: bool. Set to True if you do NOT want to resubmit
1188
            jobs with Slurm status 'FAILED' by default. Default is False.
1189
        ptab_name, str, the full pathname where the processing table should be saved.
1190
        dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0.
1191
            0 which runs the code normally.
1192
            1 writes all files but doesn't submit any jobs to Slurm.
1193
            2 writes tables but doesn't write scripts or submit anything.
1194
            3 Doesn't write or submit anything but queries Slurm normally for job status.
1195
            4 Doesn't write, submit jobs, or query Slurm.
1196
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
1197
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
1198
        expids: list of ints. The exposure ids to resubmit (along with the jobs they depend on).
1199
        tileids: list of ints. The tile ids to resubmit (along with the jobs they depend on).
1200

1201
    Returns:
1202
        tuple: A tuple containing:
1203

1204
        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
1205
          been updated for those jobs that needed to be resubmitted.
1206
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
1207
          the number of submissions made from this function call plus the input submits value.
1208

1209
    Note:
1210
        This modifies the inputs of both proc_table and submits and returns them.
1211
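
    Example:
        A minimal sketch (hypothetical filename and tileid; dry_run_level=4
        avoids writing, submitting, and querying Slurm)::

            from desispec.workflow.tableio import load_table
            ptable = load_table(tablename='processing_table-20240101.csv',
                                tabletype='processing')
            ptable, submits = update_and_recursively_submit(
                ptable, dry_run_level=4, tileids=[1234])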
    """
1212
    log = get_logger()
1✔
1213
    if tileids is not None and expids is not None:
1✔
1214
        msg = f"Provided both expids and tilesids. Please only provide one."
×
1215
        log.critical(msg)
×
1216
        raise AssertionError(msg)
1217
    elif tileids is not None:
1✔
1218
        msg = f"Only resubmitting the following tileids and the jobs they depend on: {tileids=}"
×
1219
        log.info(msg)
×
1220
    elif expids is not None:
1✔
1221
        msg = f"Only resubmitting the following expids and the jobs they depend on: {expids=}"
×
1222
        log.info(msg)
×
1223

1224
    if resubmission_states is None:
1✔
1225
        resubmission_states = get_resubmission_states(no_resub_failed=no_resub_failed)
1✔
1226

1227
    log.info(f"Resubmitting jobs with current states in the following: {resubmission_states}")
1✔
1228
    proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level)
1✔
1229

1230
    log.info("Updated processing table queue information:")
1✔
1231
    cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID',
1✔
1232
            'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS']
1233
    log.info(np.array(cols))
1✔
1234
    for row in proc_table:
1✔
1235
        log.info(np.array(row[cols]))
1✔
1236

1237
    ## If expids or tileids are given, subselect to the processing table rows
1238
    ## that included those exposures or tiles; otherwise just list all indices.
1239
    ## NOTE: Other rows can still be submitted if the selected rows depend on them,
1240
    ## since we hand the entire table to recursive_submit_failed(), which will walk the
1241
    ## entire dependency tree as necessary.
1242
    if expids is not None:
1✔
1243
        select_ptab_rows = np.where([np.any(np.isin(prow_eids, expids)) for prow_eids in proc_table['EXPID']])[0]
×
1244
    elif tileids is not None:
1✔
1245
        select_ptab_rows = np.where(np.isin(proc_table['TILEID'], tileids))[0]
×
1246
    else:
1247
        select_ptab_rows = np.arange(len(proc_table))
1✔
1248

1249
    log.info("\n")
1✔
1250
    id_to_row_map = {row['INTID']: rown for rown, row in enumerate(proc_table)}
1✔
1251
    ## Loop over all requested rows and resubmit those that have failed
1252
    for rown in select_ptab_rows:
1✔
1253
        if proc_table['STATUS'][rown] in resubmission_states:
1✔
1254
            proc_table, submits = recursive_submit_failed(rown=rown, proc_table=proc_table,
1✔
1255
                                                          submits=submits, max_resubs=max_resubs,
1256
                                                          id_to_row_map=id_to_row_map,
1257
                                                          ptab_name=ptab_name,
1258
                                                          resubmission_states=resubmission_states,
1259
                                                          reservation=reservation,
1260
                                                          dry_run_level=dry_run_level)
1261

1262
    proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level)
1✔
1263

1264
    return proc_table, submits
1✔
1265

1266
def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, max_resubs=100, ptab_name=None,
1✔
1267
                            resubmission_states=None, reservation=None, dry_run_level=0):
1268
    """
1269
    Given a row of a processing table and the full processing table, this resubmits the given job.
1270
    Before submitting a job, it checks the dependencies for failures in the processing table. If a dependency needs to
1271
    be resubmitted, it recursively follows dependencies until it finds the first job without a failed dependency and
1272
    resubmits that. Then resubmits the other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.
1273

1274
    Args:
1275
        rown, int, the index of the row in the processing table that you want to resubmit.
1276
        proc_table, Table, the processing table with a row per job.
1277
        submits, int, the number of submissions made to the queue. Used for saving files and in not overloading the scheduler.
1278
        id_to_row_map, dict, lookup dictionary where the keys are internal ids (INTID's) and the values are the row position
1279
            in the processing table.
1280
        max_resubs, int, the number of times a job should be resubmitted before giving up. Default is very high at 100.
1281
        ptab_name, str, the full pathname where the processing table should be saved.
1282
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
1283
            possible Slurm scheduler state, where you wish for jobs with that
1284
            outcome to be resubmitted
1285
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
1286
        dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0.
1287
            0 which runs the code normally.
1288
            1 writes all files but doesn't submit any jobs to Slurm.
1289
            2 writes tables but doesn't write scripts or submit anything.
1290
            3 Doesn't write or submit anything but queries Slurm normally for job status.
1291
            4 Doesn't write, submit jobs, or query Slurm.
1292
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
1293

1294
    Returns:
1295
        tuple: A tuple containing:
1296

1297
        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
1298
          been updated for those jobs that needed to be resubmitted.
1299
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
1300
          the number of submissions made from this function call plus the input submits value.
1301

1302
    Note:
1303
        This modifies the inputs of both proc_table and submits and returns them.
1304
    """
1305
    log = get_logger()
1✔
1306
    row = proc_table[rown]
1✔
1307
    log.info(f"Identified row {row['INTID']} as needing resubmission.")
1✔
1308
    log.info(f"\t{row['INTID']}: Tileid={row['TILEID']}, Expid(s)={row['EXPID']}, Jobdesc={row['JOBDESC']}")
1✔
1309
    if len(proc_table['ALL_QIDS'][rown]) > max_resubs:
1✔
1310
        log.warning(f"Tileid={row['TILEID']}, Expid(s)={row['EXPID']}, "
×
1311
                    + f"Jobdesc={row['JOBDESC']} has already been submitted "
1312
                    + f"{max_resubs+1} times. Not resubmitting.")
1313
        proc_table['STATUS'][rown] = "MAX_RESUB"
×
1314
        return proc_table, submits
×
1315
    if resubmission_states is None:
1✔
1316
        resubmission_states = get_resubmission_states()
×
1317
    ideps = proc_table['INT_DEP_IDS'][rown]
1✔
1318
    if ideps is None or len(ideps)==0:
1✔
1319
        proc_table['LATEST_DEP_QID'][rown] = np.ndarray(shape=0).astype(int)
×
1320
    else:
1321
        all_valid_states = list(resubmission_states.copy())
1✔
1322
        good_states = ['RUNNING','PENDING','SUBMITTED','COMPLETED']
1✔
1323
        all_valid_states.extend(good_states)
1✔
1324
        othernight_idep_row_lookup = {}
1✔
1325
        for idep in np.sort(np.atleast_1d(ideps)):
1✔
1326
            if idep not in id_to_row_map:
1✔
1327
                if idep // 1000 != row['INTID'] // 1000:
1✔
1328
                    log.debug("Internal ID: %d not in id_to_row_map. "
1✔
1329
                             + "This is expected since it is from another day. ", idep)
1330
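                    ## INTIDs encode the night (see night_to_starting_iid), e.g.
                    ## idep=240101007 gives 240101007 // 1000 = 240101 -> night 20240101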
                    reference_night = 20000000 + (idep // 1000)
1✔
1331
                    reftab = read_minimal_full_proctab_cols(nights=[reference_night])
1✔
1332
                    if reftab is None:
1✔
1333
                        msg = f"The dependency is from night={reference_night}" \
×
1334
                              + f" but read_minimal_full_proctab_cols couldn't" \
1335
                              + f" locate that processing table, this is a " \
1336
                              +  f"fatal error."
1337
                        log.critical(msg)
×
1338
                        raise ValueError(msg)
×
1339
                    reftab = update_from_queue(reftab, dry_run_level=dry_run_level)
1✔
1340
                    entry = reftab[reftab['INTID'] == idep][0]
1✔
1341
                    if entry['STATUS'] not in good_states:
1✔
1342
                        msg = f"Internal ID: {idep} not in id_to_row_map. " \
1✔
1343
                              + f"Since the dependency is from night={reference_night} " \
1344
                              + f"and that job isn't in a good state this is an " \
1345
                              + f"error we can't overcome."
1346
                        log.error(msg)
1✔
1347
                        proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
1✔
1348
                        return proc_table, submits
1✔
1349
                    else:
1350
                        ## otherwise if incomplete, just update the cache to use this
1351
                        ## in the next stage
1352
                        othernight_idep_row_lookup[idep] = entry
1✔
1353
                        update_full_ptab_cache(reftab)
1✔
1354
                else:
1355
                    msg = f"Internal ID: {idep} not in id_to_row_map. " \
×
1356
                         + f"Since the dependency is from the same night" \
1357
                         + f" and we can't find it, this is a fatal error."
1358
                    log.critical(msg)
×
1359
                    raise ValueError(msg)
×
1360
            elif proc_table['STATUS'][id_to_row_map[idep]] not in all_valid_states:
1✔
1361
                log.error(f"Proc INTID: {proc_table['INTID'][rown]} depended on" +
×
1362
                            f" INTID {proc_table['INTID'][id_to_row_map[idep]]}" +
1363
                            f" but that exposure has state" +
1364
                            f" {proc_table['STATUS'][id_to_row_map[idep]]} that" +
1365
                            f" isn't in the list of resubmission states." +
1366
                            f" Exiting this job's resubmission attempt.")
1367
                proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
×
1368
                return proc_table, submits
×
1369
        qdeps = []
1✔
1370
        for idep in np.sort(np.atleast_1d(ideps)):
1✔
1371
            if idep in id_to_row_map:
1✔
1372
                if proc_table['STATUS'][id_to_row_map[idep]] in resubmission_states:
1✔
1373
                    proc_table, submits = recursive_submit_failed(id_to_row_map[idep],
×
1374
                                                                  proc_table, submits,
1375
                                                                  id_to_row_map,
1376
                                                                  reservation=reservation,
1377
                                                                  dry_run_level=dry_run_level)
1378
                ## Now that we've resubmitted the dependency if necessary,
1379
                ## add the most recent QID to the list assuming it isn't COMPLETED
1380
                if still_a_dependency(proc_table[id_to_row_map[idep]]):
1✔
1381
                    qdeps.append(proc_table['LATEST_QID'][id_to_row_map[idep]])
1✔
1382
                else:
1383
                    log.info(f"{idep} is COMPLETED. Not submitting as a dependency.")
×
1384
            else:
1385
                ## Since we verified above that the cross night QID is still
1386
                ## either pending or successful, add that to the list of QID's
1387
                if still_a_dependency(othernight_idep_row_lookup[idep]):
1✔
1388
                    qdeps.append(othernight_idep_row_lookup[idep]['LATEST_QID'])
1✔
1389
                else:
1390
                    log.info(f"{idep} is COMPLETED. Not submitting as a dependency.")
×
1391

1392
        qdeps = np.atleast_1d(qdeps)
1✔
1393
        proc_table['LATEST_DEP_QID'][rown] = qdeps
1✔
1394
        if len(qdeps) < len(ideps):
1✔
UNCOV
1395
            log.warning(f"Number of internal dependencies was {len(ideps)} but number "
×
1396
                        + f"of queue deps is {len(qdeps)} for Rown {rown}, ideps {ideps}."
1397
                        + " This is expected if the ideps were status=COMPLETED")
1398

1399
    proc_table[rown] = submit_batch_script(proc_table[rown], reservation=reservation,
1✔
1400
                                           strictly_successful=True, dry_run=dry_run_level)
1401
    submits += 1
1✔
1402

1403
    if dry_run_level < 3:
1✔
1404
        if ptab_name is None:
×
1405
            write_table(proc_table, tabletype='processing', overwrite=True)
×
1406
        else:
1407
            write_table(proc_table, tablename=ptab_name, overwrite=True)
×
1408
        sleep_and_report(0.1 + 0.1*(submits % 10 == 0),
×
1409
                         message_suffix=f"after submitting job to queue and writing proctable")
1410
    return proc_table, submits
1✔
1411

1412

1413
#########################################
1414
########     Joint fit     ##############
1415
#########################################
1416
def joint_fit(ptable, prows, internal_id, queue, reservation, descriptor, z_submit_types=None,
1✔
1417
              dry_run=0, strictly_successful=False, check_for_outputs=True, resubmit_partial_complete=True,
1418
              system_name=None):
1419
    """
1420
    DEPRECATED
1421
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
1422
    joint fitting job given by descriptor. If the joint fitting job is standard star fitting, the post standard star fits
1423
    for all the individual exposures also created and submitted. The returned ptable has all of these rows added to the
1424
    table given as input.
1425

1426
    Args:
1427
        ptable (Table): The processing table where each row is a processed job.
1428
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
1429
            inputs to the joint fit.
1430
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
1431
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
1432
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
1433
        descriptor (str): Description of the joint fitting job. Can either be 'science' or 'stdstarfit', 'arc' or 'psfnight',
1434
            or 'flat' or 'nightlyflat'.
1435
        z_submit_types (list of str, optional): The "group" types of redshifts that should be submitted with each
1436
            exposure. If not specified or None, then no redshifts are submitted.
1437
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
1438
            0 which runs the code normally.
1439
            1 writes all files but doesn't submit any jobs to Slurm.
1440
            2 writes tables but doesn't write scripts or submit anything.
1441
            3 Doesn't write or submit anything but queries Slurm normally for job status.
1442
            4 Doesn't write, submit jobs, or query Slurm.
1443
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
1444
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
1445
            less desirable because e.g. the sciences can run with SVN default calibrations rather
1446
            than failing completely from failed calibrations. Default is False.
1447
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
1448
            data products for the script being submitted. If all files exist and this is True,
1449
            then the script will not be submitted. If some files exist and this is True, only the
1450
            subset of the cameras without the final data products will be generated and submitted.
1451
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
1452
            jobs with some prior data are pruned using PROCCAMWORD to only process the
1453
            remaining cameras not found to exist.
1454
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
1455

1456
    Returns:
1457
        tuple: A tuple containing:
1458

1459
        * ptable, Table. The same processing table as input except with added rows for the joint fit job and, in the case
1460
          of a stdstarfit, the poststdstar science exposure jobs.
1461
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
1462
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1463
    """
1464
    log = get_logger()
×
1465
    if len(prows) < 1:
×
1466
        return ptable, None, internal_id
×
1467

1468
    if descriptor is None:
×
1469
        return ptable, None, internal_id
×
1470
    elif descriptor == 'arc':
×
1471
        descriptor = 'psfnight'
×
1472
    elif descriptor == 'flat':
×
1473
        descriptor = 'nightlyflat'
×
1474
    elif descriptor == 'science':
×
1475
        if z_submit_types is None or len(z_submit_types) == 0:
×
1476
            descriptor = 'stdstarfit'
×
1477

1478
    if descriptor not in ['psfnight', 'nightlyflat', 'science','stdstarfit']:
×
1479
        return ptable, None, internal_id
×
1480

1481
    log.info(" ")
×
1482
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")
×
1483

1484
    if descriptor == 'science':
×
1485
        joint_prow, internal_id = make_joint_prow(prows, descriptor='stdstarfit', internal_id=internal_id)
×
1486
    else:
1487
        joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
×
1488
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
×
1489
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1490
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1491
    ptable.add_row(joint_prow)
×
1492

1493
    if descriptor in ['science','stdstarfit']:
×
1494
        if descriptor == 'science':
×
1495
            zprows = []
×
1496
        log.info(" ")
×
1497
        log.info(f"Submitting individual science exposures now that joint fitting of standard stars is submitted.\n")
×
1498
        for row in prows:
×
1499
            if row['LASTSTEP'] == 'stdstarfit':
×
1500
                continue
×
1501
            row['JOBDESC'] = 'poststdstar'
×
1502

1503
            # poststdstar job can't process cameras not included in its stdstar joint fit
1504
            stdcamword = joint_prow['PROCCAMWORD']
×
1505
            thiscamword = row['PROCCAMWORD']
×
1506
            proccamword = camword_intersection([stdcamword, thiscamword])
×
1507
            if proccamword != thiscamword:
×
1508
                dropcams = difference_camwords(thiscamword, proccamword)
×
1509
                assert dropcams != ''  #- i.e. if they differ, we should be dropping something
×
1510
                log.warning(f"Dropping exp {row['EXPID']} poststdstar cameras {dropcams} since they weren't included in stdstar fit {stdcamword}")
×
1511
                row['PROCCAMWORD'] = proccamword
×
1512

1513
            row['INTID'] = internal_id
×
1514
            internal_id += 1
×
1515
            row['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
×
1516
            row = assign_dependency(row, joint_prow)
×
1517
            row = create_and_submit(row, queue=queue, reservation=reservation, dry_run=dry_run,
×
1518
                                    strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1519
                                    resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1520
            ptable.add_row(row)
×
1521
            if descriptor == 'science' and row['LASTSTEP'] == 'all':
×
1522
                zprows.append(row)
×
1523

1524
    ## Now run redshifts
1525
    if descriptor == 'science' and len(zprows) > 0 and z_submit_types is not None:
×
1526
        prow_selection = (  (ptable['OBSTYPE'] == 'science')
×
1527
                          & (ptable['LASTSTEP'] == 'all')
1528
                          & (ptable['JOBDESC'] == 'poststdstar')
1529
                          & (ptable['TILEID'] == int(zprows[0]['TILEID'])) )
1530
        nightly_zprows = []
×
1531
        if np.sum(prow_selection) == len(zprows):
×
1532
            nightly_zprows = zprows.copy()
×
1533
        else:
1534
            for prow in ptable[prow_selection]:
×
1535
                nightly_zprows.append(table_row_to_dict(prow))
×
1536

1537
        for zsubtype in z_submit_types:
×
1538
            if zsubtype == 'perexp':
×
1539
                for zprow in zprows:
×
1540
                    log.info(" ")
×
1541
                    log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
×
1542
                    joint_prow, internal_id = make_joint_prow([zprow], descriptor=zsubtype, internal_id=internal_id)
×
1543
                    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
×
1544
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1545
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1546
                    ptable.add_row(joint_prow)
×
1547
            else:
1548
                log.info(" ")
×
1549
                log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {nightly_zprows[0]['TILEID']}.")
×
1550
                expids = [prow['EXPID'][0] for prow in nightly_zprows]
×
1551
                log.info(f"Expids: {expids}.\n")
×
1552
                joint_prow, internal_id = make_joint_prow(nightly_zprows, descriptor=zsubtype, internal_id=internal_id)
×
1553
                joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
×
1554
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1555
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1556
                ptable.add_row(joint_prow)
×
1557

1558
    if descriptor in ['psfnight', 'nightlyflat']:
×
1559
        log.info(f"Setting the calibration exposures as calibrators in the processing table.\n")
×
1560
        ptable = set_calibrator_flag(prows, ptable)
×
1561

1562
    return ptable, joint_prow, internal_id
×
1563

1564
def joint_cal_fit(descriptor, ptable, prows, internal_id, queue, reservation,
1✔
1565
                  dry_run=0, strictly_successful=False, check_for_outputs=True,
1566
                  resubmit_partial_complete=True, system_name=None):
1567
    """
1568
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
1569
    joint calibration fitting job given by descriptor ('psfnight' or 'nightlyflat').
1570
    The returned ptable has the new row added to the table given as input.
1572

1573
    Args:
1574
        descriptor (str): Description of the joint fitting job. Can either be 'arc' or 'psfnight',
1575
            or 'flat' or 'nightlyflat'.
1576
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
1577
            inputs to the joint fit.
1578
        ptable (Table): The processing table where each row is a processed job.
1579
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
1580
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
1581
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
1582
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
1583
            0 which runs the code normally.
1584
            1 writes all files but doesn't submit any jobs to Slurm.
1585
            2 writes tables but doesn't write scripts or submit anything.
1586
            3 Doesn't write or submit anything but queries Slurm normally for job status.
1587
            4 Doesn't write, submit jobs, or query Slurm.
1588
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
1589
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
1590
            less desirable because e.g. the sciences can run with SVN default calibrations rather
1591
            than failing completely from failed calibrations. Default is False.
1592
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
1593
            data products for the script being submitted. If all files exist and this is True,
1594
            then the script will not be submitted. If some files exist and this is True, only the
1595
            subset of the cameras without the final data products will be generated and submitted.
1596
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
1597
            jobs with some prior data are pruned using PROCCAMWORD to only process the
1598
            remaining cameras not found to exist.
1599
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
1600

1601
    Returns:
1602
        tuple: A tuple containing:
1603

1604
        * ptable, Table. The same processing table as input except with added rows for the joint fit job and, in the case
1605
          of a stdstarfit, the poststdstar science exposure jobs.
1606
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
1607
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1608
    """
1609
    log = get_logger()
×
1610
    if len(prows) < 1:
×
1611
        return ptable, None, internal_id
×
1612

1613
    if descriptor is None:
×
1614
        return ptable, None, internal_id
×
1615
    elif descriptor == 'arc':
×
1616
        descriptor = 'psfnight'
×
1617
    elif descriptor == 'flat':
×
1618
        descriptor = 'nightlyflat'
×
1619

1620
    if descriptor not in ['psfnight', 'nightlyflat']:
×
1621
        return ptable, None, internal_id
×
1622

1623
    log.info(" ")
×
1624
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")
×
1625

1626
    joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
×
1627
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
×
1628
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1629
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1630
    ptable.add_row(joint_prow)
×
1631

1632
    if descriptor in ['psfnight', 'nightlyflat']:
×
1633
        log.info(f"Setting the calibration exposures as calibrators in the processing table.\n")
×
1634
        ptable = set_calibrator_flag(prows, ptable)
×
1635

1636
    return ptable, joint_prow, internal_id
×
1637

1638

1639
#########################################
1640
########     Redshifts     ##############
1641
#########################################
1642
def submit_redshifts(ptable, prows, tnight, internal_id, queue, reservation,
1✔
1643
              dry_run=0, strictly_successful=False,
1644
              check_for_outputs=True, resubmit_partial_complete=True,
1645
              z_submit_types=None, system_name=None):
1646
    """
1647
    Given a set of prows, this generates processing table rows, creates batch scripts, and submits the
1648
    redshift jobs of the requested z_submit_types. The returned ptable has all of these rows added to the
1649
    table given as input.
1650

1651
    Args:
1652
        ptable (Table): The processing table where each row is a processed job.
1653
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
1654
        tnight (Table.Row): The processing table row of the tilenight job on which the redshifts depend.
1655
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
1656
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
1657
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
1658
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
1659
            0 which runs the code normally.
1660
            1 writes all files but doesn't submit any jobs to Slurm.
1661
            2 writes tables but doesn't write scripts or submit anything.
1662
            3 Doesn't write or submit anything but queries Slurm normally for job status.
1663
            4 Doesn't write, submit jobs, or query Slurm.
1664
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
1665
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
1666
            less desirable because e.g. the sciences can run with SVN default calibrations rather
1667
            than failing completely from failed calibrations. Default is False.
1668
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
1669
            data products for the script being submitted. If all files exist and this is True,
1670
            then the script will not be submitted. If some files exist and this is True, only the
1671
            subset of the cameras without the final data products will be generated and submitted.
1672
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
1673
            jobs with some prior data are pruned using PROCCAMWORD to only process the
1674
            remaining cameras not found to exist.
1675
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
1676
            exposure. If not specified or None, then no redshifts are submitted.
1677
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
1678

1679
    Returns:
1680
        tuple: A tuple containing:
1681

1682
        * ptable, Table. The same processing table as input except with added rows for the joint fit job.
1683
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1684
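
    Example:
        A minimal sketch (hypothetical inputs; dry_run=4 avoids writing,
        submitting, and querying Slurm)::

            ptable, internal_id = submit_redshifts(
                ptable, prows, tnight, internal_id, queue='realtime',
                reservation=None, dry_run=4, z_submit_types=['cumulative'])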
    """
1685
    log = get_logger()
1✔
1686
    if len(prows) < 1 or z_submit_types is None:
1✔
1687
        return ptable, internal_id
1✔
1688

1689
    log.info(" ")
1✔
1690
    log.info(f"Running redshifts.\n")
1✔
1691

1692
    ## Now run redshifts
1693
    zprows = []
1✔
1694
    for row in prows:
1✔
1695
        if row['LASTSTEP'] == 'all':
1✔
1696
            zprows.append(row)
1✔
1697

1698
    if len(zprows) > 0:
1✔
1699
        for zsubtype in z_submit_types:
1✔
1700
            log.info(" ")
1✔
1701
            log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {zprows[0]['TILEID']}.")
1✔
1702
            if zsubtype == 'perexp':
1✔
1703
                for zprow in zprows:
×
1704
                    log.info(f"EXPID: {zprow['EXPID']}.\n")
×
1705
                    redshift_prow = make_redshift_prow([zprow], tnight, descriptor=zsubtype, internal_id=internal_id)
×
1706
                    internal_id += 1
×
1707
                    redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
×
1708
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1709
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1710
                    ptable.add_row(redshift_prow)
×
1711
            elif zsubtype == 'cumulative':
1✔
1712
                tileids = np.unique([prow['TILEID'] for prow in zprows])
1✔
1713
                if len(tileids) > 1:
1✔
1714
                    msg = f"Error, more than one tileid provided for cumulative redshift job: {tileids}"
×
1715
                    log.critical(msg)
×
1716
                    raise ValueError(msg)
×
1717
                nights = np.unique([prow['NIGHT'] for prow in zprows])
1✔
1718
                if len(nights) > 1:
1✔
1719
                    msg = f"Error, more than one night provided for cumulative redshift job: {nights}"
×
1720
                    log.critical(msg)
×
1721
                    raise ValueError(msg)
×
1722
                tileid, night = tileids[0], nights[0]
1✔
1723
                ## For cumulative redshifts, get any existing processing rows for tile
1724
                matched_prows = read_minimal_tilenight_proctab_cols(tileids=tileids)
1✔
1725
                ## Identify the processing rows that should be assigned as dependencies
1726
                ## tnight should be first such that the new job inherits the other metadata from it
1727
                tnights = [tnight]
1✔
1728
                if matched_prows is not None:
1✔
1729
                    matched_prows = matched_prows[matched_prows['NIGHT'] <= night]
1✔
1730
                    for prow in matched_prows:
1✔
1731
                        if prow['INTID'] != tnight['INTID']:
1✔
1732
                            tnights.append(prow)
1✔
1733
                log.info(f"Internal Processing IDs: {[prow['INTID'] for prow in tnights]}.\n")
1✔
1734
                ## Identify all exposures that should go into the fit
1735
                expids = [prow['EXPID'][0] for prow in zprows]
1✔
1736
                ## note we can actually get the full list of exposures, but for now
1737
                ## we'll stay consistent with old processing where we only list exposures
1738
                ## from the current night
1739
                ## For cumulative redshifts, get valid expids from exptables
1740
                #matched_erows = read_minimal_science_exptab_cols(tileids=tileids)
1741
                #matched_erows = matched_erows[matched_erows['NIGHT']<=night]
1742
                #expids = list(set([prow['EXPID'][0] for prow in zprows])+set(matched_erows['EXPID']))
1743
                log.info(f"Expids: {expids}.\n")
1✔
1744
                redshift_prow, internal_id = make_joint_prow(tnights, descriptor=zsubtype, internal_id=internal_id)
1✔
1745
                redshift_prow['EXPID'] = expids
1✔
1746
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
1✔
1747
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1748
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1749
                ptable.add_row(redshift_prow)
1✔
1750
            else: # pernight
1751
                expids = [prow['EXPID'][0] for prow in zprows]
×
1752
                log.info(f"Expids: {expids}.\n")
×
1753
                redshift_prow = make_redshift_prow(zprows, tnight, descriptor=zsubtype, internal_id=internal_id)
×
1754
                internal_id += 1
×
1755
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
×
1756
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1757
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1758
                ptable.add_row(redshift_prow)
×
1759

1760
    return ptable, internal_id
1✔
1761

1762
#########################################
1763
########     Tilenight     ##############
1764
#########################################
1765
def submit_tilenight(ptable, prows, calibjobs, internal_id, queue, reservation,
1✔
1766
              dry_run=0, strictly_successful=False, resubmit_partial_complete=True,
1767
              system_name=None, use_specter=False, extra_job_args=None):
1768
    """
1769
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
1770
    tilenight job given by descriptor. The returned ptable has all of these rows added to the
1771
    table given as input.
1772

1773
    Args:
1774
        ptable (Table): The processing table where each row is a processed job.
1775
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
1776
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
1777
            and 'nightlyflat'. Each key corresponds to a Table.Row or
1778
            None. The table.Row() values are for the corresponding
1779
            calibration job.
1780
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
1781
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
1782
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
1783
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
1784
            0 which runs the code normally.
1785
            1 writes all files but doesn't submit any jobs to Slurm.
1786
            2 writes tables but doesn't write scripts or submit anything.
1787
            3 Doesn't write or submit anything but queries Slurm normally for job status.
1788
            4 Doesn't write, submit jobs, or query Slurm.
1789
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
1790
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
1791
            less desirable because e.g. the sciences can run with SVN default calibrations rather
1792
            than failing completely from failed calibrations. Default is False.
1793
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
1794
            jobs with some prior data are pruned using PROCCAMWORD to only process the
1795
            remaining cameras not found to exist.
1796
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
1797
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
1798
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
1799
            information used for a specific type of job. Examples include
1800
            laststeps for tilenight, etc.
1801

1802
    Returns:
1803
        tuple: A tuple containing:
1804

1805
        * ptable, Table. The same processing table as input except with added rows for the joint fit job.
1806
        * tnight_prow, dict. Row of a processing table corresponding to the tilenight job.
1807
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1808
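
    Example:
        A minimal sketch (hypothetical inputs; 'laststeps' restricts which
        LASTSTEP values are processed)::

            extra_job_args = {'laststeps': ['all', 'skysub']}
            ptable, tnight_prow, internal_id = submit_tilenight(
                ptable, sciences, calibjobs, internal_id, queue='realtime',
                reservation=None, dry_run=4, extra_job_args=extra_job_args)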
    """
1809
    log = get_logger()
1✔
1810
    if len(prows) < 1:
1✔
1811
        return ptable, None, internal_id
×
1812

1813
    log.info(" ")
1✔
1814
    log.info(f"Running tilenight.\n")
1✔
1815

1816
    tnight_prow = make_tnight_prow(prows, calibjobs, internal_id=internal_id)
1✔
1817
    internal_id += 1
1✔
1818
    tnight_prow = create_and_submit(tnight_prow, queue=queue, reservation=reservation, dry_run=dry_run,
1✔
1819
                                   strictly_successful=strictly_successful, check_for_outputs=False,
1820
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name,
1821
                                   use_specter=use_specter, extra_job_args=extra_job_args)
1822
    ptable.add_row(tnight_prow)
1✔
1823

1824
    return ptable, tnight_prow, internal_id
1✔
1825

1826
## wrapper functions for joint fitting
1827
def science_joint_fit(ptable, sciences, internal_id, queue='realtime', reservation=None,
1✔
1828
                      z_submit_types=None, dry_run=0, strictly_successful=False,
1829
                      check_for_outputs=True, resubmit_partial_complete=True,
1830
                      system_name=None):
1831
    """
1832
    Wrapper function for desispec.workflow.processing.joint_fit specific to the stdstarfit joint fit and redshift fitting.
1833

1834
    All variables are the same except:
1835

1836
        Arg 'sciences' is mapped to the prows argument of joint_fit.
1837
        The joint_fit argument descriptor is pre-defined as 'science'.
1838
    """
1839
    return joint_fit(ptable=ptable, prows=sciences, internal_id=internal_id, queue=queue, reservation=reservation,
×
1840
                     descriptor='science', z_submit_types=z_submit_types, dry_run=dry_run,
1841
                     strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1842
                     resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1843

1844

1845
def flat_joint_fit(ptable, flats, internal_id, queue='realtime',
1✔
1846
                   reservation=None, dry_run=0, strictly_successful=False,
1847
                   check_for_outputs=True, resubmit_partial_complete=True,
1848
                   system_name=None):
1849
    """
1850
    Wrapper function for desispec.workflow.processing.joint_fit specific to the nightlyflat joint fit.
1851

1852
    All variables are the same except:
1853

1854
        Arg 'flats' is mapped to the prows argument of joint_fit.
1855
        The joint_fit argument descriptor is pre-defined as 'nightlyflat'.
1856
    """
1857
    return joint_fit(ptable=ptable, prows=flats, internal_id=internal_id, queue=queue, reservation=reservation,
×
1858
                     descriptor='nightlyflat', dry_run=dry_run, strictly_successful=strictly_successful,
1859
                     check_for_outputs=check_for_outputs, resubmit_partial_complete=resubmit_partial_complete,
1860
                     system_name=system_name)
1861

1862

1863
def arc_joint_fit(ptable, arcs, internal_id, queue='realtime',
1✔
1864
                  reservation=None, dry_run=0, strictly_successful=False,
1865
                  check_for_outputs=True, resubmit_partial_complete=True,
1866
                  system_name=None):
1867
    """
1868
    Wrapper function for desispec.workflow.processing.joint_fit specific to the psfnight joint fit.
1869

1870
    All variables are the same except:
1871

1872
        Arg 'arcs' is mapped to the prows argument of joint_fit.
1873
        The joint_fit argument descriptor is pre-defined as 'psfnight'.
1874
    """
1875
    return joint_fit(ptable=ptable, prows=arcs, internal_id=internal_id, queue=queue, reservation=reservation,
×
1876
                     descriptor='psfnight', dry_run=dry_run, strictly_successful=strictly_successful,
1877
                     check_for_outputs=check_for_outputs, resubmit_partial_complete=resubmit_partial_complete,
1878
                     system_name=system_name)
1879

1880
def make_joint_prow(prows, descriptor, internal_id):
1✔
1881
    """
1882
    Given an input list or array of processing table rows and a descriptor, this creates a joint fit processing job row.
1883
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
1884
    input prows).
1885

1886
    Args:
1887
        prows, list or array of dicts. The rows corresponding to the individual exposure jobs that are
1888
            inputs to the joint fit.
1889
        descriptor, str. Description of the joint fitting job. Can either be 'stdstarfit', 'psfnight', or 'nightlyflat'.
1890
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1891

1892
    Returns:
1893
        joint_prow, dict. Row of a processing table corresponding to the joint fit job.
1894
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1895
    """
1896
    log = get_logger()
1✔
1897
    first_row = table_row_to_dict(prows[0])
1✔
1898
    joint_prow = first_row.copy()
1✔
1899

1900
    joint_prow['INTID'] = internal_id
1✔
1901
    internal_id += 1
1✔
1902
    joint_prow['JOBDESC'] = descriptor
1✔
1903
    joint_prow['LATEST_QID'] = -99
1✔
1904
    joint_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
1✔
1905
    joint_prow['SUBMIT_DATE'] = -99
1✔
1906
    joint_prow['STATUS'] = 'UNSUBMITTED'
1✔
1907
    joint_prow['SCRIPTNAME'] = ''
1✔
1908
    joint_prow['EXPID'] = np.unique(np.concatenate([currow['EXPID'] for currow in prows])).astype(int)
1✔
1909

1910
    ## Assign the PROCCAMWORD based on the descriptor and the input exposures
1911
    ## UPDATE 2024-04-24: badamps are now included in arc/flat joint fits,
1912
    ## so grab all PROCCAMWORDs instead of filtering out BADAMP cameras
1913
    ## For flats we want any camera that exists in all 12 exposures
1914
    ## For arcs we want any camera that exists in at least 3 exposures
1915
    pcamwords = [prow['PROCCAMWORD'] for prow in prows]
1✔
1916
    if descriptor == 'stdstarfit':
1✔
1917
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
×
1918
                                                  full_spectros_only=True)
1919
    elif descriptor in ['pernight', 'cumulative']:
1✔
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
1✔
                                                  full_spectros_only=False)
    elif descriptor == 'nightlyflat':
1✔
        joint_prow['PROCCAMWORD'] = camword_intersection(pcamwords,
1✔
                                                         full_spectros_only=False)
    elif descriptor == 'psfnight':
1✔
        ## Count the number of exposures each camera is present in
        camcheck = {}
1✔
        for camword in pcamwords:
1✔
            for cam in decode_camword(camword):
1✔
                if cam in camcheck:
1✔
                    camcheck[cam] += 1
1✔
                else:
                    camcheck[cam] = 1
1✔
        ## if a camera exists in 3 or more exposures, then include it
        goodcams = []
1✔
        for cam, camcount in camcheck.items():
1✔
            if camcount >= 3:
1✔
                goodcams.append(cam)
1✔
        joint_prow['PROCCAMWORD'] = create_camword(goodcams)
1✔
    else:
        log.warning("Asked to produce a joint proc table row for unknown"
×
                    + f" job description {descriptor}")

    joint_prow = assign_dependency(joint_prow, dependency=prows)
1✔
    return joint_prow, internal_id
1✔

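## Illustrative sketch (hypothetical helper, not called by the pipeline) of
## the psfnight rule above: a camera is kept only if it appears in at least 3
## of the input PROCCAMWORDs. decode_camword/create_camword are the
## desispec.io.util helpers imported at the top of this module; the default
## camwords are made up.
def _example_psfnight_camword(pcamwords=('a01', 'a01', 'a0b1', 'a0')):
    from collections import Counter
    counts = Counter(cam for camword in pcamwords
                     for cam in decode_camword(camword))
    goodcams = [cam for cam, n in counts.items() if n >= 3]
    ## with the defaults: b0, r0, z0 appear 4 times and b1 appears 3 times,
    ## so this returns 'a0b1'
    return create_camword(goodcams)
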
def make_exposure_prow(erow, int_id, calibjobs, jobdesc=None):
1✔
    """
    Converts an exposure table row into a processing table row, assigns it the
    given internal id int_id and a job description, and defines its dependency
    on the calibration jobs. Returns the new row and the incremented id.
    """
    prow = erow_to_prow(erow)
1✔
    prow['INTID'] = int_id
1✔
    int_id += 1
1✔
    if jobdesc is None:
1✔
        prow['JOBDESC'] = prow['OBSTYPE']
1✔
    else:
        prow['JOBDESC'] = jobdesc
1✔
    prow = define_and_assign_dependency(prow, calibjobs)
1✔
    return prow, int_id
1✔

def make_tnight_prow(prows, calibjobs, internal_id):
1✔
    """
    Given an input list or array of processing table rows, this creates a tilenight processing job row.
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
    input calibjobs).

    Args:
        prows, list or array of dicts. Unsubmitted rows corresponding to the individual prestdstar jobs that are
            the first steps of tilenight.
        calibjobs, dict. Dictionary whose values are each a Table.Row or
            None, with each Table.Row value corresponding to a calibration job
            on which the tilenight job depends.
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last id number used).

    Returns:
        dict: Row of a processing table corresponding to the tilenight job.
    """
    first_row = table_row_to_dict(prows[0])
1✔
    joint_prow = first_row.copy()
1✔

    joint_prow['INTID'] = internal_id
1✔
    joint_prow['JOBDESC'] = 'tilenight'
1✔
    joint_prow['LATEST_QID'] = -99
1✔
    joint_prow['ALL_QIDS'] = np.array([], dtype=int)
1✔
    joint_prow['SUBMIT_DATE'] = -99
1✔
    joint_prow['STATUS'] = 'UNSUBMITTED'
1✔
    joint_prow['SCRIPTNAME'] = ''
1✔
    joint_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)
1✔

    joint_prow = define_and_assign_dependency(joint_prow, calibjobs, use_tilenight=True)
1✔

    return joint_prow
1✔

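## Illustrative sketch (made-up rows) contrasting the two EXPID conventions
## above: joint fits gather every unique exposure id across the inputs
## (make_joint_prow), while tilenight keeps the single exposure id of each
## input row (make_tnight_prow).
def _example_expid_conventions():
    prows = [{'EXPID': np.array([101, 102])}, {'EXPID': np.array([102, 103])}]
    joint_expids = np.unique(np.concatenate([p['EXPID'] for p in prows])).astype(int)
    tnight_expids = np.array([p['EXPID'][0] for p in prows], dtype=int)
    return joint_expids, tnight_expids  # [101 102 103] and [101 102]
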
def make_redshift_prow(prows, tnights, descriptor, internal_id):
1✔
    """
    Given an input list or array of processing table rows and a descriptor, this creates a redshift processing job row.
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
    input tnights).

    Args:
        prows, list or array of dicts. Unsubmitted rows corresponding to the individual prestdstar jobs that are
            the first steps of tilenight.
        tnights, list or array of Table.Row objects. Rows corresponding to the tilenight jobs on which the redshift job depends.
        descriptor, str. The type of redshift job, e.g. 'pernight' or 'cumulative'.
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last id number used).

    Returns:
        dict: Row of a processing table corresponding to the redshift job.
    """
    first_row = table_row_to_dict(prows[0])
×
    redshift_prow = first_row.copy()
×

    redshift_prow['INTID'] = internal_id
×
    redshift_prow['JOBDESC'] = descriptor
×
    redshift_prow['LATEST_QID'] = -99
×
    redshift_prow['ALL_QIDS'] = np.array([], dtype=int)
×
    redshift_prow['SUBMIT_DATE'] = -99
×
    redshift_prow['STATUS'] = 'UNSUBMITTED'
×
    redshift_prow['SCRIPTNAME'] = ''
×
    redshift_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)
×

    redshift_prow = assign_dependency(redshift_prow, dependency=tnights)
×

    return redshift_prow
×

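## Illustrative sketch (pared-down, made-up row) of the derivation pattern
## shared by make_joint_prow, make_tnight_prow, and make_redshift_prow above:
## copy the first input row, then reset the queue-bookkeeping columns so the
## new job starts from an unsubmitted state.
def _example_derive_row(prows, jobdesc, internal_id):
    newrow = dict(prows[0])            # inherit NIGHT, TILEID, etc.
    newrow['INTID'] = internal_id      # unique internal id for the new job
    newrow['JOBDESC'] = jobdesc
    newrow['LATEST_QID'] = -99         # no Slurm job submitted yet
    newrow['ALL_QIDS'] = np.array([], dtype=int)
    newrow['SUBMIT_DATE'] = -99
    newrow['STATUS'] = 'UNSUBMITTED'
    newrow['SCRIPTNAME'] = ''
    return newrow
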
def checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs,
1✔
                                  lasttype, internal_id, z_submit_types=None, dry_run=0,
                                  queue='realtime', reservation=None, strictly_successful=False,
                                  check_for_outputs=True, resubmit_partial_complete=True,
                                  system_name=None):
    """
    Takes all the stateful data from daily processing and determines whether a joint fit needs to be submitted. Places
    the decision criteria into a single function for easier maintainability over time. These are separate from the
    new standard manifest*.json method of indicating a calibration sequence is complete. That is checked independently
    elsewhere and doesn't interact with this.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        arcs (list of dict): list of the individual arc jobs to be used for the psfnight (NOT all
            the arcs, if multiple sets existed). May be empty if none identified yet.
        flats (list of dict): list of the individual flat jobs to be used for the nightlyflat (NOT
            all the flats, if multiple sets existed). May be empty if none identified yet.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The Table.Row values are for the corresponding
            calibration job.
        lasttype (str or None): the obstype of the last individual exposure row to be processed.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 doesn't write or submit anything but queries Slurm normally for job status.
            4 doesn't write, submit jobs, or query Slurm.
            5 doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table, Processing table of all exposures that have been processed.
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
          and 'nightlyflat'. Each key corresponds to a Table.Row or
          None. The Table.Row values are for the corresponding
          calibration job.
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented upward
          from the input such that it represents the smallest unused ID.
    """
    if lasttype == 'science' and len(sciences) > 0:
×
        log = get_logger()
×
        skysubonly = np.array([sci['LASTSTEP'] == 'skysub' for sci in sciences])
×
        if np.all(skysubonly):
×
            log.error("Identified all exposures in joint fitting request as skysub-only. Not submitting")
×
            sciences = []
×
            return ptable, calibjobs, sciences, internal_id
×

        if np.any(skysubonly):
×
            log.error("Identified skysub-only exposures in joint fitting request")
×
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
×
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))
×
            sciences = (np.array(sciences, dtype=object)[~skysubonly]).tolist()
×
            log.info("Removed skysub-only exposures in joint fitting:")
×
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
×
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))
×

        from collections import Counter
×
        tiles = np.array([sci['TILEID'] for sci in sciences])
×
        counts = Counter(tiles)
×
        if len(counts.most_common()) > 1:
×
            log.error("Identified more than one tile in a joint fitting request")
×
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
×
            log.info("Tileid's: {}".format(tiles))
×
            log.info("Returning without joint fitting any of these exposures.")
×
            # most_common, nmost_common = counts.most_common()[0]
            # if most_common == -99:
            #     most_common, nmost_common = counts.most_common()[1]
            # log.warning(f"Given multiple tiles to jointly fit: {counts}. "+
            #             "Only processing the most common non-default " +
            #             f"tile: {most_common} with {nmost_common} exposures")
            # sciences = (np.array(sciences,dtype=object)[tiles == most_common]).tolist()
            # log.info("Tiles and exposure id's being submitted for joint fitting:")
            # log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            # log.info("Tileid's: {}".format([row['TILEID'] for row in sciences]))
            sciences = []
×
            return ptable, calibjobs, sciences, internal_id
×

        ptable, tilejob, internal_id = science_joint_fit(ptable, sciences, internal_id, z_submit_types=z_submit_types,
×
                                                         dry_run=dry_run, queue=queue, reservation=reservation,
                                                         strictly_successful=strictly_successful,
                                                         check_for_outputs=check_for_outputs,
                                                         resubmit_partial_complete=resubmit_partial_complete,
                                                         system_name=system_name)
        if tilejob is not None:
×
            sciences = []
×

    elif lasttype == 'flat' and calibjobs['nightlyflat'] is None and len(flats) == 12:
×
        ## Note: this branch assumes a complete flat calibration set contains exactly 12 exposures
        ptable, calibjobs['nightlyflat'], internal_id \
×
            = flat_joint_fit(ptable, flats, internal_id, dry_run=dry_run, queue=queue,
                             reservation=reservation, strictly_successful=strictly_successful,
                             check_for_outputs=check_for_outputs,
                             resubmit_partial_complete=resubmit_partial_complete,
                             system_name=system_name
                            )

    elif lasttype == 'arc' and calibjobs['psfnight'] is None and len(arcs) == 5:
×
        ## Note: this branch assumes a complete arc calibration set contains exactly 5 exposures
        ptable, calibjobs['psfnight'], internal_id \
×
            = arc_joint_fit(ptable, arcs, internal_id, dry_run=dry_run, queue=queue,
                            reservation=reservation, strictly_successful=strictly_successful,
                            check_for_outputs=check_for_outputs,
                            resubmit_partial_complete=resubmit_partial_complete,
                            system_name=system_name
                            )
    return ptable, calibjobs, sciences, internal_id
×

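## Illustrative sketch (made-up rows) of the skysub-only filtering above: a
## boolean mask built from LASTSTEP drops exposures that stopped at sky
## subtraction before any joint fit is attempted.
def _example_filter_skysub():
    sciences = [{'EXPID': 1001, 'LASTSTEP': 'all'},
                {'EXPID': 1002, 'LASTSTEP': 'skysub'},
                {'EXPID': 1003, 'LASTSTEP': 'all'}]
    skysubonly = np.array([sci['LASTSTEP'] == 'skysub' for sci in sciences])
    ## keeps expids 1001 and 1003
    return (np.array(sciences, dtype=object)[~skysubonly]).tolist()
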
def submit_tilenight_and_redshifts(ptable, sciences, calibjobs, internal_id, dry_run=0,
1✔
                                   queue='realtime', reservation=None, strictly_successful=False,
                                   check_for_outputs=True, resubmit_partial_complete=True,
                                   system_name=None, use_specter=False, extra_job_args=None):
    """
    Takes all the stateful data from daily processing and determines whether a tilenight job needs to be submitted.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary of calibration jobs, where each value is a Table.Row or
            None, on which the tilenight job may depend.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
            0 runs the code normally.
            1 writes all files but doesn't submit any jobs to Slurm.
            2 writes tables but doesn't write scripts or submit anything.
            3 doesn't write or submit anything but queries Slurm normally for job status.
            4 doesn't write, submit jobs, or query Slurm.
            5 doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict, optional): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include
            laststeps for tilenight, z_submit_types for redshifts, etc.

    Returns:
        tuple: A tuple containing:

        * ptable, Table, Processing table of all exposures that have been processed.
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented upward
          from the input such that it represents the smallest unused ID.
    """
    ptable, tnight, internal_id = submit_tilenight(ptable, sciences, calibjobs, internal_id,
1✔
                                             queue=queue, reservation=reservation,
                                             dry_run=dry_run, strictly_successful=strictly_successful,
                                             resubmit_partial_complete=resubmit_partial_complete,
                                             system_name=system_name, use_specter=use_specter,
                                             extra_job_args=extra_job_args)

    z_submit_types = None
1✔
    if extra_job_args is not None and 'z_submit_types' in extra_job_args:
1✔
        z_submit_types = extra_job_args['z_submit_types']
1✔

    ptable, internal_id = submit_redshifts(ptable, sciences, tnight, internal_id,
1✔
                                    queue=queue, reservation=reservation,
                                    dry_run=dry_run, strictly_successful=strictly_successful,
                                    check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete,
                                    z_submit_types=z_submit_types,
                                    system_name=system_name)

    if tnight is not None:
1✔
        sciences = []
1✔

    return ptable, sciences, internal_id
1✔

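## Illustrative sketch mirroring the extra_job_args handling above: redshift
## group types are only extracted when the caller actually supplied them.
def _example_get_z_submit_types(extra_job_args):
    if extra_job_args is not None and 'z_submit_types' in extra_job_args:
        return extra_job_args['z_submit_types']
    return None  # e.g. returns ['cumulative'] for {'z_submit_types': ['cumulative']}
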
def set_calibrator_flag(prows, ptable):
1✔
    """
    Sets the "CALIBRATOR" column of a processing table row to 1 (integer representation of True)
    for all input rows. Used within joint fitting code to flag the exposures that were input
    to the psfnight or nightlyflat for later reference.

    Args:
        prows, list or array of Table.Rows or dicts. The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        ptable, Table. The processing table where each row is a processed job.

    Returns:
        Table: The same processing table as input, except with CALIBRATOR set to 1 for each input row.
    """
    for prow in prows:
1✔
        ptable['CALIBRATOR'][ptable['INTID'] == prow['INTID']] = 1
1✔
    return ptable
1✔
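
## Illustrative sketch (made-up table) of the CALIBRATOR flagging above,
## matching rows by INTID in an astropy Table.
def _example_set_calibrator_flag():
    ptable = Table({'INTID': np.array([240101000, 240101001, 240101002]),
                    'CALIBRATOR': np.zeros(3, dtype=int)})
    prows = [{'INTID': 240101000}, {'INTID': 240101002}]
    for prow in prows:
        ptable['CALIBRATOR'][ptable['INTID'] == prow['INTID']] = 1
    return ptable  # CALIBRATOR column becomes [1, 0, 1]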