• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

desihub / desispec / 11243997015

08 Oct 2024 05:56AM UTC coverage: 30.081% (-0.02%) from 30.102%
11243997015

push

github

akremin
fix queue bug and only update from queue in proc_night if dry run set appropriately

7 of 29 new or added lines in 3 files covered. (24.14%)

337 existing lines in 3 files now uncovered.

14628 of 48629 relevant lines covered (30.08%)

0.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.27
/py/desispec/workflow/processing.py
1
"""
2
desispec.workflow.processing
3
============================
4

5
"""
6
import sys, os, glob
1✔
7
import json
1✔
8
from astropy.io import fits
1✔
9
from astropy.table import Table, join
1✔
10
import numpy as np
1✔
11

12
import time, datetime
1✔
13
from collections import OrderedDict
1✔
14
import subprocess
1✔
15

16
from desispec.scripts.link_calibnight import derive_include_exclude
1✔
17
from desispec.scripts.tile_redshifts import generate_tile_redshift_scripts
1✔
18
from desispec.workflow.redshifts import get_ztile_script_pathname, \
1✔
19
    get_ztile_relpath, \
20
    get_ztile_script_suffix
21
from desispec.workflow.exptable import read_minimal_science_exptab_cols
1✔
22
from desispec.workflow.queue import get_resubmission_states, update_from_queue, \
1✔
23
    queue_info_from_qids, get_queue_states_from_qids, update_queue_state_cache, \
24
    get_non_final_states
25
from desispec.workflow.timing import what_night_is_it
1✔
26
from desispec.workflow.desi_proc_funcs import get_desi_proc_batch_file_pathname, \
1✔
27
    create_desi_proc_batch_script, \
28
    get_desi_proc_batch_file_path, \
29
    get_desi_proc_tilenight_batch_file_pathname, \
30
    create_desi_proc_tilenight_batch_script, create_linkcal_batch_script
31
from desispec.workflow.batch import parse_reservation
1✔
32
from desispec.workflow.utils import pathjoin, sleep_and_report, \
1✔
33
    load_override_file
34
from desispec.workflow.tableio import write_table, load_table
1✔
35
from desispec.workflow.proctable import table_row_to_dict, erow_to_prow, \
1✔
36
    read_minimal_tilenight_proctab_cols, read_minimal_full_proctab_cols, \
37
    update_full_ptab_cache, default_prow, get_default_qid
38
from desiutil.log import get_logger
1✔
39

40
from desispec.io import findfile, specprod_root
1✔
41
from desispec.io.util import decode_camword, create_camword, \
1✔
42
    difference_camwords, \
43
    camword_to_spectros, camword_union, camword_intersection, parse_badamps
44

45

46
#################################################
47
############## Misc Functions ###################
48
#################################################
49
def night_to_starting_iid(night=None):
1✔
50
    """
51
    Creates an internal ID for a given night. The resulting integer is an 8 digit number.
52
    The digits are YYMMDDxxx where YY is the years since 2000, MM and DD are the month and day. xxx are 000,
53
    and are incremented for up to 1000 unique job ID's for a given night.
54

55
    Args:
56
        night (str or int): YYYYMMDD of the night to get the starting internal ID for.
57

58
    Returns:
59
        int: 9 digit number consisting of YYMMDD000. YY is years after 2000, MMDD is month and day.
60
        000 being the starting job number (0).
61
    """
62
    if night is None:
1✔
63
        night = what_night_is_it()
×
64
    night = int(night)
1✔
65
    internal_id = (night - 20000000) * 1000
1✔
66
    return internal_id
1✔
67

68
class ProcessingParams():
1✔
69
    def __init__(self, dry_run_level=0, queue='realtime',
1✔
70
                 reservation=None, strictly_successful=True,
71
                 check_for_outputs=True,
72
                 resubmit_partial_complete=True,
73
                 system_name='perlmutter', use_specter=True):
74

75
        self.dry_run_level = dry_run_level
×
76
        self.system_name = system_name
×
77
        self.queue = queue
×
78
        self.reservation = reservation
×
79
        self.strictly_successful = strictly_successful
×
80
        self.check_for_outputs = check_for_outputs
×
81
        self.resubmit_partial_complete = resubmit_partial_complete
×
82
        self.use_specter = use_specter
×
83

84
#################################################
85
############ Script Functions ###################
86
#################################################
87
def batch_script_name(prow):
1✔
88
    """
89
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, JOBDESC, PROCCAMWORD defined)
90
    and determines the script file pathname as defined by desi_proc's helper functions.
91

92
    Args:
93
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
94

95
    Returns:
96
        str: The complete pathname to the script file, as it is defined within the desi_proc ecosystem.
97
    """
98
    expids = prow['EXPID']
1✔
99
    if len(expids) == 0:
1✔
100
        expids = None
1✔
101
    if prow['JOBDESC'] == 'tilenight':
1✔
102
        pathname = get_desi_proc_tilenight_batch_file_pathname(night = prow['NIGHT'], tileid=prow['TILEID'])
1✔
103
    else:
104
        pathname = get_desi_proc_batch_file_pathname(night = prow['NIGHT'], exp=expids, \
1✔
105
                                             jobdesc=prow['JOBDESC'], cameras=prow['PROCCAMWORD'])
106
    scriptfile =  pathname + '.slurm'
1✔
107
    return scriptfile
1✔
108

109
def get_jobdesc_to_file_map():
1✔
110
    """
111
    Returns a mapping of job descriptions to the filenames of the output files
112

113
    Args:
114
        None
115

116
    Returns:
117
        dict. Dictionary with keys as lowercase job descriptions and to the
118
            filename of their expected outputs.
119

120
    """
121
    return {'prestdstar': 'sframe',
1✔
122
            'stdstarfit': 'stdstars',
123
            'poststdstar': 'cframe',
124
            'nightlybias': 'biasnight',
125
            # 'ccdcalib': 'badcolumns',
126
            'badcol': 'badcolumns',
127
            'arc': 'fitpsf',
128
            'flat': 'fiberflat',
129
            'psfnight': 'psfnight',
130
            'nightlyflat': 'fiberflatnight',
131
            'spectra': 'spectra_tile',
132
            'coadds': 'coadds_tile',
133
            'redshift': 'redrock_tile'}
134

135
def get_file_to_jobdesc_map():
1✔
136
    """
137
    Returns a mapping of output filenames to job descriptions
138

139
    Args:
140
        None
141

142
    Returns:
143
        dict. Dictionary with keys as filename of their expected outputs to
144
            the lowercase job descriptions
145
            .
146

147
    """
148
    job_to_file_map = get_jobdesc_to_file_map()
×
149
    job_to_file_map.pop('badcol') # these files can also be in a ccdcalib job
×
150
    job_to_file_map.pop('nightlybias') # these files can also be in a ccdcalib job
×
151
    return {value: key for key, value in job_to_file_map.items()}
×
152

153
def check_for_outputs_on_disk(prow, resubmit_partial_complete=True):
1✔
154
    """
155
    Args:
156
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
157
            desispect.workflow.proctable.get_processing_table_column_defs()
158
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
159
            jobs with some prior data are pruned using PROCCAMWORD to only process the
160
            remaining cameras not found to exist.
161

162
    Returns:
163
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
164
        the change in job status after creating and submitting the job for processing.
165
    """
166
    prow['STATUS'] = 'UNKNOWN'
1✔
167
    log = get_logger()
1✔
168

169
    if prow['JOBDESC'] in ['linkcal', 'ccdcalib']:
1✔
170
        log.info(f"jobdesc={prow['JOBDESC']} has indeterminated outputs, so "
1✔
171
                + "not checking for files on disk.")
172
        return prow
1✔
173

174
    job_to_file_map = get_jobdesc_to_file_map()
1✔
175

176
    night = prow['NIGHT']
1✔
177
    if prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
1✔
178
        filetype = 'redrock_tile'
1✔
179
    else:
180
        filetype = job_to_file_map[prow['JOBDESC']]
1✔
181
    orig_camword = prow['PROCCAMWORD']
1✔
182

183
    ## if spectro based, look for spectros, else look for cameras
184
    if prow['JOBDESC'] in ['stdstarfit','spectra','coadds','redshift']:
1✔
185
        ## Spectrograph based
186
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
×
187
        n_desired = len(spectros)
×
188
        ## Suppress outputs about using tile based files in findfile if only looking for stdstarfits
189
        if prow['JOBDESC'] == 'stdstarfit':
×
190
            tileid = None
×
191
        else:
192
            tileid = prow['TILEID']
×
193
        expid = prow['EXPID'][0]
×
194
        existing_spectros = []
×
195
        for spectro in spectros:
×
196
            if os.path.exists(findfile(filetype=filetype, night=night, expid=expid, spectrograph=spectro, tile=tileid)):
×
197
                existing_spectros.append(spectro)
×
198
        completed = (len(existing_spectros) == n_desired)
×
199
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
×
200
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
×
201
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'],existing_camword)
×
202
    elif prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
1✔
203
        ## Spectrograph based
204
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
1✔
205
        n_desired = len(spectros)
1✔
206
        ## Suppress outputs about using tile based files in findfile if only looking for stdstarfits
207
        tileid = prow['TILEID']
1✔
208
        expid = prow['EXPID'][0]
1✔
209
        redux_dir = specprod_root()
1✔
210
        outdir = os.path.join(redux_dir,get_ztile_relpath(tileid,group=prow['JOBDESC'],night=night,expid=expid))
1✔
211
        suffix = get_ztile_script_suffix(tileid, group=prow['JOBDESC'], night=night, expid=expid)
1✔
212
        existing_spectros = []
1✔
213
        for spectro in spectros:
1✔
214
            if os.path.exists(os.path.join(outdir, f"redrock-{spectro}-{suffix}.fits")):
1✔
215
                existing_spectros.append(spectro)
×
216
        completed = (len(existing_spectros) == n_desired)
1✔
217
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
1✔
218
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
×
219
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'],existing_camword)
×
220
    else:
221
        ## Otheriwse camera based
222
        cameras = decode_camword(prow['PROCCAMWORD'])
1✔
223
        n_desired = len(cameras)
1✔
224
        if len(prow['EXPID']) > 0:
1✔
225
            expid = prow['EXPID'][0]
1✔
226
        else:
227
            expid = None
1✔
228
        if len(prow['EXPID']) > 1 and prow['JOBDESC'] not in ['psfnight','nightlyflat']:
1✔
229
            log.warning(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']}. This job type only makes " +
×
230
                     f"sense with a single exposure. Proceeding with {expid}.")
231
        missing_cameras = []
1✔
232
        for cam in cameras:
1✔
233
            if not os.path.exists(findfile(filetype=filetype, night=night, expid=expid, camera=cam)):
1✔
234
                missing_cameras.append(cam)
1✔
235
        completed = (len(missing_cameras) == 0)
1✔
236
        if not completed and resubmit_partial_complete and len(missing_cameras) < n_desired:
1✔
237
            prow['PROCCAMWORD'] = create_camword(missing_cameras)
×
238

239
    if completed:
1✔
240
        prow['STATUS'] = 'COMPLETED'
×
241
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
×
242
                 f"the desired {n_desired} {filetype}'s. Not submitting this job.")
243
    elif resubmit_partial_complete and orig_camword != prow['PROCCAMWORD']:
1✔
244
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
×
245
                 f"some {filetype}'s. Submitting smaller camword={prow['PROCCAMWORD']}.")
246
    elif not resubmit_partial_complete:
1✔
247
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} doesn't have all " +
×
248
                 f"{filetype}'s and resubmit_partial_complete=False. "+
249
                 f"Submitting full camword={prow['PROCCAMWORD']}.")
250
    else:
251
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} has no " +
1✔
252
                 f"existing {filetype}'s. Submitting full camword={prow['PROCCAMWORD']}.")
253
    return prow
1✔
254

255
def create_and_submit(prow, queue='realtime', reservation=None, dry_run=0,
1✔
256
                      joint=False, strictly_successful=False,
257
                      check_for_outputs=True, resubmit_partial_complete=True,
258
                      system_name=None, use_specter=False,
259
                      extra_job_args=None):
260
    """
261
    Wrapper script that takes a processing table row and three modifier keywords, creates a submission script for the
262
    compute nodes, and then submits that script to the Slurm scheduler with appropriate dependencies.
263

264
    Args:
265
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
266
            desispect.workflow.proctable.get_processing_table_column_defs()
267
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
268
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
269
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
270
            0 which runs the code normally.
271
            1 writes all files but doesn't submit any jobs to Slurm.
272
            2 writes tables but doesn't write scripts or submit anything.
273
            3 Doesn't write or submit anything but queries Slurm normally for job status.
274
            4 Doesn't write, submit jobs, or query Slurm.
275
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
276
        joint (bool, optional): Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
277
            run with desi_proc_joint_fit. Default is False.
278
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
279
            less desirable because e.g. the sciences can run with SVN default calibrations rather
280
            than failing completely from failed calibrations. Default is False.
281
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
282
            data products for the script being submitted. If all files exist and this is True,
283
            then the script will not be submitted. If some files exist and this is True, only the
284
            subset of the cameras without the final data products will be generated and submitted.
285
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
286
            jobs with some prior data are pruned using PROCCAMWORD to only process the
287
            remaining cameras not found to exist.
288
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
289
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
290
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
291
            information used for a specific type of job. Examples include refnight
292
            and include/exclude lists for linkcals, laststeps for for tilenight, etc.
293

294
    Returns:
295
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
296
        the change in job status after creating and submitting the job for processing.
297

298
    Note:
299
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
300
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
301
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
302
    """
303
    orig_prow = prow.copy()
1✔
304
    if check_for_outputs:
1✔
305
        prow = check_for_outputs_on_disk(prow, resubmit_partial_complete)
1✔
306
        if prow['STATUS'].upper() == 'COMPLETED':
1✔
UNCOV
307
            return prow
×
308

309
    prow = create_batch_script(prow, queue=queue, dry_run=dry_run, joint=joint,
1✔
310
                               system_name=system_name, use_specter=use_specter,
311
                               extra_job_args=extra_job_args)
312
    prow = submit_batch_script(prow, reservation=reservation, dry_run=dry_run,
1✔
313
                               strictly_successful=strictly_successful)
314

315
    ## If resubmitted partial, the PROCCAMWORD and SCRIPTNAME will correspond
316
    ## to the pruned values. But we want to
317
    ## retain the full job's value, so get those from the old job.
318
    if resubmit_partial_complete:
1✔
319
        prow['PROCCAMWORD'] = orig_prow['PROCCAMWORD']
1✔
320
        prow['SCRIPTNAME'] = orig_prow['SCRIPTNAME']
1✔
321
    return prow
1✔
322

323
def desi_link_calibnight_command(prow, refnight, include=None):
1✔
324
    """
325
    Wrapper script that takes a processing table row (or dictionary with
326
    REFNIGHT, NIGHT, PROCCAMWORD defined) and determines the proper command
327
    line call to link data defined by the input row/dict.
328

329
    Args:
330
        prow (Table.Row or dict): Must include keyword accessible definitions
331
            for 'NIGHT', 'REFNIGHT', and 'PROCCAMWORD'.
332
        refnight (str or int): The night with a valid set of calibrations
333
            be created.
334
        include (list): The filetypes to include in the linking.
335
    Returns:
336
        str: The proper command to be submitted to desi_link_calibnight
337
            to process the job defined by the prow values.
338
    """
339
    cmd = 'desi_link_calibnight'
1✔
340
    cmd += f' --refnight={refnight}'
1✔
341
    cmd += f' --newnight={prow["NIGHT"]}'
1✔
342
    cmd += f' --cameras={prow["PROCCAMWORD"]}'
1✔
343
    if include is not None:
1✔
344
        cmd += f' --include=' + ','.join(list(include))
1✔
345
    return cmd
1✔
346

347
def desi_proc_command(prow, system_name, use_specter=False, queue=None):
1✔
348
    """
349
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, OBSTYPE, JOBDESC, PROCCAMWORD defined)
350
    and determines the proper command line call to process the data defined by the input row/dict.
351

352
    Args:
353
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
354
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu
355
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).
356
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
357

358
    Returns:
359
        str: The proper command to be submitted to desi_proc to process the job defined by the prow values.
360
    """
361
    cmd = 'desi_proc'
1✔
362
    cmd += ' --batch'
1✔
363
    cmd += ' --nosubmit'
1✔
364
    if queue is not None:
1✔
365
        cmd += f' -q {queue}'
1✔
366
    if prow['OBSTYPE'].lower() == 'science':
1✔
367
        if prow['JOBDESC'] == 'prestdstar':
×
UNCOV
368
            cmd += ' --nostdstarfit --nofluxcalib'
×
369
        elif prow['JOBDESC'] == 'poststdstar':
×
370
            cmd += ' --noprestdstarfit --nostdstarfit'
×
371

372
        if use_specter:
×
UNCOV
373
            cmd += ' --use-specter'
×
374
    elif prow['JOBDESC'] in ['flat', 'prestdstar'] and use_specter:
1✔
UNCOV
375
        cmd += ' --use-specter'
×
376
    pcamw = str(prow['PROCCAMWORD'])
1✔
377
    cmd += f" --cameras={pcamw} -n {prow['NIGHT']}"
1✔
378
    if len(prow['EXPID']) > 0:
1✔
379
        ## If ccdcalib job without a dark exposure, don't assign the flat expid
380
        ## since it would incorrectly process the flat using desi_proc
381
        if prow['OBSTYPE'].lower() != 'flat' or prow['JOBDESC'] != 'ccdcalib':
1✔
382
            cmd += f" -e {prow['EXPID'][0]}"
1✔
383
    if prow['BADAMPS'] != '':
1✔
UNCOV
384
        cmd += ' --badamps={}'.format(prow['BADAMPS'])
×
385
    return cmd
1✔
386

387
def desi_proc_joint_fit_command(prow, queue=None):
1✔
388
    """
389
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, OBSTYPE, PROCCAMWORD defined)
390
    and determines the proper command line call to process the data defined by the input row/dict.
391

392
    Args:
393
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
394
        queue (str): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).
395

396
    Returns:
397
        str: The proper command to be submitted to desi_proc_joint_fit
398
            to process the job defined by the prow values.
399
    """
400
    cmd = 'desi_proc_joint_fit'
1✔
401
    cmd += ' --batch'
1✔
402
    cmd += ' --nosubmit'
1✔
403
    if queue is not None:
1✔
404
        cmd += f' -q {queue}'
1✔
405

406
    descriptor = prow['OBSTYPE'].lower()
1✔
407

408
    night = prow['NIGHT']
1✔
409
    specs = str(prow['PROCCAMWORD'])
1✔
410
    expid_str = ','.join([str(eid) for eid in prow['EXPID']])
1✔
411

412
    cmd += f' --obstype {descriptor}'
1✔
413
    cmd += f' --cameras={specs} -n {night}'
1✔
414
    if len(expid_str) > 0:
1✔
415
        cmd += f' -e {expid_str}'
1✔
416
    return cmd
1✔
417

418

419
def create_batch_script(prow, queue='realtime', dry_run=0, joint=False,
1✔
420
                        system_name=None, use_specter=False, extra_job_args=None):
421
    """
422
    Wrapper script that takes a processing table row and three modifier keywords and creates a submission script for the
423
    compute nodes.
424

425
    Args:
426
        prow, Table.Row or dict. Must include keyword accessible definitions for processing_table columns found in
427
            desispect.workflow.proctable.get_processing_table_column_defs()
428
        queue, str. The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
429
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
430
            0 which runs the code normally.
431
            1 writes all files but doesn't submit any jobs to Slurm.
432
            2 writes tables but doesn't write scripts or submit anything.
433
            3 Doesn't write or submit anything but queries Slurm normally for job status.
434
            4 Doesn't write, submit jobs, or query Slurm.
435
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
436
        joint, bool. Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
437
            run with desi_proc_joint_fit when not using tilenight. Default is False.
438
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
439
        use_specter, bool, optional. Default is False. If True, use specter, otherwise use gpu_specter by default.
440
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
441
            information used for a specific type of job. Examples include refnight
442
            and include/exclude lists for linkcal, laststeps for tilenight, etc.
443

444
    Returns:
445
        Table.Row or dict: The same prow type and keywords as input except with modified values updated values for
446
        scriptname.
447

448
    Note:
449
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
450
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
451
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
452
    """
453
    log = get_logger()
1✔
454

455
    if extra_job_args is None:
1✔
456
        extra_job_args = {}
1✔
457

458
    if prow['JOBDESC'] in ['perexp','pernight','pernight-v0','cumulative']:
1✔
459
        if dry_run > 1:
1✔
460
            scriptpathname = get_ztile_script_pathname(tileid=prow['TILEID'],group=prow['JOBDESC'],
1✔
461
                                                               night=prow['NIGHT'], expid=prow['EXPID'][0])
462

463
            log.info("Output file would have been: {}".format(scriptpathname))
1✔
464
        else:
465
            #- run zmtl for cumulative redshifts but not others
466
            run_zmtl = (prow['JOBDESC'] == 'cumulative')
1✔
467
            no_afterburners = False
1✔
468
            print(f"entering tileredshiftscript: {prow}")
1✔
469
            scripts, failed_scripts = generate_tile_redshift_scripts(tileid=prow['TILEID'], group=prow['JOBDESC'],
1✔
470
                                                                     nights=[prow['NIGHT']], expids=prow['EXPID'],
471
                                                                     batch_queue=queue, system_name=system_name,
472
                                                                     run_zmtl=run_zmtl,
473
                                                                     no_afterburners=no_afterburners,
474
                                                                     nosubmit=True)
475
            if len(failed_scripts) > 0:
1✔
476
                log.error(f"Redshifts failed for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
×
477
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
478
                log.info(f"Returned failed scriptname is {failed_scripts}")
×
479
            elif len(scripts) > 1:
1✔
480
                log.error(f"More than one redshifts returned for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
×
481
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
UNCOV
482
                log.info(f"Returned scriptnames were {scripts}")
×
483
            elif len(scripts) == 0:
1✔
UNCOV
484
                msg = f'No scripts were generated for {prow=}'
×
UNCOV
485
                log.critical(prow)
×
UNCOV
486
                raise ValueError(msg)
×
487
            else:
488
                scriptpathname = scripts[0]
1✔
489

490
    elif prow['JOBDESC'] == 'linkcal':
1✔
491
        refnight, include, exclude = -99, None, None
1✔
492
        if 'refnight' in extra_job_args:
1✔
493
            refnight = extra_job_args['refnight']
1✔
494
        if 'include' in extra_job_args:
1✔
495
            include = extra_job_args['include']
1✔
496
        if 'exclude' in extra_job_args:
1✔
UNCOV
497
            exclude = extra_job_args['exclude']
×
498
        include, exclude = derive_include_exclude(include, exclude)
1✔
499
        ## Fiberflatnights shouldn't to be generated with psfs from same time, so
500
        ## shouldn't link psfs without also linking fiberflatnight
501
        ## However, this should be checked at a higher level. If set here,
502
        ## go ahead and do it
503
        # if 'psfnight' in include and not 'fiberflatnight' in include:
504
        #     err = "Must link fiberflatnight if linking psfnight"
505
        #     log.error(err)
506
        #     raise ValueError(err)
507
        if dry_run > 1:
1✔
508
            scriptpathname = batch_script_name(prow)
1✔
509
            log.info("Output file would have been: {}".format(scriptpathname))
1✔
510
            cmd = desi_link_calibnight_command(prow, refnight, include)
1✔
511
            log.info("Command to be run: {}".format(cmd.split()))
1✔
512
        else:
513
            if refnight == -99:
×
514
                err = f'For {prow=} asked to link calibration but not given' \
×
515
                      + ' a valid refnight'
UNCOV
516
                log.error(err)
×
UNCOV
517
                raise ValueError(err)
×
518

UNCOV
519
            cmd = desi_link_calibnight_command(prow, refnight, include)
×
UNCOV
520
            log.info(f"Running: {cmd.split()}")
×
UNCOV
521
            scriptpathname = create_linkcal_batch_script(newnight=prow['NIGHT'],
×
522
                                                        cameras=prow['PROCCAMWORD'],
523
                                                        queue=queue,
524
                                                        cmd=cmd,
525
                                                        system_name=system_name)
526
    else:
527
        if prow['JOBDESC'] != 'tilenight':
1✔
528
            nightlybias, nightlycte, cte_expids = False, False, None
1✔
529
            if 'nightlybias' in extra_job_args:
1✔
530
                nightlybias = extra_job_args['nightlybias']
1✔
531
            elif prow['JOBDESC'].lower() == 'nightlybias':
1✔
532
                nightlybias = True
1✔
533
            if 'nightlycte' in extra_job_args:
1✔
534
                nightlycte = extra_job_args['nightlycte']
1✔
535
            if 'cte_expids' in extra_job_args:
1✔
536
                cte_expids = extra_job_args['cte_expids']
1✔
537
            ## run known joint jobs as joint even if unspecified
538
            ## in the future we can eliminate the need for "joint"
539
            if joint or prow['JOBDESC'].lower() in ['psfnight', 'nightlyflat']:
1✔
540
                cmd = desi_proc_joint_fit_command(prow, queue=queue)
1✔
541
                ## For consistency with how we edit the other commands, do them
542
                ## here, but future TODO would be to move these into the command
543
                ## generation itself
544
                if 'extra_cmd_args' in extra_job_args:
1✔
545
                    cmd += ' ' + ' '.join(np.atleast_1d(extra_job_args['extra_cmd_args']))
1✔
546
            else:
547
                cmd = desi_proc_command(prow, system_name, use_specter, queue=queue)
1✔
548
                if nightlybias:
1✔
549
                    cmd += ' --nightlybias'
1✔
550
                if nightlycte:
1✔
551
                    cmd += ' --nightlycte'
1✔
552
                    if cte_expids is not None:
1✔
553
                        cmd += ' --cte-expids '
1✔
554
                        cmd += ','.join(np.atleast_1d(cte_expids).astype(str))
1✔
555
        if dry_run > 1:
1✔
556
            scriptpathname = batch_script_name(prow)
1✔
557
            log.info("Output file would have been: {}".format(scriptpathname))
1✔
558
            if prow['JOBDESC'] != 'tilenight':
1✔
559
                log.info("Command to be run: {}".format(cmd.split()))
1✔
560
        else:
561
            expids = prow['EXPID']
1✔
562
            if len(expids) == 0:
1✔
UNCOV
563
                expids = None
×
564

565
            if prow['JOBDESC'] == 'tilenight':
1✔
566
                log.info("Creating tilenight script for tile {}".format(prow['TILEID']))
1✔
567
                if 'laststeps' in extra_job_args:
1✔
568
                    laststeps = extra_job_args['laststeps']
1✔
569
                else:
UNCOV
570
                    err = f'{prow=} job did not specify last steps to tilenight'
×
UNCOV
571
                    log.error(err)
×
UNCOV
572
                    raise ValueError(err)
×
573
                ncameras = len(decode_camword(prow['PROCCAMWORD']))
1✔
574
                scriptpathname = create_desi_proc_tilenight_batch_script(
1✔
575
                                                               night=prow['NIGHT'], exp=expids,
576
                                                               tileid=prow['TILEID'],
577
                                                               ncameras=ncameras,
578
                                                               queue=queue,
579
                                                               mpistdstars=True,
580
                                                               use_specter=use_specter,
581
                                                               system_name=system_name,
582
                                                               laststeps=laststeps)
583
            else:
584
                if expids is not None and len(expids) > 1:
1✔
585
                    expids = expids[:1]
1✔
586
                log.info("Running: {}".format(cmd.split()))
1✔
587
                scriptpathname = create_desi_proc_batch_script(night=prow['NIGHT'], exp=expids,
1✔
588
                                                               cameras=prow['PROCCAMWORD'],
589
                                                               jobdesc=prow['JOBDESC'],
590
                                                               queue=queue, cmdline=cmd,
591
                                                               use_specter=use_specter,
592
                                                               system_name=system_name,
593
                                                               nightlybias=nightlybias,
594
                                                               nightlycte=nightlycte,
595
                                                               cte_expids=cte_expids)
596
    log.info("Outfile is: {}".format(scriptpathname))
1✔
597
    prow['SCRIPTNAME'] = os.path.basename(scriptpathname)
1✔
598
    return prow
1✔
599

600
_fake_qid = int(time.time() - 1.7e9)
1✔
601
def _get_fake_qid():
1✔
602
    """
603
    Return fake slurm queue jobid to use for dry-run testing
604
    """
605
    # Note: not implemented as a yield generator so that this returns a
606
    # genuine int, not a generator object
607
    global _fake_qid
608
    _fake_qid += 1
1✔
609
    return _fake_qid
1✔
610

611
def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=False):
1✔
612
    """
613
    Wrapper script that takes a processing table row and three modifier keywords and submits the scripts to the Slurm
614
    scheduler.
615

616
    Args:
617
        prow, Table.Row or dict. Must include keyword accessible definitions for processing_table columns found in
618
            desispect.workflow.proctable.get_processing_table_column_defs()
619
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
620
            0 which runs the code normally.
621
            1 writes all files but doesn't submit any jobs to Slurm.
622
            2 writes tables but doesn't write scripts or submit anything.
623
            3 Doesn't write or submit anything but queries Slurm normally for job status.
624
            4 Doesn't write, submit jobs, or query Slurm.
625
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
626
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
627
        strictly_successful, bool. Whether all jobs require all inputs to have succeeded. For daily processing, this is
628
            less desirable because e.g. the sciences can run with SVN default calibrations rather
629
            than failing completely from failed calibrations. Default is False.
630

631
    Returns:
632
        Table.Row or dict: The same prow type and keywords as input except with modified values updated values for
633
        scriptname.
634

635
    Note:
636
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
637
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
638
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
639
    """
640
    log = get_logger()
1✔
641
    dep_qids = prow['LATEST_DEP_QID']
1✔
642
    dep_list, dep_str = '', ''
1✔
643

644
    ## With desi_proc_night we now either resubmit failed jobs or exit, so this
645
    ## should no longer be necessary in the normal workflow.
646
    # workaround for sbatch --dependency bug not tracking jobs correctly
647
    # see NERSC TICKET INC0203024
648
    failed_dependency = False
1✔
649
    if len(dep_qids) > 0 and not dry_run:
1✔
650
        non_final_states = get_non_final_states()
×
UNCOV
651
        state_dict = get_queue_states_from_qids(dep_qids, dry_run_level=dry_run, use_cache=True)
×
UNCOV
652
        still_depids = []
×
653
        for depid in dep_qids:
×
UNCOV
654
            if depid in state_dict.keys():
×
655
                if state_dict[int(depid)] == 'COMPLETED':
×
UNCOV
656
                   log.info(f"removing completed jobid {depid}")
×
657
                elif state_dict[int(depid)] not in non_final_states:
×
658
                    failed_dependency = True
×
UNCOV
659
                    log.info("Found a dependency in a bad final state="
×
660
                             + f"{state_dict[int(depid)]} for depjobid={depid},"
661
                             + " not submitting this job.")
UNCOV
662
                    still_depids.append(depid)
×
663
                else:
664
                    still_depids.append(depid)
×
665
            else:
666
                still_depids.append(depid)
×
UNCOV
667
        dep_qids = np.array(still_depids)
×
668

669
    if len(dep_qids) > 0:
1✔
670
        jobtype = prow['JOBDESC']
1✔
671
        if strictly_successful:
1✔
672
            depcond = 'afterok'
1✔
UNCOV
673
        elif jobtype in ['arc', 'psfnight', 'prestdstar', 'stdstarfit']:
×
674
            ## (though psfnight and stdstarfit will require some inputs otherwise they'll go up in flames)
675
            depcond = 'afterany'
×
676
        else:
677
            ## if 'flat','nightlyflat','poststdstar', or any type of redshift, require strict success of inputs
678
            depcond = 'afterok'
×
679

680
        dep_str = f'--dependency={depcond}:'
1✔
681

682
        if np.isscalar(dep_qids):
1✔
UNCOV
683
            dep_list = str(dep_qids).strip(' \t')
×
UNCOV
684
            if dep_list == '':
×
UNCOV
685
                dep_str = ''
×
686
            else:
UNCOV
687
                dep_str += dep_list
×
688
        else:
689
            if len(dep_qids)>1:
1✔
690
                dep_list = ':'.join(np.array(dep_qids).astype(str))
1✔
691
                dep_str += dep_list
1✔
692
            elif len(dep_qids) == 1 and dep_qids[0] not in [None, 0]:
1✔
693
                dep_str += str(dep_qids[0])
1✔
694
            else:
UNCOV
695
                dep_str = ''
×
696

697
    # script = f'{jobname}.slurm'
698
    # script_path = pathjoin(batchdir, script)
699
    if prow['JOBDESC'] in ['pernight-v0','pernight','perexp','cumulative']:
1✔
700
        script_path = get_ztile_script_pathname(tileid=prow['TILEID'],group=prow['JOBDESC'],
1✔
701
                                                        night=prow['NIGHT'], expid=np.min(prow['EXPID']))
702
        jobname = os.path.basename(script_path)
1✔
703
    else:
704
        batchdir = get_desi_proc_batch_file_path(night=prow['NIGHT'])
1✔
705
        jobname = batch_script_name(prow)
1✔
706
        script_path = pathjoin(batchdir, jobname)
1✔
707

708
    batch_params = ['sbatch', '--parsable']
1✔
709
    if dep_str != '':
1✔
710
        batch_params.append(f'{dep_str}')
1✔
711

712
    reservation = parse_reservation(reservation, prow['JOBDESC'])
1✔
713
    if reservation is not None:
1✔
714
        batch_params.append(f'--reservation={reservation}')
×
715

716
    batch_params.append(f'{script_path}')
1✔
717
    submitted = True
1✔
718
    ## If dry_run give it a fake QID
719
    ## if a dependency has failed don't even try to submit the job because
720
    ## Slurm will refuse, instead just mark as unsubmitted.
721
    if dry_run:
1✔
722
        current_qid = _get_fake_qid()
1✔
723
    elif not failed_dependency:
×
724
        #- sbatch sometimes fails; try several times before giving up
725
        max_attempts = 3
×
726
        for attempt in range(max_attempts):
×
727
            try:
×
728
                current_qid = subprocess.check_output(batch_params, stderr=subprocess.STDOUT, text=True)
×
UNCOV
729
                current_qid = int(current_qid.strip(' \t\n'))
×
730
                break
×
UNCOV
731
            except subprocess.CalledProcessError as err:
×
732
                log.error(f'{jobname} submission failure at {datetime.datetime.now()}')
×
733
                log.error(f'{jobname}   {batch_params}')
×
734
                log.error(f'{jobname}   {err.output=}')
×
UNCOV
735
                if attempt < max_attempts - 1:
×
736
                    log.info('Sleeping 60 seconds then retrying')
×
737
                    time.sleep(60)
×
738
        else:  #- for/else happens if loop doesn't succeed
UNCOV
739
            msg = f'{jobname} submission failed {max_attempts} times.' \
×
740
                  + ' setting as unsubmitted and moving on'
UNCOV
741
            log.error(msg)
×
UNCOV
742
            current_qid = get_default_qid()
×
UNCOV
743
            submitted = False
×
744
    else:
UNCOV
745
        current_qid = get_default_qid()
×
UNCOV
746
        submitted = False
×
747

748
    ## Update prow with new information
749
    prow['LATEST_QID'] = current_qid
1✔
750

751
    ## If we didn't submit, don't say we did and don't add to ALL_QIDS
752
    if submitted:
1✔
753
        log.info(batch_params)
1✔
754
        log.info(f'Submitted {jobname} with dependencies {dep_str} and '
1✔
755
                 + f'reservation={reservation}. Returned qid: {current_qid}')
756

757
        ## Update prow with new information
758
        prow['ALL_QIDS'] = np.append(prow['ALL_QIDS'],current_qid)
1✔
759
        prow['STATUS'] = 'SUBMITTED'
1✔
760
        prow['SUBMIT_DATE'] = int(time.time())
1✔
761
    else:
UNCOV
762
        log.info(f"Would have submitted: {batch_params}")
×
UNCOV
763
        prow['STATUS'] = 'UNSUBMITTED'
×
764

765
        ## Update the Slurm jobid cache of job states
UNCOV
766
        update_queue_state_cache(qid=prow['LATEST_QID'], state=prow['STATUS'])
×
767

768
    return prow
1✔
769

770

771
#############################################
772
##########   Row Manipulations   ############
773
#############################################
774
def define_and_assign_dependency(prow, calibjobs, use_tilenight=False,
1✔
775
                                 refnight=None):
776
    """
777
    Given input processing row and possible calibjobs, this defines the
778
    JOBDESC keyword and assigns the dependency appropriate for the job type of
779
    prow.
780

781
    Args:
782
        prow, Table.Row or dict. Must include keyword accessible definitions for
783
            'OBSTYPE'. A row must have column names for
784
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.
785
        calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
786
            and 'nightlyflat'. Each key corresponds to a Table.Row or
787
            None. The table.Row() values are for the corresponding
788
            calibration job. Each value that isn't None must contain
789
            'INTID', and 'LATEST_QID'. If None, it assumes the
790
            dependency doesn't exist and no dependency is assigned.
791
        use_tilenight, bool. Default is False. If True, use desi_proc_tilenight
792
            for prestdstar, stdstar,and poststdstar steps for
793
            science exposures.
794
        refnight, int. The reference night for linking jobs
795

796
    Returns:
797
        Table.Row or dict: The same prow type and keywords as input except
798
        with modified values updated values for 'JOBDESC', 'INT_DEP_IDS'. and 'LATEST_DEP_ID'.
799

800
    Note:
801
        This modifies the input. Though Table.Row objects are generally copied
802
        on modification, so the change to the input object in memory may or may
803
        not be changed. As of writing, a row from a table given to this function
804
        will not change during the execution of this function (but can be
805
        overwritten explicitly with the returned row if desired).
806
    """
807
    if prow['OBSTYPE'] in ['science', 'twiflat']:
1✔
808
        if calibjobs['nightlyflat'] is not None:
1✔
809
            dependency = calibjobs['nightlyflat']
1✔
810
        elif calibjobs['psfnight'] is not None:
1✔
811
            dependency = calibjobs['psfnight']
1✔
812
        elif calibjobs['ccdcalib'] is not None:
1✔
813
            dependency = calibjobs['ccdcalib']
1✔
814
        elif calibjobs['nightlybias'] is not None:
1✔
UNCOV
815
            dependency = calibjobs['nightlybias']
×
816
        elif calibjobs['badcol'] is not None:
1✔
UNCOV
817
            dependency = calibjobs['badcol']
×
818
        else:
819
            dependency = calibjobs['linkcal']
1✔
820
        if not use_tilenight:
1✔
821
            prow['JOBDESC'] = 'prestdstar'
1✔
822
    elif prow['OBSTYPE'] == 'flat':
1✔
823
        if calibjobs['psfnight'] is not None:
1✔
824
            dependency = calibjobs['psfnight']
1✔
825
        elif calibjobs['ccdcalib'] is not None:
1✔
826
            dependency = calibjobs['ccdcalib']
1✔
827
        elif calibjobs['nightlybias'] is not None:
1✔
UNCOV
828
            dependency = calibjobs['nightlybias']
×
829
        elif calibjobs['badcol'] is not None:
1✔
830
            dependency = calibjobs['badcol']
×
831
        else:
832
            dependency = calibjobs['linkcal']
1✔
833
    elif prow['OBSTYPE'] == 'arc':
1✔
834
        if calibjobs['ccdcalib'] is not None:
1✔
835
            dependency = calibjobs['ccdcalib']
1✔
836
        elif calibjobs['nightlybias'] is not None:
1✔
837
            dependency = calibjobs['nightlybias']
1✔
838
        elif calibjobs['badcol'] is not None:
1✔
839
            dependency = calibjobs['badcol']
×
840
        else:
841
            dependency = calibjobs['linkcal']
1✔
842
    elif prow['JOBDESC'] in ['badcol', 'nightlybias', 'ccdcalib']:
1✔
843
        dependency = calibjobs['linkcal']
1✔
844
    elif prow['OBSTYPE'] == 'dark':
1✔
UNCOV
845
        if calibjobs['ccdcalib'] is not None:
×
UNCOV
846
            dependency = calibjobs['ccdcalib']
×
UNCOV
847
        elif calibjobs['nightlybias'] is not None:
×
UNCOV
848
            dependency = calibjobs['nightlybias']
×
849
        elif calibjobs['badcol'] is not None:
×
UNCOV
850
            dependency = calibjobs['badcol']
×
851
        else:
UNCOV
852
            dependency = calibjobs['linkcal']
×
853
    elif prow['JOBDESC'] == 'linkcal' and refnight is not None:
1✔
854
        dependency = None
1✔
855
        ## For link cals only, enable cross-night dependencies if available
856
        refproctable = findfile('proctable', night=refnight)
1✔
857
        if os.path.exists(refproctable):
1✔
858
            ptab = load_table(tablename=refproctable, tabletype='proctable')
×
859
            ## This isn't perfect because we may depend on jobs that aren't
860
            ## actually being linked
861
            ## Also allows us to proceed even if jobs don't exist yet
UNCOV
862
            deps = []
×
UNCOV
863
            for job in ['nightlybias', 'ccdcalib', 'psfnight', 'nightlyflat']:
×
UNCOV
864
                if job in ptab['JOBDESC']:
×
865
                    ## add prow to dependencies
UNCOV
866
                    deps.append(ptab[ptab['JOBDESC']==job][0])
×
UNCOV
867
            if len(deps) > 0:
×
UNCOV
868
                dependency = deps
×
869
    else:
UNCOV
870
        dependency = None
×
871

872
    prow = assign_dependency(prow, dependency)
1✔
873

874
    return prow
1✔
875

876

877
def assign_dependency(prow, dependency):
1✔
878
    """
879
    Given input processing row and possible arcjob (processing row for psfnight) and flatjob (processing row for
880
    nightlyflat), this defines the JOBDESC keyword and assigns the dependency appropriate for the job type of prow.
881

882
    Args:
883
        prow, Table.Row or dict. Must include keyword accessible definitions for 'OBSTYPE'. A row must have column names for
884
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.
885
        dependency, NoneType or scalar/list/array of Table.Row, dict. Processing row corresponding to the required input
886
            for the job in prow. This must contain keyword
887
            accessible values for 'INTID', and 'LATEST_QID'.
888
            If None, it assumes the dependency doesn't exist
889
            and no dependency is assigned.
890

891
    Returns:
892
        Table.Row or dict: The same prow type and keywords as input except with modified values updated values for
893
        'JOBDESC', 'INT_DEP_IDS'. and 'LATEST_DEP_ID'.
894

895
    Note:
896
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
897
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
898
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
899
    """
900
    prow['INT_DEP_IDS'] = np.ndarray(shape=0).astype(int)
1✔
901
    prow['LATEST_DEP_QID'] = np.ndarray(shape=0).astype(int)
1✔
902
    if dependency is not None:
1✔
903
        if type(dependency) in [list, np.array]:
1✔
904
            ids, qids = [], []
1✔
905
            for curdep in dependency:
1✔
906
                ids.append(curdep['INTID'])
1✔
907
                if still_a_dependency(curdep):
1✔
908
                    # ids.append(curdep['INTID'])
909
                    qids.append(curdep['LATEST_QID'])
1✔
910
            prow['INT_DEP_IDS'] = np.array(ids, dtype=int)
1✔
911
            prow['LATEST_DEP_QID'] = np.array(qids, dtype=int)
1✔
912
        elif type(dependency) in [dict, OrderedDict, Table.Row]:
1✔
913
            prow['INT_DEP_IDS'] = np.array([dependency['INTID']], dtype=int)
1✔
914
            if still_a_dependency(dependency):
1✔
915
                prow['LATEST_DEP_QID'] = np.array([dependency['LATEST_QID']], dtype=int)
1✔
916
    return prow
1✔
917

918
def still_a_dependency(dependency):
1✔
919
    """
920
    Defines the criteria for which a dependency is deemed complete (and therefore no longer a dependency).
921

922
     Args:
923
        dependency, Table.Row or dict. Processing row corresponding to the required input for the job in prow.
924
            This must contain keyword accessible values for 'STATUS', and 'LATEST_QID'.
925

926
    Returns:
927
        bool: False if the criteria indicate that the dependency is completed and no longer a blocking factor (ie no longer
928
        a genuine dependency). Returns True if the dependency is still a blocking factor such that the slurm
929
        scheduler needs to be aware of the pending job.
930

931
    """
932
    return dependency['LATEST_QID'] > 0 and dependency['STATUS'] != 'COMPLETED'
1✔
933

934
def get_type_and_tile(erow):
1✔
935
    """
936
    Trivial function to return the OBSTYPE and the TILEID from an exposure table row
937

938
    Args:
939
        erow, Table.Row or dict. Must contain 'OBSTYPE' and 'TILEID' as keywords.
940

941
    Returns:
942
        tuple (str, str), corresponding to the OBSTYPE and TILEID values of the input erow.
943
    """
UNCOV
944
    return str(erow['OBSTYPE']).lower(), erow['TILEID']
×
945

946

947
#############################################
948
#########   Table manipulators   ############
949
#############################################
950
def parse_previous_tables(etable, ptable, night):
1✔
951
    """
952
    This takes in the exposure and processing tables and regenerates all the working memory variables needed for the
953
    daily processing script.
954

955
    Used by the daily processing to define most of its state-ful variables into working memory.
956
    If the processing table is empty, these are simply declared and returned for use.
957
    If the code had previously run and exited (or crashed), however, this will all the code to
958
    re-establish itself by redefining these values.
959

960
    Args:
961
        etable, Table, Exposure table of all exposures that have been dealt with thus far.
962
        ptable, Table, Processing table of all exposures that have been processed.
963
        night, str or int, the night the data was taken.
964

965
    Returns:
966
        tuple: A tuple containing:
967

968
        * arcs, list of dicts, list of the individual arc jobs used for the psfnight (NOT all
969
          the arcs, if multiple sets existed)
970
        * flats, list of dicts, list of the individual flat jobs used for the nightlyflat (NOT
971
          all the flats, if multiple sets existed)
972
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
973
          (if currently processing that tile)
974
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'badcol', 'psfnight'
975
          and 'nightlyflat'. Each key corresponds to a Table.Row or
976
          None. The table.Row() values are for the corresponding
977
          calibration job.
978
        * curtype, None, the obstype of the current job being run. Always None as first new job will define this.
979
        * lasttype, str or None, the obstype of the last individual exposure row to be processed.
980
        * curtile, None, the tileid of the current job (if science). Otherwise None. Always None as first
981
          new job will define this.
982
        * lasttile, str or None, the tileid of the last job (if science). Otherwise None.
983
        * internal_id, int, an internal identifier unique to each job. Increments with each new job. This
984
          is the latest unassigned value.
985
    """
986
    log = get_logger()
×
987
    arcs, flats, sciences = [], [], []
×
988
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
×
989
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None,
990
                 'accounted_for': dict()}
991
    curtype,lasttype = None,None
×
992
    curtile,lasttile = None,None
×
993

UNCOV
994
    if len(ptable) > 0:
×
995
        prow = ptable[-1]
×
996
        internal_id = int(prow['INTID'])+1
×
997
        lasttype,lasttile = get_type_and_tile(ptable[-1])
×
UNCOV
998
        jobtypes = ptable['JOBDESC']
×
999

1000
        if 'nightlybias' in jobtypes:
×
1001
            calibjobs['nightlybias'] = table_row_to_dict(ptable[jobtypes=='nightlybias'][0])
×
1002
            log.info("Located nightlybias job in exposure table: {}".format(calibjobs['nightlybias']))
×
1003

1004
        if 'ccdcalib' in jobtypes:
×
1005
            calibjobs['ccdcalib'] = table_row_to_dict(ptable[jobtypes=='ccdcalib'][0])
×
1006
            log.info("Located ccdcalib job in exposure table: {}".format(calibjobs['ccdcalib']))
×
1007

1008
        if 'psfnight' in jobtypes:
×
UNCOV
1009
            calibjobs['psfnight'] = table_row_to_dict(ptable[jobtypes=='psfnight'][0])
×
1010
            log.info("Located joint fit psfnight job in exposure table: {}".format(calibjobs['psfnight']))
×
UNCOV
1011
        elif lasttype == 'arc':
×
1012
            seqnum = 10
×
UNCOV
1013
            for row in ptable[::-1]:
×
1014
                erow = etable[etable['EXPID']==row['EXPID'][0]]
×
1015
                if row['OBSTYPE'].lower() == 'arc' and int(erow['SEQNUM'])<seqnum:
×
1016
                    arcs.append(table_row_to_dict(row))
×
1017
                    seqnum = int(erow['SEQNUM'])
×
1018
                else:
1019
                    break
×
1020
            ## Because we work backword to fill in, we need to reverse them to get chronological order back
1021
            arcs = arcs[::-1]
×
1022

UNCOV
1023
        if 'nightlyflat' in jobtypes:
×
1024
            calibjobs['nightlyflat'] = table_row_to_dict(ptable[jobtypes=='nightlyflat'][0])
×
1025
            log.info("Located joint fit nightlyflat job in exposure table: {}".format(calibjobs['nightlyflat']))
×
UNCOV
1026
        elif lasttype == 'flat':
×
1027
            for row in ptable[::-1]:
×
1028
                erow = etable[etable['EXPID']==row['EXPID'][0]]
×
1029
                if row['OBSTYPE'].lower() == 'flat' and int(erow['SEQTOT']) < 5:
×
UNCOV
1030
                    if float(erow['EXPTIME']) > 100.:
×
1031
                        flats.append(table_row_to_dict(row))
×
1032
                else:
1033
                    break
×
1034
            flats = flats[::-1]
×
1035

1036
        if lasttype.lower() == 'science':
×
UNCOV
1037
            for row in ptable[::-1]:
×
1038
                if row['OBSTYPE'].lower() == 'science' and row['TILEID'] == lasttile and \
×
1039
                   row['JOBDESC'] == 'prestdstar' and row['LASTSTEP'] != 'skysub':
UNCOV
1040
                    sciences.append(table_row_to_dict(row))
×
1041
                else:
UNCOV
1042
                    break
×
UNCOV
1043
            sciences = sciences[::-1]
×
1044
    else:
UNCOV
1045
        internal_id = night_to_starting_iid(night)
×
1046

UNCOV
1047
    return arcs,flats,sciences, \
×
1048
           calibjobs, \
1049
           curtype, lasttype, \
1050
           curtile, lasttile,\
1051
           internal_id
1052

1053
def generate_calibration_dict(ptable, files_to_link=None):
1✔
1054
    """
1055
    This takes in a processing table and regenerates the working memory calibration
1056
    dictionary for dependency tracking. Used by the daily processing to define 
1057
    most of its state-ful variables into working memory.
1058
    If the processing table is empty, these are simply declared and returned for use.
1059
    If the code had previously run and exited (or crashed), however, this will all the code to
1060
    re-establish itself by redefining these values.
1061

1062
    Args:
1063
        ptable, Table, Processing table of all exposures that have been processed.
1064
        files_to_link, set, Set of filenames that the linkcal job will link.
1065

1066
    Returns:
1067
        calibjobs, dict. Dictionary containing 'nightlybias', 'badcol', 'ccdcalib',
1068
            'psfnight', 'nightlyflat', 'linkcal', and 'completed'. Each key corresponds to a
1069
            Table.Row or None. The table.Row() values are for the corresponding
1070
            calibration job.
1071
    """
1072
    log = get_logger()
1✔
1073
    job_to_file_map = get_jobdesc_to_file_map()
1✔
1074
    accounted_for = {'biasnight': False, 'badcolumns': False,
1✔
1075
                     'ctecorrnight': False, 'psfnight': False,
1076
                     'fiberflatnight': False}
1077
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
1✔
1078
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None}
1079

1080
    ptable_jobtypes = ptable['JOBDESC']
1✔
1081

1082
    for jobtype in calibjobs.keys():
1✔
1083
        if jobtype in ptable_jobtypes:
1✔
1084
            calibjobs[jobtype] = table_row_to_dict(ptable[ptable_jobtypes==jobtype][0])
×
1085
            log.info(f"Located {jobtype} job in exposure table: {calibjobs[jobtype]}")
×
1086
            if jobtype == 'linkcal':
×
1087
                if files_to_link is not None and len(files_to_link) > 0:
×
1088
                    log.info(f"Assuming existing linkcal job processed "
×
1089
                             + f"{files_to_link} since given in override file.")
1090
                    accounted_for = update_accounted_for_with_linking(accounted_for,
×
1091
                                                                  files_to_link)
1092
                else:
1093
                    err = f"linkcal job exists but no files given: {files_to_link=}"
×
1094
                    log.error(err)
×
UNCOV
1095
                    raise ValueError(err)
×
UNCOV
1096
            elif jobtype == 'ccdcalib':
×
1097
                possible_ccd_files = set(['biasnight', 'badcolumns', 'ctecorrnight'])
×
1098
                if files_to_link is None:
×
UNCOV
1099
                    files_accounted_for = possible_ccd_files
×
1100
                else:
UNCOV
1101
                    files_accounted_for = possible_ccd_files.difference(files_to_link)
×
UNCOV
1102
                    ccd_files_linked = possible_ccd_files.intersection(files_to_link)
×
UNCOV
1103
                    log.info(f"Assuming existing ccdcalib job processed "
×
1104
                             + f"{files_accounted_for} since {ccd_files_linked} "
1105
                             + f"are linked.")
UNCOV
1106
                for fil in files_accounted_for:
×
UNCOV
1107
                    accounted_for[fil] = True
×
1108
            else:
UNCOV
1109
                accounted_for[job_to_file_map[jobtype]] = True
×
1110

1111
    calibjobs['accounted_for'] = accounted_for
1✔
1112
    return calibjobs
1✔
1113

1114
def update_accounted_for_with_linking(accounted_for, files_to_link):
1✔
1115
    """
1116
    This takes in a dictionary summarizing the calibration files accounted for
1117
     and updates it based on the files_to_link, which are assumed to have
1118
     already been linked such that those files already exist on disk and
1119
     don't need ot be generated.
1120

1121
    Parameters
1122
    ----------
1123
        accounted_for: dict
1124
            Dictionary containing 'biasnight', 'badcolumns', 'ctecorrnight',
1125
            'psfnight', and 'fiberflatnight'. Each value is True if file is
1126
            accounted for and False if it is not.
1127
        files_to_link: set
1128
            Set of filenames that the linkcal job will link.
1129

1130
    Returns
1131
    -------
1132
        accounted_for: dict
1133
            Dictionary containing 'biasnight', 'badcolumns', 'ctecorrnight',
1134
            'psfnight', and 'fiberflatnight'. Each value is True if file is
1135
            accounted for and False if it is not.
1136
    """
1137
    log = get_logger()
1✔
1138
    
1139
    for fil in files_to_link:
1✔
1140
        if fil in accounted_for:
1✔
1141
            accounted_for[fil] = True
1✔
1142
        else:
UNCOV
1143
            err = f"{fil} doesn't match an expected filetype: "
×
UNCOV
1144
            err += f"{accounted_for.keys()}"
×
UNCOV
1145
            log.error(err)
×
UNCOV
1146
            raise ValueError(err)
×
1147

1148
    return accounted_for
1✔
1149

1150
def all_calibs_submitted(accounted_for, do_cte_flats):
1✔
1151
    """
1152
    Function that returns the boolean logic to determine if the necessary
1153
    calibration jobs have been submitted for calibration.
1154

1155
    Args:
1156
        accounted_for, dict, Dictionary with keys corresponding to the calibration
1157
            filenames and values of True or False.
1158
        do_cte_flats, bool, whether ctecorrnight files are expected or not.
1159

1160
    Returns:
1161
        bool, True if all necessary calibrations have been submitted or handled, False otherwise.
1162
    """
1163
    test_dict = accounted_for.copy()
1✔
1164
    if not do_cte_flats:
1✔
1165
        test_dict.pop('ctecorrnight')
1✔
1166

1167
    return np.all(list(test_dict.values()))
1✔
1168

1169
def update_and_recursively_submit(proc_table, submits=0, resubmission_states=None,
1✔
1170
                                  no_resub_failed=False, ptab_name=None,
1171
                                  dry_run_level=0, reservation=None):
1172
    """
1173
    Given an processing table, this loops over job rows and resubmits failed jobs (as defined by resubmission_states).
1174
    Before submitting a job, it checks the dependencies for failures. If a dependency needs to be resubmitted, it recursively
1175
    follows dependencies until it finds the first job without a failed dependency and resubmits that. Then resubmits the
1176
    other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.
1177

1178
    Args:
1179
        proc_table, Table, the processing table with a row per job.
1180
        submits, int, the number of submissions made to the queue. Used for saving files and in not overloading the scheduler.
1181
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
1182
            possible Slurm scheduler state, where you wish for jobs with that
1183
            outcome to be resubmitted
1184
        no_resub_failed: bool. Set to True if you do NOT want to resubmit
1185
            jobs with Slurm status 'FAILED' by default. Default is False.
1186
        ptab_name, str, the full pathname where the processing table should be saved.
1187
        dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0.
1188
            0 which runs the code normally.
1189
            1 writes all files but doesn't submit any jobs to Slurm.
1190
            2 writes tables but doesn't write scripts or submit anything.
1191
            3 Doesn't write or submit anything but queries Slurm normally for job status.
1192
            4 Doesn't write, submit jobs, or query Slurm.
1193
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
1194
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
1195

1196
    Returns:
1197
        tuple: A tuple containing:
1198

1199
        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
1200
          been updated for those jobs that needed to be resubmitted.
1201
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
1202
          the number of submissions made from this function call plus the input submits value.
1203

1204
    Note:
1205
        This modifies the inputs of both proc_table and submits and returns them.
1206
    """
1207
    log = get_logger()
1✔
1208
    if resubmission_states is None:
1✔
1209
        resubmission_states = get_resubmission_states(no_resub_failed=no_resub_failed)
1✔
1210

1211
    log.info(f"Resubmitting jobs with current states in the following: {resubmission_states}")
1✔
1212
    proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level)
1✔
1213

1214
    log.info("Updated processing table queue information:")
1✔
1215
    cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID',
1✔
1216
            'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS']
1217
    log.info(np.array(cols))
1✔
1218
    for row in proc_table:
1✔
1219
        log.info(np.array(row[cols]))
1✔
1220
    log.info("\n")
1✔
1221
    id_to_row_map = {row['INTID']: rown for rown, row in enumerate(proc_table)}
1✔
1222
    for rown in range(len(proc_table)):
1✔
1223
        if proc_table['STATUS'][rown] in resubmission_states:
1✔
1224
            proc_table, submits = recursive_submit_failed(rown, proc_table, submits,
1✔
1225
                                                          id_to_row_map, ptab_name,
1226
                                                          resubmission_states,
1227
                                                          reservation, dry_run_level)
1228

1229
    proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level)
1✔
1230

1231
    return proc_table, submits
1✔
1232

1233
def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=None,
1✔
1234
                            resubmission_states=None, reservation=None, dry_run_level=0):
1235
    """
1236
    Given a row of a processing table and the full processing table, this resubmits the given job.
1237
    Before submitting a job, it checks the dependencies for failures in the processing table. If a dependency needs to
1238
    be resubmitted, it recursively follows dependencies until it finds the first job without a failed dependency and
1239
    resubmits that. Then resubmits the other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.
1240

1241
    Args:
1242
        rown, Table.Row, the row of the processing table that you want to resubmit.
1243
        proc_table, Table, the processing table with a row per job.
1244
        submits, int, the number of submissions made to the queue. Used for saving files and in not overloading the scheduler.
1245
        id_to_row_map, dict, lookup dictionary where the keys are internal ids (INTID's) and the values are the row position
1246
            in the processing table.
1247
        ptab_name, str, the full pathname where the processing table should be saved.
1248
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
1249
            possible Slurm scheduler state, where you wish for jobs with that
1250
            outcome to be resubmitted
1251
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
1252
        dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0.
1253
            0 which runs the code normally.
1254
            1 writes all files but doesn't submit any jobs to Slurm.
1255
            2 writes tables but doesn't write scripts or submit anything.
1256
            3 Doesn't write or submit anything but queries Slurm normally for job status.
1257
            4 Doesn't write, submit jobs, or query Slurm.
1258
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
1259

1260
    Returns:
1261
        tuple: A tuple containing:
1262

1263
        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
1264
          been updated for those jobs that needed to be resubmitted.
1265
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
1266
          the number of submissions made from this function call plus the input submits value.
1267

1268
    Note:
1269
        This modifies the inputs of both proc_table and submits and returns them.
1270
    """
1271
    log = get_logger()
1✔
1272
    row = proc_table[rown]
1✔
1273
    log.info(f"Identified row {row['INTID']} as needing resubmission.")
1✔
1274
    log.info(f"{row['INTID']}: Expid(s): {row['EXPID']}  Job: {row['JOBDESC']}")
1✔
1275
    if resubmission_states is None:
1✔
UNCOV
1276
        resubmission_states = get_resubmission_states()
×
1277
    ideps = proc_table['INT_DEP_IDS'][rown]
1✔
1278
    if ideps is None or len(ideps)==0:
1✔
UNCOV
1279
        proc_table['LATEST_DEP_QID'][rown] = np.ndarray(shape=0).astype(int)
×
1280
    else:
1281
        all_valid_states = list(resubmission_states.copy())
1✔
1282
        good_states = ['RUNNING','PENDING','SUBMITTED','COMPLETED']
1✔
1283
        all_valid_states.extend(good_states)
1✔
1284
        othernight_idep_qid_lookup = {}
1✔
1285
        for idep in np.sort(np.atleast_1d(ideps)):
1✔
1286
            if idep not in id_to_row_map:
1✔
1287
                if idep // 1000 != row['INTID'] // 1000:
1✔
1288
                    log.info(f"Internal ID: {idep} not in id_to_row_map. "
1✔
1289
                             + "This is expected since it's from another day. ")
1290
                    reference_night = 20000000 + (idep // 1000)
1✔
1291
                    reftab = read_minimal_full_proctab_cols(nights=[reference_night])
1✔
1292
                    if reftab is None:
1✔
UNCOV
1293
                        msg = f"The dependency is from night={reference_night}" \
×
1294
                              + f" but read_minimal_full_proctab_cols couldn't" \
1295
                              + f" locate that processing table, this is a " \
1296
                              +  f"fatal error."
UNCOV
1297
                        log.critical(msg)
×
UNCOV
1298
                        raise ValueError(msg)
×
1299
                    reftab = update_from_queue(reftab, dry_run_level=dry_run_level)
1✔
1300
                    entry = reftab[reftab['INTID'] == idep][0]
1✔
1301
                    if entry['STATUS'] not in good_states:
1✔
1302
                        msg = f"Internal ID: {idep} not in id_to_row_map. " \
1✔
1303
                              + f"Since the dependency is from night={reference_night} " \
1304
                              + f"and that job isn't in a good state this is an " \
1305
                              + f"error we can't overcome."
1306
                        log.error(msg)
1✔
1307
                        proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
1✔
1308
                        return proc_table, submits
1✔
1309
                    else:
1310
                        ## otherwise all is good, just update the cache to use this
1311
                        ## in the next stage
1312
                        othernight_idep_qid_lookup[idep] = entry['LATEST_QID']
1✔
1313
                        update_full_ptab_cache(reftab)
1✔
1314
                else:
UNCOV
1315
                    msg = f"Internal ID: {idep} not in id_to_row_map. " \
×
1316
                         + f"Since the dependency is from the same night" \
1317
                         + f" and we can't find it, this is a fatal error."
1318
                    log.critical(msg)
×
UNCOV
1319
                    raise ValueError(msg)
×
1320
            elif proc_table['STATUS'][id_to_row_map[idep]] not in all_valid_states:
1✔
UNCOV
1321
                log.error(f"Proc INTID: {proc_table['INTID'][rown]} depended on" +
×
1322
                            f" INTID {proc_table['INTID'][id_to_row_map[idep]]}" +
1323
                            f" but that exposure has state" +
1324
                            f" {proc_table['STATUS'][id_to_row_map[idep]]} that" +
1325
                            f" isn't in the list of resubmission states." +
1326
                            f" Exiting this job's resubmission attempt.")
UNCOV
1327
                proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
×
UNCOV
1328
                return proc_table, submits
×
1329
        qdeps = []
1✔
1330
        for idep in np.sort(np.atleast_1d(ideps)):
1✔
1331
            if idep in id_to_row_map:
1✔
1332
                if proc_table['STATUS'][id_to_row_map[idep]] in resubmission_states:
1✔
UNCOV
1333
                    proc_table, submits = recursive_submit_failed(id_to_row_map[idep],
×
1334
                                                                  proc_table, submits,
1335
                                                                  id_to_row_map,
1336
                                                                  reservation=reservation,
1337
                                                                  dry_run_level=dry_run_level)
1338
                ## Now that we've resubmitted the dependency if necessary,
1339
                ## add the most recent QID to the list
1340
                qdeps.append(proc_table['LATEST_QID'][id_to_row_map[idep]])
1✔
1341
            else:
1342
                ## Since we verified above that the cross night QID is still
1343
                ## either pending or successful, add that to the list of QID's
1344
                qdeps.append(othernight_idep_qid_lookup[idep])
1✔
1345

1346
        qdeps = np.atleast_1d(qdeps)
1✔
1347
        if len(qdeps) > 0:
1✔
1348
            proc_table['LATEST_DEP_QID'][rown] = qdeps
1✔
1349
        else:
UNCOV
1350
            log.error(f"number of qdeps should be 1 or more: Rown {rown}, ideps {ideps}")
×
1351

1352
    proc_table[rown] = submit_batch_script(proc_table[rown], reservation=reservation,
1✔
1353
                                           strictly_successful=True, dry_run=dry_run_level)
1354
    submits += 1
1✔
1355

1356
    if dry_run_level < 3:
1✔
UNCOV
1357
        if ptab_name is None:
×
UNCOV
1358
            write_table(proc_table, tabletype='processing', overwrite=True)
×
1359
        else:
UNCOV
1360
            write_table(proc_table, tablename=ptab_name, overwrite=True)
×
UNCOV
1361
        sleep_and_report(0.1 + 0.1*(submits % 10 == 0),
×
1362
                         message_suffix=f"after submitting job to queue and writing proctable")
1363
    return proc_table, submits
1✔
1364

1365

1366
#########################################
1367
########     Joint fit     ##############
1368
#########################################
1369
def joint_fit(ptable, prows, internal_id, queue, reservation, descriptor, z_submit_types=None,
1✔
1370
              dry_run=0, strictly_successful=False, check_for_outputs=True, resubmit_partial_complete=True,
1371
              system_name=None):
1372
    """
1373
    DEPRECATED
1374
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
1375
    joint fitting job given by descriptor. If the joint fitting job is standard star fitting, the post standard star fits
1376
    for all the individual exposures also created and submitted. The returned ptable has all of these rows added to the
1377
    table given as input.
1378

1379
    Args:
1380
        ptable (Table): The processing table where each row is a processed job.
1381
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
1382
            inputs to the joint fit.
1383
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
1384
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
1385
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
1386
        descriptor (str): Description of the joint fitting job. Can either be 'science' or 'stdstarfit', 'arc' or 'psfnight',
1387
            or 'flat' or 'nightlyflat'.
1388
        z_submit_types (list of str, optional): The "group" types of redshifts that should be submitted with each
1389
            exposure. If not specified or None, then no redshifts are submitted.
1390
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
1391
            0 which runs the code normally.
1392
            1 writes all files but doesn't submit any jobs to Slurm.
1393
            2 writes tables but doesn't write scripts or submit anything.
1394
            3 Doesn't write or submit anything but queries Slurm normally for job status.
1395
            4 Doesn't write, submit jobs, or query Slurm.
1396
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
1397
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
1398
            less desirable because e.g. the sciences can run with SVN default calibrations rather
1399
            than failing completely from failed calibrations. Default is False.
1400
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
1401
            data products for the script being submitted. If all files exist and this is True,
1402
            then the script will not be submitted. If some files exist and this is True, only the
1403
            subset of the cameras without the final data products will be generated and submitted.
1404
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
1405
            jobs with some prior data are pruned using PROCCAMWORD to only process the
1406
            remaining cameras not found to exist.
1407
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
1408

1409
    Returns:
1410
        tuple: A tuple containing:
1411

1412
        * ptable, Table. The same processing table as input except with added rows for the joint fit job and, in the case
1413
          of a stdstarfit, the poststdstar science exposure jobs.
1414
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
1415
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1416
    """
1417
    log = get_logger()
×
UNCOV
1418
    if len(prows) < 1:
×
1419
        return ptable, None, internal_id
×
1420

UNCOV
1421
    if descriptor is None:
×
1422
        return ptable, None
×
1423
    elif descriptor == 'arc':
×
UNCOV
1424
        descriptor = 'psfnight'
×
UNCOV
1425
    elif descriptor == 'flat':
×
1426
        descriptor = 'nightlyflat'
×
UNCOV
1427
    elif descriptor == 'science':
×
1428
        if z_submit_types is None or len(z_submit_types) == 0:
×
1429
            descriptor = 'stdstarfit'
×
1430

1431
    if descriptor not in ['psfnight', 'nightlyflat', 'science','stdstarfit']:
×
1432
        return ptable, None, internal_id
×
1433

1434
    log.info(" ")
×
1435
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")
×
1436

UNCOV
1437
    if descriptor == 'science':
×
UNCOV
1438
        joint_prow, internal_id = make_joint_prow(prows, descriptor='stdstarfit', internal_id=internal_id)
×
1439
    else:
1440
        joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
×
1441
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
×
1442
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1443
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1444
    ptable.add_row(joint_prow)
×
1445

1446
    if descriptor in ['science','stdstarfit']:
×
UNCOV
1447
        if descriptor == 'science':
×
1448
            zprows = []
×
1449
        log.info(" ")
×
1450
        log.info(f"Submitting individual science exposures now that joint fitting of standard stars is submitted.\n")
×
1451
        for row in prows:
×
1452
            if row['LASTSTEP'] == 'stdstarfit':
×
UNCOV
1453
                continue
×
UNCOV
1454
            row['JOBDESC'] = 'poststdstar'
×
1455

1456
            # poststdstar job can't process cameras not included in its stdstar joint fit
1457
            stdcamword = joint_prow['PROCCAMWORD']
×
UNCOV
1458
            thiscamword = row['PROCCAMWORD']
×
UNCOV
1459
            proccamword = camword_intersection([stdcamword, thiscamword])
×
1460
            if proccamword != thiscamword:
×
1461
                dropcams = difference_camwords(thiscamword, proccamword)
×
UNCOV
1462
                assert dropcams != ''  #- i.e. if they differ, we should be dropping something
×
UNCOV
1463
                log.warning(f"Dropping exp {row['EXPID']} poststdstar cameras {dropcams} since they weren't included in stdstar fit {stdcamword}")
×
UNCOV
1464
                row['PROCCAMWORD'] = proccamword
×
1465

1466
            row['INTID'] = internal_id
×
1467
            internal_id += 1
×
UNCOV
1468
            row['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
×
1469
            row = assign_dependency(row, joint_prow)
×
1470
            row = create_and_submit(row, queue=queue, reservation=reservation, dry_run=dry_run,
×
1471
                                    strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1472
                                    resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1473
            ptable.add_row(row)
×
1474
            if descriptor == 'science' and row['LASTSTEP'] == 'all':
×
1475
                zprows.append(row)
×
1476

1477
    ## Now run redshifts
1478
    if descriptor == 'science' and len(zprows) > 0 and z_submit_types is not None:
×
UNCOV
1479
        prow_selection = (  (ptable['OBSTYPE'] == 'science')
×
1480
                          & (ptable['LASTSTEP'] == 'all')
1481
                          & (ptable['JOBDESC'] == 'poststdstar')
1482
                          & (ptable['TILEID'] == int(zprows[0]['TILEID'])) )
1483
        nightly_zprows = []
×
1484
        if np.sum(prow_selection) == len(zprows):
×
1485
            nightly_zprows = zprows.copy()
×
1486
        else:
1487
            for prow in ptable[prow_selection]:
×
1488
                nightly_zprows.append(table_row_to_dict(prow))
×
1489

UNCOV
1490
        for zsubtype in z_submit_types:
×
1491
            if zsubtype == 'perexp':
×
UNCOV
1492
                for zprow in zprows:
×
1493
                    log.info(" ")
×
1494
                    log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
×
1495
                    joint_prow, internal_id = make_joint_prow([zprow], descriptor=zsubtype, internal_id=internal_id)
×
UNCOV
1496
                    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
×
1497
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1498
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
UNCOV
1499
                    ptable.add_row(joint_prow)
×
1500
            else:
UNCOV
1501
                log.info(" ")
×
UNCOV
1502
                log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {nightly_zprows[0]['TILEID']}.")
×
UNCOV
1503
                expids = [prow['EXPID'][0] for prow in nightly_zprows]
×
UNCOV
1504
                log.info(f"Expids: {expids}.\n")
×
UNCOV
1505
                joint_prow, internal_id = make_joint_prow(nightly_zprows, descriptor=zsubtype, internal_id=internal_id)
×
UNCOV
1506
                joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
×
1507
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1508
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
UNCOV
1509
                ptable.add_row(joint_prow)
×
1510

UNCOV
1511
    if descriptor in ['psfnight', 'nightlyflat']:
×
UNCOV
1512
        log.info(f"Setting the calibration exposures as calibrators in the processing table.\n")
×
UNCOV
1513
        ptable = set_calibrator_flag(prows, ptable)
×
1514

UNCOV
1515
    return ptable, joint_prow, internal_id
×
1516

1517
def joint_cal_fit(descriptor, ptable, prows, internal_id, queue, reservation,
1✔
1518
                  dry_run=0, strictly_successful=False, check_for_outputs=True,
1519
                  resubmit_partial_complete=True, system_name=None):
1520
    """
1521
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
1522
    joint fitting job given by descriptor. If the joint fitting job is standard star fitting, the post standard star fits
1523
    for all the individual exposures also created and submitted. The returned ptable has all of these rows added to the
1524
    table given as input.
1525

1526
    Args:
1527
        descriptor (str): Description of the joint fitting job. Can either be 'science' or 'stdstarfit', 'arc' or 'psfnight',
1528
            or 'flat' or 'nightlyflat'.
1529
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
1530
            inputs to the joint fit.
1531
        ptable (Table): The processing table where each row is a processed job.
1532
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
1533
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
1534
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
1535
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
1536
            0 which runs the code normally.
1537
            1 writes all files but doesn't submit any jobs to Slurm.
1538
            2 writes tables but doesn't write scripts or submit anything.
1539
            3 Doesn't write or submit anything but queries Slurm normally for job status.
1540
            4 Doesn't write, submit jobs, or query Slurm.
1541
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
1542
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
1543
            less desirable because e.g. the sciences can run with SVN default calibrations rather
1544
            than failing completely from failed calibrations. Default is False.
1545
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
1546
            data products for the script being submitted. If all files exist and this is True,
1547
            then the script will not be submitted. If some files exist and this is True, only the
1548
            subset of the cameras without the final data products will be generated and submitted.
1549
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
1550
            jobs with some prior data are pruned using PROCCAMWORD to only process the
1551
            remaining cameras not found to exist.
1552
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
1553

1554
    Returns:
1555
        tuple: A tuple containing:
1556

1557
        * ptable, Table. The same processing table as input except with added rows for the joint fit job and, in the case
1558
          of a stdstarfit, the poststdstar science exposure jobs.
1559
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
1560
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1561
    """
1562
    log = get_logger()
×
UNCOV
1563
    if len(prows) < 1:
×
1564
        return ptable, None, internal_id
×
1565

1566
    if descriptor is None:
×
UNCOV
1567
        return ptable, None
×
1568
    elif descriptor == 'arc':
×
UNCOV
1569
        descriptor = 'psfnight'
×
UNCOV
1570
    elif descriptor == 'flat':
×
UNCOV
1571
        descriptor = 'nightlyflat'
×
1572

UNCOV
1573
    if descriptor not in ['psfnight', 'nightlyflat']:
×
UNCOV
1574
        return ptable, None, internal_id
×
1575

UNCOV
1576
    log.info(" ")
×
UNCOV
1577
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")
×
1578

UNCOV
1579
    joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
×
UNCOV
1580
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
×
1581
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1582
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
UNCOV
1583
    ptable.add_row(joint_prow)
×
1584

UNCOV
1585
    if descriptor in ['psfnight', 'nightlyflat']:
×
UNCOV
1586
        log.info(f"Setting the calibration exposures as calibrators in the processing table.\n")
×
UNCOV
1587
        ptable = set_calibrator_flag(prows, ptable)
×
1588

UNCOV
1589
    return ptable, joint_prow, internal_id
×
1590

1591

1592
#########################################
1593
########     Redshifts     ##############
1594
#########################################
1595
def submit_redshifts(ptable, prows, tnight, internal_id, queue, reservation,
1✔
1596
              dry_run=0, strictly_successful=False,
1597
              check_for_outputs=True, resubmit_partial_complete=True,
1598
              z_submit_types=None, system_name=None):
1599
    """
1600
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
1601
    tilenight job given by descriptor. The returned ptable has all of these rows added to the
1602
    table given as input.
1603

1604
    Args:
1605
        ptable (Table): The processing table where each row is a processed job.
1606
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
1607
        tnight (Table.Row): The processing table row of the tilenight job on which the redshifts depend.
1608
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
1609
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
1610
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
1611
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
1612
            0 which runs the code normally.
1613
            1 writes all files but doesn't submit any jobs to Slurm.
1614
            2 writes tables but doesn't write scripts or submit anything.
1615
            3 Doesn't write or submit anything but queries Slurm normally for job status.
1616
            4 Doesn't write, submit jobs, or query Slurm.
1617
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
1618
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
1619
            less desirable because e.g. the sciences can run with SVN default calibrations rather
1620
            than failing completely from failed calibrations. Default is False.
1621
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
1622
            data products for the script being submitted. If all files exist and this is True,
1623
            then the script will not be submitted. If some files exist and this is True, only the
1624
            subset of the cameras without the final data products will be generated and submitted.
1625
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
1626
            jobs with some prior data are pruned using PROCCAMWORD to only process the
1627
            remaining cameras not found to exist.
1628
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
1629
            exposure. If not specified or None, then no redshifts are submitted.
1630
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
1631

1632
    Returns:
1633
        tuple: A tuple containing:
1634

1635
        * ptable, Table. The same processing table as input except with added rows for the joint fit job.
1636
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1637
    """
1638
    log = get_logger()
1✔
1639
    if len(prows) < 1 or z_submit_types == None:
1✔
1640
        return ptable, internal_id
1✔
1641

1642
    log.info(" ")
1✔
1643
    log.info(f"Running redshifts.\n")
1✔
1644

1645
    ## Now run redshifts
1646
    zprows = []
1✔
1647
    for row in prows:
1✔
1648
        if row['LASTSTEP'] == 'all':
1✔
1649
            zprows.append(row)
1✔
1650

1651
    if len(zprows) > 0:
1✔
1652
        for zsubtype in z_submit_types:
1✔
1653
            log.info(" ")
1✔
1654
            log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {zprows[0]['TILEID']}.")
1✔
1655
            if zsubtype == 'perexp':
1✔
UNCOV
1656
                for zprow in zprows:
×
UNCOV
1657
                    log.info(f"EXPID: {zprow['EXPID']}.\n")
×
UNCOV
1658
                    redshift_prow = make_redshift_prow([zprow], tnight, descriptor=zsubtype, internal_id=internal_id)
×
UNCOV
1659
                    internal_id += 1
×
UNCOV
1660
                    redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
×
1661
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1662
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
UNCOV
1663
                    ptable.add_row(redshift_prow)
×
1664
            elif zsubtype == 'cumulative':
1✔
1665
                tileids = np.unique([prow['TILEID'] for prow in zprows])
1✔
1666
                if len(tileids) > 1:
1✔
UNCOV
1667
                    msg = f"Error, more than one tileid provided for cumulative redshift job: {tileids}"
×
UNCOV
1668
                    log.critical(msg)
×
UNCOV
1669
                    raise ValueError(msg)
×
1670
                nights = np.unique([prow['NIGHT'] for prow in zprows])
1✔
1671
                if len(nights) > 1:
1✔
UNCOV
1672
                    msg = f"Error, more than one night provided for cumulative redshift job: {nights}"
×
UNCOV
1673
                    log.critical(msg)
×
UNCOV
1674
                    raise ValueError(msg)
×
1675
                tileid, night = tileids[0], nights[0]
1✔
1676
                ## For cumulative redshifts, get any existing processing rows for tile
1677
                matched_prows = read_minimal_tilenight_proctab_cols(tileids=tileids)
1✔
1678
                ## Identify the processing rows that should be assigned as dependecies
1679
                ## tnight should be first such that the new job inherits the other metadata from it
1680
                tnights = [tnight]
1✔
1681
                if matched_prows is not None:
1✔
1682
                    matched_prows = matched_prows[matched_prows['NIGHT'] <= night]
1✔
1683
                    for prow in matched_prows:
1✔
1684
                        if prow['INTID'] != tnight['INTID']:
1✔
1685
                            tnights.append(prow)
1✔
1686
                log.info(f"Internal Processing IDs: {[prow['INTID'] for prow in tnights]}.\n")
1✔
1687
                ## Identify all exposures that should go into the fit
1688
                expids = [prow['EXPID'][0] for prow in zprows]
1✔
1689
                ## note we can actually get the full list of exposures, but for now
1690
                ## we'll stay consistent with old processing where we only list exposures
1691
                ## from the current night
1692
                ## For cumulative redshifts, get valid expids from exptables
1693
                #matched_erows = read_minimal_science_exptab_cols(tileids=tileids)
1694
                #matched_erows = matched_erows[matched_erows['NIGHT']<=night]
1695
                #expids = list(set([prow['EXPID'][0] for prow in zprows])+set(matched_erows['EXPID']))
1696
                log.info(f"Expids: {expids}.\n")
1✔
1697
                redshift_prow, internal_id = make_joint_prow(tnights, descriptor=zsubtype, internal_id=internal_id)
1✔
1698
                redshift_prow['EXPID'] = expids
1✔
1699
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
1✔
1700
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1701
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1702
                ptable.add_row(redshift_prow)
1✔
1703
            else: # pernight
UNCOV
1704
                expids = [prow['EXPID'][0] for prow in zprows]
×
UNCOV
1705
                log.info(f"Expids: {expids}.\n")
×
UNCOV
1706
                redshift_prow = make_redshift_prow(zprows, tnight, descriptor=zsubtype, internal_id=internal_id)
×
UNCOV
1707
                internal_id += 1
×
UNCOV
1708
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
×
1709
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1710
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
UNCOV
1711
                ptable.add_row(redshift_prow)
×
1712

1713
    return ptable, internal_id
1✔
1714

1715
#########################################
1716
########     Tilenight     ##############
1717
#########################################
1718
def submit_tilenight(ptable, prows, calibjobs, internal_id, queue, reservation,
1✔
1719
              dry_run=0, strictly_successful=False, resubmit_partial_complete=True,
1720
              system_name=None, use_specter=False, extra_job_args=None):
1721
    """
1722
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
1723
    tilenight job given by descriptor. The returned ptable has all of these rows added to the
1724
    table given as input.
1725

1726
    Args:
1727
        ptable (Table): The processing table where each row is a processed job.
1728
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
1729
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
1730
            and 'nightlyflat'. Each key corresponds to a Table.Row or
1731
            None. The table.Row() values are for the corresponding
1732
            calibration job.
1733
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
1734
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
1735
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
1736
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
1737
            0 which runs the code normally.
1738
            1 writes all files but doesn't submit any jobs to Slurm.
1739
            2 writes tables but doesn't write scripts or submit anything.
1740
            3 Doesn't write or submit anything but queries Slurm normally for job status.
1741
            4 Doesn't write, submit jobs, or query Slurm.
1742
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
1743
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
1744
            less desirable because e.g. the sciences can run with SVN default calibrations rather
1745
            than failing completely from failed calibrations. Default is False.
1746
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
1747
            jobs with some prior data are pruned using PROCCAMWORD to only process the
1748
            remaining cameras not found to exist.
1749
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
1750
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
1751
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
1752
            information used for a specific type of job. Examples include
1753
            laststeps for for tilenight, etc.
1754

1755
    Returns:
1756
        tuple: A tuple containing:
1757

1758
        * ptable, Table. The same processing table as input except with added rows for the joint fit job.
1759
        * tnight_prow, dict. Row of a processing table corresponding to the tilenight job.
1760
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1761
    """
1762
    log = get_logger()
1✔
1763
    if len(prows) < 1:
1✔
UNCOV
1764
        return ptable, None, internal_id
×
1765

1766
    log.info(" ")
1✔
1767
    log.info(f"Running tilenight.\n")
1✔
1768

1769
    tnight_prow = make_tnight_prow(prows, calibjobs, internal_id=internal_id)
1✔
1770
    internal_id += 1
1✔
1771
    tnight_prow = create_and_submit(tnight_prow, queue=queue, reservation=reservation, dry_run=dry_run,
1✔
1772
                                   strictly_successful=strictly_successful, check_for_outputs=False,
1773
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name,
1774
                                   use_specter=use_specter, extra_job_args=extra_job_args)
1775
    ptable.add_row(tnight_prow)
1✔
1776

1777
    return ptable, tnight_prow, internal_id
1✔
1778

1779
## wrapper functions for joint fitting
1780
def science_joint_fit(ptable, sciences, internal_id, queue='realtime', reservation=None,
1✔
1781
                      z_submit_types=None, dry_run=0, strictly_successful=False,
1782
                      check_for_outputs=True, resubmit_partial_complete=True,
1783
                      system_name=None):
1784
    """
1785
    Wrapper function for desiproc.workflow.processing.joint_fit specific to the stdstarfit joint fit and redshift fitting.
1786

1787
    All variables are the same except:
1788

1789
        Arg 'sciences' is mapped to the prows argument of joint_fit.
1790
        The joint_fit argument descriptor is pre-defined as 'science'.
1791
    """
UNCOV
1792
    return joint_fit(ptable=ptable, prows=sciences, internal_id=internal_id, queue=queue, reservation=reservation,
×
1793
                     descriptor='science', z_submit_types=z_submit_types, dry_run=dry_run,
1794
                     strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1795
                     resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1796

1797

1798
def flat_joint_fit(ptable, flats, internal_id, queue='realtime',
1✔
1799
                   reservation=None, dry_run=0, strictly_successful=False,
1800
                   check_for_outputs=True, resubmit_partial_complete=True,
1801
                   system_name=None):
1802
    """
1803
    Wrapper function for desiproc.workflow.processing.joint_fit specific to the nightlyflat joint fit.
1804

1805
    All variables are the same except:
1806

1807
        Arg 'flats' is mapped to the prows argument of joint_fit.
1808
        The joint_fit argument descriptor is pre-defined as 'nightlyflat'.
1809
    """
UNCOV
1810
    return joint_fit(ptable=ptable, prows=flats, internal_id=internal_id, queue=queue, reservation=reservation,
×
1811
                     descriptor='nightlyflat', dry_run=dry_run, strictly_successful=strictly_successful,
1812
                     check_for_outputs=check_for_outputs, resubmit_partial_complete=resubmit_partial_complete,
1813
                     system_name=system_name)
1814

1815

1816
def arc_joint_fit(ptable, arcs, internal_id, queue='realtime',
1✔
1817
                  reservation=None, dry_run=0, strictly_successful=False,
1818
                  check_for_outputs=True, resubmit_partial_complete=True,
1819
                  system_name=None):
1820
    """
1821
    Wrapper function for desiproc.workflow.processing.joint_fit specific to the psfnight joint fit.
1822

1823
    All variables are the same except:
1824

1825
        Arg 'arcs' is mapped to the prows argument of joint_fit.
1826
        The joint_fit argument descriptor is pre-defined as 'psfnight'.
1827
    """
UNCOV
1828
    return joint_fit(ptable=ptable, prows=arcs, internal_id=internal_id, queue=queue, reservation=reservation,
×
1829
                     descriptor='psfnight', dry_run=dry_run, strictly_successful=strictly_successful,
1830
                     check_for_outputs=check_for_outputs, resubmit_partial_complete=resubmit_partial_complete,
1831
                     system_name=system_name)
1832

1833
def make_joint_prow(prows, descriptor, internal_id):
1✔
1834
    """
1835
    Given an input list or array of processing table rows and a descriptor, this creates a joint fit processing job row.
1836
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
1837
    input prows).
1838

1839
    Args:
1840
        prows, list or array of dicts. The rows corresponding to the individual exposure jobs that are
1841
            inputs to the joint fit.
1842
        descriptor, str. Description of the joint fitting job. Can either be 'stdstarfit', 'psfnight', or 'nightlyflat'.
1843
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1844

1845
    Returns:
1846
        dict: Row of a processing table corresponding to the joint fit job.
1847
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1848
    """
1849
    log = get_logger()
1✔
1850
    first_row = table_row_to_dict(prows[0])
1✔
1851
    joint_prow = first_row.copy()
1✔
1852

1853
    joint_prow['INTID'] = internal_id
1✔
1854
    internal_id += 1
1✔
1855
    joint_prow['JOBDESC'] = descriptor
1✔
1856
    joint_prow['LATEST_QID'] = -99
1✔
1857
    joint_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
1✔
1858
    joint_prow['SUBMIT_DATE'] = -99
1✔
1859
    joint_prow['STATUS'] = 'UNSUBMITTED'
1✔
1860
    joint_prow['SCRIPTNAME'] = ''
1✔
1861
    joint_prow['EXPID'] = np.unique(np.concatenate([currow['EXPID'] for currow in prows])).astype(int)
1✔
1862

1863
    ## Assign the PROCCAMWORD based on the descriptor and the input exposures
1864
    ## UPDATE 2024-04-24: badamps are now included in arc/flat joint fits,
1865
    ## so grab all PROCCAMWORDs instead of filtering out BADAMP cameras
1866
    ## For flats we want any camera that exists in all 12 exposures
1867
    ## For arcs we want any camera that exists in at least 3 exposures
1868
    pcamwords = [prow['PROCCAMWORD'] for prow in prows]
1✔
1869
    if descriptor in 'stdstarfit':
1✔
UNCOV
1870
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
×
1871
                                                  full_spectros_only=True)
1872
    elif descriptor in ['pernight', 'cumulative']:
1✔
1873
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
1✔
1874
                                                  full_spectros_only=False)
1875
    elif descriptor == 'nightlyflat':
1✔
1876
        joint_prow['PROCCAMWORD'] = camword_intersection(pcamwords,
1✔
1877
                                                         full_spectros_only=False)
1878
    elif descriptor == 'psfnight':
1✔
1879
        ## Count number of exposures each camera is present for
1880
        camcheck = {}
1✔
1881
        for camword in pcamwords:
1✔
1882
            for cam in decode_camword(camword):
1✔
1883
                if cam in camcheck:
1✔
1884
                    camcheck[cam] += 1
1✔
1885
                else:
1886
                    camcheck[cam] = 1
1✔
1887
        ## if exists in 3 or more exposures, then include it
1888
        goodcams = []
1✔
1889
        for cam,camcount in camcheck.items():
1✔
1890
            if camcount >= 3:
1✔
1891
                goodcams.append(cam)
1✔
1892
        joint_prow['PROCCAMWORD'] = create_camword(goodcams)
1✔
1893
    else:
UNCOV
1894
        log.warning("Warning asked to produce joint proc table row for unknown"
×
1895
                    + f" job description {descriptor}")
1896

1897
    joint_prow = assign_dependency(joint_prow, dependency=prows)
1✔
1898
    return joint_prow, internal_id
1✔
1899

1900
def make_exposure_prow(erow, int_id, calibjobs, jobdesc=None):
1✔
1901
    prow = erow_to_prow(erow)
1✔
1902
    prow['INTID'] = int_id
1✔
1903
    int_id += 1
1✔
1904
    if jobdesc is None:
1✔
1905
        prow['JOBDESC'] = prow['OBSTYPE']
1✔
1906
    else:
1907
        prow['JOBDESC'] = jobdesc
1✔
1908
    prow = define_and_assign_dependency(prow, calibjobs)
1✔
1909
    return prow, int_id
1✔
1910

1911
def make_tnight_prow(prows, calibjobs, internal_id):
1✔
1912
    """
1913
    Given an input list or array of processing table rows and a descriptor, this creates a joint fit processing job row.
1914
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
1915
    input prows).
1916

1917
    Args:
1918
        prows, list or array of dicts. Unsumbitted rows corresponding to the individual prestdstar jobs that are
1919
            the first steps of tilenight.
1920
        calibjobs, dict. Dictionary containing keys that each corresponds to a Table.Row or
1921
            None, with each table.Row() value corresponding to a calibration job
1922
            on which the tilenight job depends.
1923
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1924

1925
    Returns:
1926
        dict: Row of a processing table corresponding to the tilenight job.
1927
    """
1928
    first_row = table_row_to_dict(prows[0])
1✔
1929
    joint_prow = first_row.copy()
1✔
1930

1931
    joint_prow['INTID'] = internal_id
1✔
1932
    joint_prow['JOBDESC'] = 'tilenight'
1✔
1933
    joint_prow['LATEST_QID'] = -99
1✔
1934
    joint_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
1✔
1935
    joint_prow['SUBMIT_DATE'] = -99
1✔
1936
    joint_prow['STATUS'] = 'UNSUBMITTED'
1✔
1937
    joint_prow['SCRIPTNAME'] = ''
1✔
1938
    joint_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)
1✔
1939

1940
    joint_prow = define_and_assign_dependency(joint_prow, calibjobs, use_tilenight=True)
1✔
1941

1942
    return joint_prow
1✔
1943

1944
def make_redshift_prow(prows, tnights, descriptor, internal_id):
1✔
1945
    """
1946
    Given an input list or array of processing table rows and a descriptor, this creates a joint fit processing job row.
1947
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
1948
    input prows).
1949

1950
    Args:
1951
        prows, list or array of dicts. Unsumbitted rows corresponding to the individual prestdstar jobs that are
1952
            the first steps of tilenight.
1953
        tnights, list or array of Table.Row objects. Rows corresponding to the tilenight jobs on which the redshift job depends.
1954
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1955

1956
    Returns:
1957
        dict: Row of a processing table corresponding to the tilenight jobs.
1958
    """
UNCOV
1959
    first_row = table_row_to_dict(prows[0])
×
UNCOV
1960
    redshift_prow = first_row.copy()
×
1961

UNCOV
1962
    redshift_prow['INTID'] = internal_id
×
UNCOV
1963
    redshift_prow['JOBDESC'] = descriptor
×
UNCOV
1964
    redshift_prow['LATEST_QID'] = -99
×
UNCOV
1965
    redshift_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
×
UNCOV
1966
    redshift_prow['SUBMIT_DATE'] = -99
×
UNCOV
1967
    redshift_prow['STATUS'] = 'UNSUBMITTED'
×
UNCOV
1968
    redshift_prow['SCRIPTNAME'] = ''
×
UNCOV
1969
    redshift_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)
×
1970

UNCOV
1971
    redshift_prow = assign_dependency(redshift_prow,dependency=tnights)
×
1972

UNCOV
1973
    return redshift_prow
×
1974

1975
def checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs,
1✔
1976
                                  lasttype, internal_id, z_submit_types=None, dry_run=0,
1977
                                  queue='realtime', reservation=None, strictly_successful=False,
1978
                                  check_for_outputs=True, resubmit_partial_complete=True,
1979
                                  system_name=None):
1980
    """
1981
    Takes all the state-ful data from daily processing and determines whether a joint fit needs to be submitted. Places
1982
    the decision criteria into a single function for easier maintainability over time. These are separate from the
1983
    new standard manifest*.json method of indicating a calibration sequence is complete. That is checked independently
1984
    elsewhere and doesn't interact with this.
1985

1986
    Args:
1987
        ptable (Table): Processing table of all exposures that have been processed.
1988
        arcs (list of dict): list of the individual arc jobs to be used for the psfnight (NOT all
1989
            the arcs, if multiple sets existed). May be empty if none identified yet.
1990
        flats (list of dict): list of the individual flat jobs to be used for the nightlyflat (NOT
1991
            all the flats, if multiple sets existed). May be empty if none identified yet.
1992
        sciences (list of dict): list of the most recent individual prestdstar science exposures
1993
            (if currently processing that tile). May be empty if none identified yet.
1994
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
1995
            and 'nightlyflat'. Each key corresponds to a Table.Row or
1996
            None. The table.Row() values are for the corresponding
1997
            calibration job.
1998
        lasttype (str or None): the obstype of the last individual exposure row to be processed.
1999
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
2000
            is the smallest unassigned value.
2001
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
2002
            exposure. If not specified or None, then no redshifts are submitted.
2003
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
2004
            0 which runs the code normally.
2005
            1 writes all files but doesn't submit any jobs to Slurm.
2006
            2 writes tables but doesn't write scripts or submit anything.
2007
            3 Doesn't write or submit anything but queries Slurm normally for job status.
2008
            4 Doesn't write, submit jobs, or query Slurm.
2009
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
2010
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
2011
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
2012
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
2013
            less desirable because e.g. the sciences can run with SVN default calibrations rather
2014
            than failing completely from failed calibrations. Default is False.
2015
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
2016
            data products for the script being submitted. If all files exist and this is True,
2017
            then the script will not be submitted. If some files exist and this is True, only the
2018
            subset of the cameras without the final data products will be generated and submitted.
2019
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
2020
            jobs with some prior data are pruned using PROCCAMWORD to only process the
2021
            remaining cameras not found to exist.
2022
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, permutter-gpu
2023

2024
    Returns:
2025
        tuple: A tuple containing:
2026

2027
        * ptable, Table, Processing table of all exposures that have been processed.
2028
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
2029
          and 'nightlyflat'. Each key corresponds to a Table.Row or
2030
          None. The table.Row() values are for the corresponding
2031
          calibration job.
2032
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
2033
          (if currently processing that tile). May be empty if none identified yet or
2034
          we just submitted them for processing.
2035
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented upward from
2036
          from the input such that it represents the smallest unused ID.
2037
    """
UNCOV
2038
    if lasttype == 'science' and len(sciences) > 0:
×
UNCOV
2039
        log = get_logger()
×
UNCOV
2040
        skysubonly = np.array([sci['LASTSTEP'] == 'skysub' for sci in sciences])
×
UNCOV
2041
        if np.all(skysubonly):
×
UNCOV
2042
            log.error("Identified all exposures in joint fitting request as skysub-only. Not submitting")
×
2043
            sciences = []
×
2044
            return ptable, calibjobs, sciences, internal_id
×
2045

2046
        if np.any(skysubonly):
×
UNCOV
2047
            log.error("Identified skysub-only exposures in joint fitting request")
×
UNCOV
2048
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
×
UNCOV
2049
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))
×
UNCOV
2050
            sciences = (np.array(sciences,dtype=object)[~skysubonly]).tolist()
×
UNCOV
2051
            log.info("Removed skysub only exposures in joint fitting:")
×
2052
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
×
2053
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))
×
2054

2055
        from collections import Counter
×
UNCOV
2056
        tiles = np.array([sci['TILEID'] for sci in sciences])
×
2057
        counts = Counter(tiles)
×
UNCOV
2058
        if len(counts.most_common()) > 1:
×
UNCOV
2059
            log.error("Identified more than one tile in a joint fitting request")
×
UNCOV
2060
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
×
UNCOV
2061
            log.info("Tileid's: {}".format(tiles))
×
UNCOV
2062
            log.info("Returning without joint fitting any of these exposures.")
×
2063
            # most_common, nmost_common = counts.most_common()[0]
2064
            # if most_common == -99:
2065
            #     most_common, nmost_common = counts.most_common()[1]
2066
            # log.warning(f"Given multiple tiles to jointly fit: {counts}. "+
2067
            #             "Only processing the most common non-default " +
2068
            #             f"tile: {most_common} with {nmost_common} exposures")
2069
            # sciences = (np.array(sciences,dtype=object)[tiles == most_common]).tolist()
2070
            # log.info("Tiles and exposure id's being submitted for joint fitting:")
2071
            # log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
2072
            # log.info("Tileid's: {}".format([row['TILEID'] for row in sciences]))
UNCOV
2073
            sciences = []
×
2074
            return ptable, calibjobs, sciences, internal_id
×
2075

UNCOV
2076
        ptable, tilejob, internal_id = science_joint_fit(ptable, sciences, internal_id, z_submit_types=z_submit_types,
×
2077
                                                         dry_run=dry_run, queue=queue, reservation=reservation,
2078
                                                         strictly_successful=strictly_successful,
2079
                                                         check_for_outputs=check_for_outputs,
2080
                                                         resubmit_partial_complete=resubmit_partial_complete,
2081
                                                         system_name=system_name)
UNCOV
2082
        if tilejob is not None:
×
UNCOV
2083
            sciences = []
×
2084

UNCOV
2085
    elif lasttype == 'flat' and calibjobs['nightlyflat'] is None and len(flats) == 12:
×
2086
        ## Note here we have an assumption about the number of expected flats being greater than 11
UNCOV
2087
        ptable, calibjobs['nightlyflat'], internal_id \
×
2088
            = flat_joint_fit(ptable, flats, internal_id, dry_run=dry_run, queue=queue,
2089
                             reservation=reservation, strictly_successful=strictly_successful,
2090
                             check_for_outputs=check_for_outputs,
2091
                             resubmit_partial_complete=resubmit_partial_complete,
2092
                             system_name=system_name
2093
                            )
2094

UNCOV
2095
    elif lasttype == 'arc' and calibjobs['psfnight'] is None and len(arcs) == 5:
×
2096
        ## Note here we have an assumption about the number of expected arcs being greater than 4
UNCOV
2097
        ptable, calibjobs['psfnight'], internal_id \
×
2098
            = arc_joint_fit(ptable, arcs, internal_id, dry_run=dry_run, queue=queue,
2099
                            reservation=reservation, strictly_successful=strictly_successful,
2100
                            check_for_outputs=check_for_outputs,
2101
                            resubmit_partial_complete=resubmit_partial_complete,
2102
                            system_name=system_name
2103
                            )
UNCOV
2104
    return ptable, calibjobs, sciences, internal_id
×
2105

2106
def submit_tilenight_and_redshifts(ptable, sciences, calibjobs, internal_id, dry_run=0,
1✔
2107
                                  queue='realtime', reservation=None, strictly_successful=False,
2108
                                  check_for_outputs=True, resubmit_partial_complete=True,
2109
                                  system_name=None,use_specter=False, extra_job_args=None):
2110
    """
2111
    Takes all the state-ful data from daily processing and determines whether a tilenight job needs to be submitted.
2112

2113
    Args:
2114
        ptable (Table): Processing table of all exposures that have been processed.
2115
        sciences (list of dict): list of the most recent individual prestdstar science exposures
2116
            (if currently processing that tile). May be empty if none identified yet.
2117
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
2118
            is the smallest unassigned value.
2119
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
2120
            0 which runs the code normally.
2121
            1 writes all files but doesn't submit any jobs to Slurm.
2122
            2 writes tables but doesn't write scripts or submit anything.
2123
            3 Doesn't write or submit anything but queries Slurm normally for job status.
2124
            4 Doesn't write, submit jobs, or query Slurm.
2125
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
2126
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
2127
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
2128
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
2129
            less desirable because e.g. the sciences can run with SVN default calibrations rather
2130
            than failing completely from failed calibrations. Default is False.
2131
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
2132
            data products for the script being submitted. If all files exist and this is True,
2133
            then the script will not be submitted. If some files exist and this is True, only the
2134
            subset of the cameras without the final data products will be generated and submitted.
2135
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
2136
            jobs with some prior data are pruned using PROCCAMWORD to only process the
2137
            remaining cameras not found to exist.
2138
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, permutter-gpu
2139
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
2140
        extra_job_args (dict, optional): Dictionary with key-value pairs that specify additional
2141
            information used for a specific type of job. Examples include
2142
            laststeps for tilenight, z_submit_types for redshifts, etc.
2143

2144
    Returns:
2145
        tuple: A tuple containing:
2146

2147
        * ptable, Table, Processing table of all exposures that have been processed.
2148
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
2149
          (if currently processing that tile). May be empty if none identified yet or
2150
          we just submitted them for processing.
2151
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented upward from
2152
          from the input such that it represents the smallest unused ID.
2153
    """
2154
    ptable, tnight, internal_id = submit_tilenight(ptable, sciences, calibjobs, internal_id,
1✔
2155
                                             queue=queue, reservation=reservation,
2156
                                             dry_run=dry_run, strictly_successful=strictly_successful,
2157
                                             resubmit_partial_complete=resubmit_partial_complete,
2158
                                             system_name=system_name,use_specter=use_specter,
2159
                                             extra_job_args=extra_job_args)
2160

2161
    z_submit_types = None
1✔
2162
    if 'z_submit_types'  in extra_job_args:
1✔
2163
        z_submit_types = extra_job_args['z_submit_types']
1✔
2164
        
2165
    ptable, internal_id = submit_redshifts(ptable, sciences, tnight, internal_id,
1✔
2166
                                    queue=queue, reservation=reservation,
2167
                                    dry_run=dry_run, strictly_successful=strictly_successful,
2168
                                    check_for_outputs=check_for_outputs,
2169
                                    resubmit_partial_complete=resubmit_partial_complete,
2170
                                    z_submit_types=z_submit_types,
2171
                                    system_name=system_name)
2172

2173
    if tnight is not None:
1✔
2174
        sciences = []
1✔
2175

2176
    return ptable, sciences, internal_id
1✔
2177

2178
def set_calibrator_flag(prows, ptable):
1✔
2179
    """
2180
    Sets the "CALIBRATOR" column of a procesing table row to 1 (integer representation of True)
2181
     for all input rows. Used within joint fitting code to flag the exposures that were input
2182
     to the psfnight or nightlyflat for later reference.
2183

2184
    Args:
2185
        prows, list or array of Table.Rows or dicts. The rows corresponding to the individual exposure jobs that are
2186
            inputs to the joint fit.
2187
        ptable, Table. The processing table where each row is a processed job.
2188

2189
    Returns:
2190
        Table: The same processing table as input except with added rows for the joint fit job and, in the case
2191
        of a stdstarfit, the poststdstar science exposure jobs.
2192
    """
2193
    for prow in prows:
1✔
2194
        ptable['CALIBRATOR'][ptable['INTID'] == prow['INTID']] = 1
1✔
2195
    return ptable
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc