• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

desihub / desispec / 13688772709

06 Mar 2025 12:58AM UTC coverage: 38.985% (-0.004%) from 38.989%
13688772709

push

github

web-flow
Allow user to specify exposures or tiles to resubmit (PR #2450)

Allow user to specify exposures or tiles to resubmit

7 of 15 new or added lines in 1 file covered. (46.67%)

2 existing lines in 1 file now uncovered.

12950 of 33218 relevant lines covered (38.98%)

0.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.91
/py/desispec/workflow/processing.py
1
"""
2
desispec.workflow.processing
3
============================
4

5
"""
6
import sys, os, glob
1✔
7
import json
1✔
8
from astropy.io import fits
1✔
9
from astropy.table import Table, join
1✔
10
import numpy as np
1✔
11

12
import time, datetime
1✔
13
from collections import OrderedDict
1✔
14
import subprocess
1✔
15

16
from desispec.scripts.link_calibnight import derive_include_exclude
1✔
17
from desispec.scripts.tile_redshifts import generate_tile_redshift_scripts
1✔
18
from desispec.workflow.redshifts import get_ztile_script_pathname, \
1✔
19
    get_ztile_relpath, \
20
    get_ztile_script_suffix
21
from desispec.workflow.exptable import read_minimal_science_exptab_cols
1✔
22
from desispec.workflow.queue import get_resubmission_states, update_from_queue, \
1✔
23
    queue_info_from_qids, get_queue_states_from_qids, update_queue_state_cache, \
24
    get_non_final_states
25
from desispec.workflow.timing import what_night_is_it
1✔
26
from desispec.workflow.desi_proc_funcs import get_desi_proc_batch_file_pathname, \
1✔
27
    create_desi_proc_batch_script, \
28
    get_desi_proc_batch_file_path, \
29
    get_desi_proc_tilenight_batch_file_pathname, \
30
    create_desi_proc_tilenight_batch_script, create_linkcal_batch_script
31
from desispec.workflow.batch import parse_reservation
1✔
32
from desispec.workflow.utils import pathjoin, sleep_and_report, \
1✔
33
    load_override_file
34
from desispec.workflow.tableio import write_table, load_table
1✔
35
from desispec.workflow.proctable import table_row_to_dict, erow_to_prow, \
1✔
36
    read_minimal_tilenight_proctab_cols, read_minimal_full_proctab_cols, \
37
    update_full_ptab_cache, default_prow, get_default_qid
38
from desiutil.log import get_logger
1✔
39

40
from desispec.io import findfile, specprod_root
1✔
41
from desispec.io.util import decode_camword, create_camword, \
1✔
42
    difference_camwords, \
43
    camword_to_spectros, camword_union, camword_intersection, parse_badamps
44

45

46
#################################################
47
############## Misc Functions ###################
48
#################################################
49
def night_to_starting_iid(night=None):
1✔
50
    """
51
    Creates an internal ID for a given night. The resulting integer is an 8 digit number.
52
    The digits are YYMMDDxxx where YY is the years since 2000, MM and DD are the month and day. xxx are 000,
53
    and are incremented for up to 1000 unique job ID's for a given night.
54

55
    Args:
56
        night (str or int): YYYYMMDD of the night to get the starting internal ID for.
57

58
    Returns:
59
        int: 9 digit number consisting of YYMMDD000. YY is years after 2000, MMDD is month and day.
60
        000 being the starting job number (0).
61
    """
62
    if night is None:
1✔
63
        night = what_night_is_it()
×
64
    night = int(night)
1✔
65
    internal_id = (night - 20000000) * 1000
1✔
66
    return internal_id
1✔
67

68
class ProcessingParams():
1✔
69
    def __init__(self, dry_run_level=0, queue='realtime',
1✔
70
                 reservation=None, strictly_successful=True,
71
                 check_for_outputs=True,
72
                 resubmit_partial_complete=True,
73
                 system_name='perlmutter', use_specter=True):
74

75
        self.dry_run_level = dry_run_level
×
76
        self.system_name = system_name
×
77
        self.queue = queue
×
78
        self.reservation = reservation
×
79
        self.strictly_successful = strictly_successful
×
80
        self.check_for_outputs = check_for_outputs
×
81
        self.resubmit_partial_complete = resubmit_partial_complete
×
82
        self.use_specter = use_specter
×
83

84
#################################################
85
############ Script Functions ###################
86
#################################################
87
def batch_script_name(prow):
1✔
88
    """
89
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, JOBDESC, PROCCAMWORD defined)
90
    and determines the script file pathname as defined by desi_proc's helper functions.
91

92
    Args:
93
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
94

95
    Returns:
96
        str: The complete pathname to the script file, as it is defined within the desi_proc ecosystem.
97
    """
98
    expids = prow['EXPID']
1✔
99
    if len(expids) == 0:
1✔
100
        expids = None
1✔
101
    if prow['JOBDESC'] == 'tilenight':
1✔
102
        pathname = get_desi_proc_tilenight_batch_file_pathname(night = prow['NIGHT'], tileid=prow['TILEID'])
1✔
103
    else:
104
        pathname = get_desi_proc_batch_file_pathname(night = prow['NIGHT'], exp=expids, \
1✔
105
                                             jobdesc=prow['JOBDESC'], cameras=prow['PROCCAMWORD'])
106
    scriptfile =  pathname + '.slurm'
1✔
107
    return scriptfile
1✔
108

109
def get_jobdesc_to_file_map():
1✔
110
    """
111
    Returns a mapping of job descriptions to the filenames of the output files
112

113
    Args:
114
        None
115

116
    Returns:
117
        dict. Dictionary with keys as lowercase job descriptions and to the
118
            filename of their expected outputs.
119

120
    """
121
    return {'prestdstar': 'sframe',
1✔
122
            'stdstarfit': 'stdstars',
123
            'poststdstar': 'cframe',
124
            'nightlybias': 'biasnight',
125
            # 'ccdcalib': 'badcolumns',
126
            'badcol': 'badcolumns',
127
            'arc': 'fitpsf',
128
            'flat': 'fiberflat',
129
            'psfnight': 'psfnight',
130
            'nightlyflat': 'fiberflatnight',
131
            'spectra': 'spectra_tile',
132
            'coadds': 'coadds_tile',
133
            'redshift': 'redrock_tile'}
134

135
def get_file_to_jobdesc_map():
1✔
136
    """
137
    Returns a mapping of output filenames to job descriptions
138

139
    Args:
140
        None
141

142
    Returns:
143
        dict. Dictionary with keys as filename of their expected outputs to
144
            the lowercase job descriptions
145
            .
146

147
    """
148
    job_to_file_map = get_jobdesc_to_file_map()
×
149
    job_to_file_map.pop('badcol') # these files can also be in a ccdcalib job
×
150
    job_to_file_map.pop('nightlybias') # these files can also be in a ccdcalib job
×
151
    return {value: key for key, value in job_to_file_map.items()}
×
152

153
def check_for_outputs_on_disk(prow, resubmit_partial_complete=True):
1✔
154
    """
155
    Args:
156
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
157
            desispect.workflow.proctable.get_processing_table_column_defs()
158
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
159
            jobs with some prior data are pruned using PROCCAMWORD to only process the
160
            remaining cameras not found to exist.
161

162
    Returns:
163
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
164
        the change in job status after creating and submitting the job for processing.
165
    """
166
    prow['STATUS'] = 'UNKNOWN'
1✔
167
    log = get_logger()
1✔
168

169
    if prow['JOBDESC'] in ['linkcal', 'ccdcalib']:
1✔
170
        log.info(f"jobdesc={prow['JOBDESC']} has indeterminated outputs, so "
1✔
171
                + "not checking for files on disk.")
172
        return prow
1✔
173

174
    job_to_file_map = get_jobdesc_to_file_map()
1✔
175

176
    night = prow['NIGHT']
1✔
177
    if prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
1✔
178
        filetype = 'redrock_tile'
1✔
179
    else:
180
        filetype = job_to_file_map[prow['JOBDESC']]
1✔
181
    orig_camword = prow['PROCCAMWORD']
1✔
182

183
    ## if spectro based, look for spectros, else look for cameras
184
    if prow['JOBDESC'] in ['stdstarfit','spectra','coadds','redshift']:
1✔
185
        ## Spectrograph based
186
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
×
187
        n_desired = len(spectros)
×
188
        ## Suppress outputs about using tile based files in findfile if only looking for stdstarfits
189
        if prow['JOBDESC'] == 'stdstarfit':
×
190
            tileid = None
×
191
        else:
192
            tileid = prow['TILEID']
×
193
        expid = prow['EXPID'][0]
×
194
        existing_spectros = []
×
195
        for spectro in spectros:
×
196
            if os.path.exists(findfile(filetype=filetype, night=night, expid=expid, spectrograph=spectro, tile=tileid)):
×
197
                existing_spectros.append(spectro)
×
198
        completed = (len(existing_spectros) == n_desired)
×
199
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
×
200
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
×
201
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'],existing_camword)
×
202
    elif prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
1✔
203
        ## Spectrograph based
204
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
1✔
205
        n_desired = len(spectros)
1✔
206
        ## Suppress outputs about using tile based files in findfile if only looking for stdstarfits
207
        tileid = prow['TILEID']
1✔
208
        expid = prow['EXPID'][0]
1✔
209
        redux_dir = specprod_root()
1✔
210
        outdir = os.path.join(redux_dir,get_ztile_relpath(tileid,group=prow['JOBDESC'],night=night,expid=expid))
1✔
211
        suffix = get_ztile_script_suffix(tileid, group=prow['JOBDESC'], night=night, expid=expid)
1✔
212
        existing_spectros = []
1✔
213
        for spectro in spectros:
1✔
214
            if os.path.exists(os.path.join(outdir, f"redrock-{spectro}-{suffix}.fits")):
1✔
215
                existing_spectros.append(spectro)
×
216
        completed = (len(existing_spectros) == n_desired)
1✔
217
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
1✔
218
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
×
219
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'],existing_camword)
×
220
    else:
221
        ## Otheriwse camera based
222
        cameras = decode_camword(prow['PROCCAMWORD'])
1✔
223
        n_desired = len(cameras)
1✔
224
        if len(prow['EXPID']) > 0:
1✔
225
            expid = prow['EXPID'][0]
1✔
226
        else:
227
            expid = None
1✔
228
        if len(prow['EXPID']) > 1 and prow['JOBDESC'] not in ['psfnight','nightlyflat']:
1✔
229
            log.warning(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']}. This job type only makes " +
×
230
                     f"sense with a single exposure. Proceeding with {expid}.")
231
        missing_cameras = []
1✔
232
        for cam in cameras:
1✔
233
            if not os.path.exists(findfile(filetype=filetype, night=night, expid=expid, camera=cam)):
1✔
234
                missing_cameras.append(cam)
1✔
235
        completed = (len(missing_cameras) == 0)
1✔
236
        if not completed and resubmit_partial_complete and len(missing_cameras) < n_desired:
1✔
237
            prow['PROCCAMWORD'] = create_camword(missing_cameras)
×
238

239
    if completed:
1✔
240
        prow['STATUS'] = 'COMPLETED'
×
241
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
×
242
                 f"the desired {n_desired} {filetype}'s. Not submitting this job.")
243
    elif resubmit_partial_complete and orig_camword != prow['PROCCAMWORD']:
1✔
244
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
×
245
                 f"some {filetype}'s. Submitting smaller camword={prow['PROCCAMWORD']}.")
246
    elif not resubmit_partial_complete:
1✔
247
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} doesn't have all " +
×
248
                 f"{filetype}'s and resubmit_partial_complete=False. "+
249
                 f"Submitting full camword={prow['PROCCAMWORD']}.")
250
    else:
251
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} has no " +
1✔
252
                 f"existing {filetype}'s. Submitting full camword={prow['PROCCAMWORD']}.")
253
    return prow
1✔
254

255
def create_and_submit(prow, queue='realtime', reservation=None, dry_run=0,
1✔
256
                      joint=False, strictly_successful=False,
257
                      check_for_outputs=True, resubmit_partial_complete=True,
258
                      system_name=None, use_specter=False,
259
                      extra_job_args=None):
260
    """
261
    Wrapper script that takes a processing table row and three modifier keywords, creates a submission script for the
262
    compute nodes, and then submits that script to the Slurm scheduler with appropriate dependencies.
263

264
    Args:
265
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
266
            desispect.workflow.proctable.get_processing_table_column_defs()
267
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
268
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
269
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
270
            0 which runs the code normally.
271
            1 writes all files but doesn't submit any jobs to Slurm.
272
            2 writes tables but doesn't write scripts or submit anything.
273
            3 Doesn't write or submit anything but queries Slurm normally for job status.
274
            4 Doesn't write, submit jobs, or query Slurm.
275
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
276
        joint (bool, optional): Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
277
            run with desi_proc_joint_fit. Default is False.
278
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
279
            less desirable because e.g. the sciences can run with SVN default calibrations rather
280
            than failing completely from failed calibrations. Default is False.
281
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
282
            data products for the script being submitted. If all files exist and this is True,
283
            then the script will not be submitted. If some files exist and this is True, only the
284
            subset of the cameras without the final data products will be generated and submitted.
285
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
286
            jobs with some prior data are pruned using PROCCAMWORD to only process the
287
            remaining cameras not found to exist.
288
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
289
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
290
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
291
            information used for a specific type of job. Examples include refnight
292
            and include/exclude lists for linkcals, laststeps for for tilenight, etc.
293

294
    Returns:
295
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
296
        the change in job status after creating and submitting the job for processing.
297

298
    Note:
299
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
300
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
301
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
302
    """
303
    orig_prow = prow.copy()
1✔
304
    if check_for_outputs:
1✔
305
        prow = check_for_outputs_on_disk(prow, resubmit_partial_complete)
1✔
306
        if prow['STATUS'].upper() == 'COMPLETED':
1✔
307
            return prow
×
308

309
    prow = create_batch_script(prow, queue=queue, dry_run=dry_run, joint=joint,
1✔
310
                               system_name=system_name, use_specter=use_specter,
311
                               extra_job_args=extra_job_args)
312
    prow = submit_batch_script(prow, reservation=reservation, dry_run=dry_run,
1✔
313
                               strictly_successful=strictly_successful)
314

315
    ## If resubmitted partial, the PROCCAMWORD and SCRIPTNAME will correspond
316
    ## to the pruned values. But we want to
317
    ## retain the full job's value, so get those from the old job.
318
    if resubmit_partial_complete:
1✔
319
        prow['PROCCAMWORD'] = orig_prow['PROCCAMWORD']
1✔
320
        prow['SCRIPTNAME'] = orig_prow['SCRIPTNAME']
1✔
321
    return prow
1✔
322

323
def desi_link_calibnight_command(prow, refnight, include=None):
1✔
324
    """
325
    Wrapper script that takes a processing table row (or dictionary with
326
    REFNIGHT, NIGHT, PROCCAMWORD defined) and determines the proper command
327
    line call to link data defined by the input row/dict.
328

329
    Args:
330
        prow (Table.Row or dict): Must include keyword accessible definitions
331
            for 'NIGHT', 'REFNIGHT', and 'PROCCAMWORD'.
332
        refnight (str or int): The night with a valid set of calibrations
333
            be created.
334
        include (list): The filetypes to include in the linking.
335
    Returns:
336
        str: The proper command to be submitted to desi_link_calibnight
337
            to process the job defined by the prow values.
338
    """
339
    cmd = 'desi_link_calibnight'
1✔
340
    cmd += f' --refnight={refnight}'
1✔
341
    cmd += f' --newnight={prow["NIGHT"]}'
1✔
342
    cmd += f' --cameras={prow["PROCCAMWORD"]}'
1✔
343
    if include is not None:
1✔
344
        cmd += f' --include=' + ','.join(list(include))
1✔
345
    return cmd
1✔
346

347
def desi_proc_command(prow, system_name, use_specter=False, queue=None):
1✔
348
    """
349
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, OBSTYPE, JOBDESC, PROCCAMWORD defined)
350
    and determines the proper command line call to process the data defined by the input row/dict.
351

352
    Args:
353
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
354
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu
355
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).
356
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
357

358
    Returns:
359
        str: The proper command to be submitted to desi_proc to process the job defined by the prow values.
360
    """
361
    cmd = 'desi_proc'
1✔
362
    cmd += ' --batch'
1✔
363
    cmd += ' --nosubmit'
1✔
364
    if queue is not None:
1✔
365
        cmd += f' -q {queue}'
1✔
366
    if prow['OBSTYPE'].lower() == 'science':
1✔
367
        if prow['JOBDESC'] == 'prestdstar':
×
368
            cmd += ' --nostdstarfit --nofluxcalib'
×
369
        elif prow['JOBDESC'] == 'poststdstar':
×
370
            cmd += ' --noprestdstarfit --nostdstarfit'
×
371

372
        if use_specter:
×
373
            cmd += ' --use-specter'
×
374
    elif prow['JOBDESC'] in ['flat', 'prestdstar'] and use_specter:
1✔
375
        cmd += ' --use-specter'
×
376
    pcamw = str(prow['PROCCAMWORD'])
1✔
377
    cmd += f" --cameras={pcamw} -n {prow['NIGHT']}"
1✔
378
    if len(prow['EXPID']) > 0:
1✔
379
        ## If ccdcalib job without a dark exposure, don't assign the flat expid
380
        ## since it would incorrectly process the flat using desi_proc
381
        if prow['OBSTYPE'].lower() != 'flat' or prow['JOBDESC'] != 'ccdcalib':
1✔
382
            cmd += f" -e {prow['EXPID'][0]}"
1✔
383
    if prow['BADAMPS'] != '':
1✔
384
        cmd += ' --badamps={}'.format(prow['BADAMPS'])
×
385
    return cmd
1✔
386

387
def desi_proc_joint_fit_command(prow, queue=None):
1✔
388
    """
389
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, OBSTYPE, PROCCAMWORD defined)
390
    and determines the proper command line call to process the data defined by the input row/dict.
391

392
    Args:
393
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
394
        queue (str): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).
395

396
    Returns:
397
        str: The proper command to be submitted to desi_proc_joint_fit
398
            to process the job defined by the prow values.
399
    """
400
    cmd = 'desi_proc_joint_fit'
1✔
401
    cmd += ' --batch'
1✔
402
    cmd += ' --nosubmit'
1✔
403
    if queue is not None:
1✔
404
        cmd += f' -q {queue}'
1✔
405

406
    descriptor = prow['OBSTYPE'].lower()
1✔
407

408
    night = prow['NIGHT']
1✔
409
    specs = str(prow['PROCCAMWORD'])
1✔
410
    expid_str = ','.join([str(eid) for eid in prow['EXPID']])
1✔
411

412
    cmd += f' --obstype {descriptor}'
1✔
413
    cmd += f' --cameras={specs} -n {night}'
1✔
414
    if len(expid_str) > 0:
1✔
415
        cmd += f' -e {expid_str}'
1✔
416
    return cmd
1✔
417

418

419
def create_batch_script(prow, queue='realtime', dry_run=0, joint=False,
1✔
420
                        system_name=None, use_specter=False, extra_job_args=None):
421
    """
422
    Wrapper script that takes a processing table row and three modifier keywords and creates a submission script for the
423
    compute nodes.
424

425
    Args:
426
        prow, Table.Row or dict. Must include keyword accessible definitions for processing_table columns found in
427
            desispect.workflow.proctable.get_processing_table_column_defs()
428
        queue, str. The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
429
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
430
            0 which runs the code normally.
431
            1 writes all files but doesn't submit any jobs to Slurm.
432
            2 writes tables but doesn't write scripts or submit anything.
433
            3 Doesn't write or submit anything but queries Slurm normally for job status.
434
            4 Doesn't write, submit jobs, or query Slurm.
435
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
436
        joint, bool. Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
437
            run with desi_proc_joint_fit when not using tilenight. Default is False.
438
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
439
        use_specter, bool, optional. Default is False. If True, use specter, otherwise use gpu_specter by default.
440
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
441
            information used for a specific type of job. Examples include refnight
442
            and include/exclude lists for linkcal, laststeps for tilenight, etc.
443

444
    Returns:
445
        Table.Row or dict: The same prow type and keywords as input except with modified values updated values for
446
        scriptname.
447

448
    Note:
449
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
450
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
451
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
452
    """
453
    log = get_logger()
1✔
454

455
    if extra_job_args is None:
1✔
456
        extra_job_args = {}
1✔
457

458
    if prow['JOBDESC'] in ['perexp','pernight','pernight-v0','cumulative']:
1✔
459
        if dry_run > 1:
1✔
460
            scriptpathname = get_ztile_script_pathname(tileid=prow['TILEID'],group=prow['JOBDESC'],
1✔
461
                                                               night=prow['NIGHT'], expid=prow['EXPID'][0])
462

463
            log.info("Output file would have been: {}".format(scriptpathname))
1✔
464
        else:
465
            #- run zmtl for cumulative redshifts but not others
466
            run_zmtl = (prow['JOBDESC'] == 'cumulative')
1✔
467
            no_afterburners = False
1✔
468
            print(f"entering tileredshiftscript: {prow}")
1✔
469
            scripts, failed_scripts = generate_tile_redshift_scripts(tileid=prow['TILEID'], group=prow['JOBDESC'],
1✔
470
                                                                     nights=[prow['NIGHT']], expids=prow['EXPID'],
471
                                                                     batch_queue=queue, system_name=system_name,
472
                                                                     run_zmtl=run_zmtl,
473
                                                                     no_afterburners=no_afterburners,
474
                                                                     nosubmit=True)
475
            if len(failed_scripts) > 0:
1✔
476
                log.error(f"Redshifts failed for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
×
477
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
478
                log.info(f"Returned failed scriptname is {failed_scripts}")
×
479
            elif len(scripts) > 1:
1✔
480
                log.error(f"More than one redshifts returned for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
×
481
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
482
                log.info(f"Returned scriptnames were {scripts}")
×
483
            elif len(scripts) == 0:
1✔
484
                msg = f'No scripts were generated for {prow=}'
×
485
                log.critical(prow)
×
486
                raise ValueError(msg)
×
487
            else:
488
                scriptpathname = scripts[0]
1✔
489

490
    elif prow['JOBDESC'] == 'linkcal':
1✔
491
        refnight, include, exclude = -99, None, None
1✔
492
        if 'refnight' in extra_job_args:
1✔
493
            refnight = extra_job_args['refnight']
1✔
494
        if 'include' in extra_job_args:
1✔
495
            include = extra_job_args['include']
1✔
496
        if 'exclude' in extra_job_args:
1✔
497
            exclude = extra_job_args['exclude']
×
498
        include, exclude = derive_include_exclude(include, exclude)
1✔
499
        ## Fiberflatnights shouldn't to be generated with psfs from same time, so
500
        ## shouldn't link psfs without also linking fiberflatnight
501
        ## However, this should be checked at a higher level. If set here,
502
        ## go ahead and do it
503
        # if 'psfnight' in include and not 'fiberflatnight' in include:
504
        #     err = "Must link fiberflatnight if linking psfnight"
505
        #     log.error(err)
506
        #     raise ValueError(err)
507
        if dry_run > 1:
1✔
508
            scriptpathname = batch_script_name(prow)
1✔
509
            log.info("Output file would have been: {}".format(scriptpathname))
1✔
510
            cmd = desi_link_calibnight_command(prow, refnight, include)
1✔
511
            log.info("Command to be run: {}".format(cmd.split()))
1✔
512
        else:
513
            if refnight == -99:
×
514
                err = f'For {prow=} asked to link calibration but not given' \
×
515
                      + ' a valid refnight'
516
                log.error(err)
×
517
                raise ValueError(err)
×
518

519
            cmd = desi_link_calibnight_command(prow, refnight, include)
×
520
            log.info(f"Running: {cmd.split()}")
×
521
            scriptpathname = create_linkcal_batch_script(newnight=prow['NIGHT'],
×
522
                                                        cameras=prow['PROCCAMWORD'],
523
                                                        queue=queue,
524
                                                        cmd=cmd,
525
                                                        system_name=system_name)
526
    else:
527
        if prow['JOBDESC'] != 'tilenight':
1✔
528
            nightlybias, nightlycte, cte_expids = False, False, None
1✔
529
            if 'nightlybias' in extra_job_args:
1✔
530
                nightlybias = extra_job_args['nightlybias']
1✔
531
            elif prow['JOBDESC'].lower() == 'nightlybias':
1✔
532
                nightlybias = True
1✔
533
            if 'nightlycte' in extra_job_args:
1✔
534
                nightlycte = extra_job_args['nightlycte']
1✔
535
            if 'cte_expids' in extra_job_args:
1✔
536
                cte_expids = extra_job_args['cte_expids']
1✔
537
            ## run known joint jobs as joint even if unspecified
538
            ## in the future we can eliminate the need for "joint"
539
            if joint or prow['JOBDESC'].lower() in ['psfnight', 'nightlyflat']:
1✔
540
                cmd = desi_proc_joint_fit_command(prow, queue=queue)
1✔
541
                ## For consistency with how we edit the other commands, do them
542
                ## here, but future TODO would be to move these into the command
543
                ## generation itself
544
                if 'extra_cmd_args' in extra_job_args:
1✔
545
                    cmd += ' ' + ' '.join(np.atleast_1d(extra_job_args['extra_cmd_args']))
1✔
546
            else:
547
                cmd = desi_proc_command(prow, system_name, use_specter, queue=queue)
1✔
548
                if nightlybias:
1✔
549
                    cmd += ' --nightlybias'
1✔
550
                if nightlycte:
1✔
551
                    cmd += ' --nightlycte'
1✔
552
                    if cte_expids is not None:
1✔
553
                        cmd += ' --cte-expids '
1✔
554
                        cmd += ','.join(np.atleast_1d(cte_expids).astype(str))
1✔
555
        if dry_run > 1:
1✔
556
            scriptpathname = batch_script_name(prow)
1✔
557
            log.info("Output file would have been: {}".format(scriptpathname))
1✔
558
            if prow['JOBDESC'] != 'tilenight':
1✔
559
                log.info("Command to be run: {}".format(cmd.split()))
1✔
560
        else:
561
            expids = prow['EXPID']
1✔
562
            if len(expids) == 0:
1✔
563
                expids = None
×
564

565
            if prow['JOBDESC'] == 'tilenight':
1✔
566
                log.info("Creating tilenight script for tile {}".format(prow['TILEID']))
1✔
567
                if 'laststeps' in extra_job_args:
1✔
568
                    laststeps = extra_job_args['laststeps']
1✔
569
                else:
570
                    err = f'{prow=} job did not specify last steps to tilenight'
×
571
                    log.error(err)
×
572
                    raise ValueError(err)
×
573
                ncameras = len(decode_camword(prow['PROCCAMWORD']))
1✔
574
                scriptpathname = create_desi_proc_tilenight_batch_script(
1✔
575
                                                               night=prow['NIGHT'], exp=expids,
576
                                                               tileid=prow['TILEID'],
577
                                                               ncameras=ncameras,
578
                                                               queue=queue,
579
                                                               mpistdstars=True,
580
                                                               use_specter=use_specter,
581
                                                               system_name=system_name,
582
                                                               laststeps=laststeps)
583
            else:
584
                if expids is not None and len(expids) > 1:
1✔
585
                    expids = expids[:1]
1✔
586
                log.info("Running: {}".format(cmd.split()))
1✔
587
                scriptpathname = create_desi_proc_batch_script(night=prow['NIGHT'], exp=expids,
1✔
588
                                                               cameras=prow['PROCCAMWORD'],
589
                                                               jobdesc=prow['JOBDESC'],
590
                                                               queue=queue, cmdline=cmd,
591
                                                               use_specter=use_specter,
592
                                                               system_name=system_name,
593
                                                               nightlybias=nightlybias,
594
                                                               nightlycte=nightlycte,
595
                                                               cte_expids=cte_expids)
596
    log.info("Outfile is: {}".format(scriptpathname))
1✔
597
    prow['SCRIPTNAME'] = os.path.basename(scriptpathname)
1✔
598
    return prow
1✔
599

600
_fake_qid = int(time.time() - 1.7e9)
1✔
601
def _get_fake_qid():
1✔
602
    """
603
    Return fake slurm queue jobid to use for dry-run testing
604
    """
605
    # Note: not implemented as a yield generator so that this returns a
606
    # genuine int, not a generator object
607
    global _fake_qid
608
    _fake_qid += 1
1✔
609
    return _fake_qid
1✔
610

611
def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=False):
1✔
612
    """
613
    Wrapper script that takes a processing table row and three modifier keywords and submits the scripts to the Slurm
614
    scheduler.
615

616
    Args:
617
        prow, Table.Row or dict. Must include keyword accessible definitions for processing_table columns found in
618
            desispect.workflow.proctable.get_processing_table_column_defs()
619
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
620
            0 which runs the code normally.
621
            1 writes all files but doesn't submit any jobs to Slurm.
622
            2 writes tables but doesn't write scripts or submit anything.
623
            3 Doesn't write or submit anything but queries Slurm normally for job status.
624
            4 Doesn't write, submit jobs, or query Slurm.
625
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
626
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
627
        strictly_successful, bool. Whether all jobs require all inputs to have succeeded. For daily processing, this is
628
            less desirable because e.g. the sciences can run with SVN default calibrations rather
629
            than failing completely from failed calibrations. Default is False.
630

631
    Returns:
632
        Table.Row or dict: The same prow type and keywords as input except with modified values updated values for
633
        scriptname.
634

635
    Note:
636
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
637
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
638
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
639
    """
640
    log = get_logger()
1✔
641
    dep_qids = prow['LATEST_DEP_QID']
1✔
642
    dep_list, dep_str = '', ''
1✔
643

644
    ## With desi_proc_night we now either resubmit failed jobs or exit, so this
645
    ## should no longer be necessary in the normal workflow.
646
    # workaround for sbatch --dependency bug not tracking jobs correctly
647
    # see NERSC TICKET INC0203024
648
    failed_dependency = False
1✔
649
    if len(dep_qids) > 0 and not dry_run:
1✔
650
        non_final_states = get_non_final_states()
×
651
        state_dict = get_queue_states_from_qids(dep_qids, dry_run_level=dry_run, use_cache=True)
×
652
        still_depids = []
×
653
        for depid in dep_qids:
×
654
            if depid in state_dict.keys():
×
655
                if state_dict[int(depid)] == 'COMPLETED':
×
656
                   log.info(f"removing completed jobid {depid}")
×
657
                elif state_dict[int(depid)] not in non_final_states:
×
658
                    failed_dependency = True
×
659
                    log.info("Found a dependency in a bad final state="
×
660
                             + f"{state_dict[int(depid)]} for depjobid={depid},"
661
                             + " not submitting this job.")
662
                    still_depids.append(depid)
×
663
                else:
664
                    still_depids.append(depid)
×
665
            else:
666
                still_depids.append(depid)
×
667
        dep_qids = np.array(still_depids)
×
668

669
    if len(dep_qids) > 0:
1✔
670
        jobtype = prow['JOBDESC']
1✔
671
        if strictly_successful:
1✔
672
            depcond = 'afterok'
1✔
673
        elif jobtype in ['arc', 'psfnight', 'prestdstar', 'stdstarfit']:
×
674
            ## (though psfnight and stdstarfit will require some inputs otherwise they'll go up in flames)
675
            depcond = 'afterany'
×
676
        else:
677
            ## if 'flat','nightlyflat','poststdstar', or any type of redshift, require strict success of inputs
678
            depcond = 'afterok'
×
679

680
        dep_str = f'--dependency={depcond}:'
1✔
681

682
        if np.isscalar(dep_qids):
1✔
683
            dep_list = str(dep_qids).strip(' \t')
×
684
            if dep_list == '':
×
685
                dep_str = ''
×
686
            else:
687
                dep_str += dep_list
×
688
        else:
689
            if len(dep_qids)>1:
1✔
690
                dep_list = ':'.join(np.array(dep_qids).astype(str))
1✔
691
                dep_str += dep_list
1✔
692
            elif len(dep_qids) == 1 and dep_qids[0] not in [None, 0]:
1✔
693
                dep_str += str(dep_qids[0])
1✔
694
            else:
695
                dep_str = ''
×
696

697
    # script = f'{jobname}.slurm'
698
    # script_path = pathjoin(batchdir, script)
699
    if prow['JOBDESC'] in ['pernight-v0','pernight','perexp','cumulative']:
1✔
700
        script_path = get_ztile_script_pathname(tileid=prow['TILEID'],group=prow['JOBDESC'],
1✔
701
                                                        night=prow['NIGHT'], expid=np.min(prow['EXPID']))
702
        jobname = os.path.basename(script_path)
1✔
703
    else:
704
        batchdir = get_desi_proc_batch_file_path(night=prow['NIGHT'])
1✔
705
        jobname = batch_script_name(prow)
1✔
706
        script_path = pathjoin(batchdir, jobname)
1✔
707

708
    batch_params = ['sbatch', '--parsable']
1✔
709
    if dep_str != '':
1✔
710
        batch_params.append(f'{dep_str}')
1✔
711

712
    reservation = parse_reservation(reservation, prow['JOBDESC'])
1✔
713
    if reservation is not None:
1✔
714
        batch_params.append(f'--reservation={reservation}')
×
715

716
    batch_params.append(f'{script_path}')
1✔
717
    submitted = True
1✔
718
    ## If dry_run give it a fake QID
719
    ## if a dependency has failed don't even try to submit the job because
720
    ## Slurm will refuse, instead just mark as unsubmitted.
721
    if dry_run:
1✔
722
        current_qid = _get_fake_qid()
1✔
723
    elif not failed_dependency:
×
724
        #- sbatch sometimes fails; try several times before giving up
725
        max_attempts = 3
×
726
        for attempt in range(max_attempts):
×
727
            try:
×
728
                current_qid = subprocess.check_output(batch_params, stderr=subprocess.STDOUT, text=True)
×
729
                current_qid = int(current_qid.strip(' \t\n'))
×
730
                break
×
731
            except subprocess.CalledProcessError as err:
×
732
                log.error(f'{jobname} submission failure at {datetime.datetime.now()}')
×
733
                log.error(f'{jobname}   {batch_params}')
×
734
                log.error(f'{jobname}   {err.output=}')
×
735
                if attempt < max_attempts - 1:
×
736
                    log.info('Sleeping 60 seconds then retrying')
×
737
                    time.sleep(60)
×
738
        else:  #- for/else happens if loop doesn't succeed
739
            msg = f'{jobname} submission failed {max_attempts} times.' \
×
740
                  + ' setting as unsubmitted and moving on'
741
            log.error(msg)
×
742
            current_qid = get_default_qid()
×
743
            submitted = False
×
744
    else:
745
        current_qid = get_default_qid()
×
746
        submitted = False
×
747

748
    ## Update prow with new information
749
    prow['LATEST_QID'] = current_qid
1✔
750

751
    ## If we didn't submit, don't say we did and don't add to ALL_QIDS
752
    if submitted:
1✔
753
        log.info(batch_params)
1✔
754
        log.info(f'Submitted {jobname} with dependencies {dep_str} and '
1✔
755
                 + f'reservation={reservation}. Returned qid: {current_qid}')
756

757
        ## Update prow with new information
758
        prow['ALL_QIDS'] = np.append(prow['ALL_QIDS'],current_qid)
1✔
759
        prow['STATUS'] = 'SUBMITTED'
1✔
760
        prow['SUBMIT_DATE'] = int(time.time())
1✔
761
    else:
762
        log.info(f"Would have submitted: {batch_params}")
×
763
        prow['STATUS'] = 'UNSUBMITTED'
×
764

765
        ## Update the Slurm jobid cache of job states
766
        update_queue_state_cache(qid=prow['LATEST_QID'], state=prow['STATUS'])
×
767

768
    return prow
1✔
769

770

771
#############################################
772
##########   Row Manipulations   ############
773
#############################################
774
def define_and_assign_dependency(prow, calibjobs, use_tilenight=False,
1✔
775
                                 refnight=None):
776
    """
777
    Given input processing row and possible calibjobs, this defines the
778
    JOBDESC keyword and assigns the dependency appropriate for the job type of
779
    prow.
780

781
    Args:
782
        prow, Table.Row or dict. Must include keyword accessible definitions for
783
            'OBSTYPE'. A row must have column names for
784
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.
785
        calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
786
            and 'nightlyflat'. Each key corresponds to a Table.Row or
787
            None. The table.Row() values are for the corresponding
788
            calibration job. Each value that isn't None must contain
789
            'INTID', and 'LATEST_QID'. If None, it assumes the
790
            dependency doesn't exist and no dependency is assigned.
791
        use_tilenight, bool. Default is False. If True, use desi_proc_tilenight
792
            for prestdstar, stdstar,and poststdstar steps for
793
            science exposures.
794
        refnight, int. The reference night for linking jobs
795

796
    Returns:
797
        Table.Row or dict: The same prow type and keywords as input except
798
        with modified values updated values for 'JOBDESC', 'INT_DEP_IDS'. and 'LATEST_DEP_ID'.
799

800
    Note:
801
        This modifies the input. Though Table.Row objects are generally copied
802
        on modification, so the change to the input object in memory may or may
803
        not be changed. As of writing, a row from a table given to this function
804
        will not change during the execution of this function (but can be
805
        overwritten explicitly with the returned row if desired).
806
    """
807
    if prow['OBSTYPE'] in ['science', 'twiflat']:
1✔
808
        if calibjobs['nightlyflat'] is not None:
1✔
809
            dependency = calibjobs['nightlyflat']
1✔
810
        elif calibjobs['psfnight'] is not None:
1✔
811
            dependency = calibjobs['psfnight']
1✔
812
        elif calibjobs['ccdcalib'] is not None:
1✔
813
            dependency = calibjobs['ccdcalib']
1✔
814
        elif calibjobs['nightlybias'] is not None:
1✔
815
            dependency = calibjobs['nightlybias']
×
816
        elif calibjobs['badcol'] is not None:
1✔
817
            dependency = calibjobs['badcol']
×
818
        else:
819
            dependency = calibjobs['linkcal']
1✔
820
        if not use_tilenight:
1✔
821
            prow['JOBDESC'] = 'prestdstar'
1✔
822
    elif prow['OBSTYPE'] == 'flat':
1✔
823
        if calibjobs['psfnight'] is not None:
1✔
824
            dependency = calibjobs['psfnight']
1✔
825
        elif calibjobs['ccdcalib'] is not None:
1✔
826
            dependency = calibjobs['ccdcalib']
1✔
827
        elif calibjobs['nightlybias'] is not None:
1✔
828
            dependency = calibjobs['nightlybias']
×
829
        elif calibjobs['badcol'] is not None:
1✔
830
            dependency = calibjobs['badcol']
×
831
        else:
832
            dependency = calibjobs['linkcal']
1✔
833
    elif prow['OBSTYPE'] == 'arc':
1✔
834
        if calibjobs['ccdcalib'] is not None:
1✔
835
            dependency = calibjobs['ccdcalib']
1✔
836
        elif calibjobs['nightlybias'] is not None:
1✔
837
            dependency = calibjobs['nightlybias']
1✔
838
        elif calibjobs['badcol'] is not None:
1✔
839
            dependency = calibjobs['badcol']
×
840
        else:
841
            dependency = calibjobs['linkcal']
1✔
842
    elif prow['JOBDESC'] in ['badcol', 'nightlybias', 'ccdcalib']:
1✔
843
        dependency = calibjobs['linkcal']
1✔
844
    elif prow['OBSTYPE'] == 'dark':
1✔
845
        if calibjobs['ccdcalib'] is not None:
×
846
            dependency = calibjobs['ccdcalib']
×
847
        elif calibjobs['nightlybias'] is not None:
×
848
            dependency = calibjobs['nightlybias']
×
849
        elif calibjobs['badcol'] is not None:
×
850
            dependency = calibjobs['badcol']
×
851
        else:
852
            dependency = calibjobs['linkcal']
×
853
    elif prow['JOBDESC'] == 'linkcal' and refnight is not None:
1✔
854
        dependency = None
1✔
855
        ## For link cals only, enable cross-night dependencies if available
856
        refproctable = findfile('proctable', night=refnight)
1✔
857
        if os.path.exists(refproctable):
1✔
858
            ptab = load_table(tablename=refproctable, tabletype='proctable')
×
859
            ## This isn't perfect because we may depend on jobs that aren't
860
            ## actually being linked
861
            ## Also allows us to proceed even if jobs don't exist yet
862
            deps = []
×
863
            for job in ['nightlybias', 'ccdcalib', 'psfnight', 'nightlyflat']:
×
864
                if job in ptab['JOBDESC']:
×
865
                    ## add prow to dependencies
866
                    deps.append(ptab[ptab['JOBDESC']==job][0])
×
867
            if len(deps) > 0:
×
868
                dependency = deps
×
869
    else:
870
        dependency = None
×
871

872
    prow = assign_dependency(prow, dependency)
1✔
873

874
    return prow
1✔
875

876

877
def assign_dependency(prow, dependency):
1✔
878
    """
879
    Given input processing row and possible arcjob (processing row for psfnight) and flatjob (processing row for
880
    nightlyflat), this defines the JOBDESC keyword and assigns the dependency appropriate for the job type of prow.
881

882
    Args:
883
        prow, Table.Row or dict. Must include keyword accessible definitions for 'OBSTYPE'. A row must have column names for
884
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.
885
        dependency, NoneType or scalar/list/array of Table.Row, dict. Processing row corresponding to the required input
886
            for the job in prow. This must contain keyword
887
            accessible values for 'INTID', and 'LATEST_QID'.
888
            If None, it assumes the dependency doesn't exist
889
            and no dependency is assigned.
890

891
    Returns:
892
        Table.Row or dict: The same prow type and keywords as input except with modified values updated values for
893
        'JOBDESC', 'INT_DEP_IDS'. and 'LATEST_DEP_ID'.
894

895
    Note:
896
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
897
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
898
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
899
    """
900
    prow['INT_DEP_IDS'] = np.ndarray(shape=0).astype(int)
1✔
901
    prow['LATEST_DEP_QID'] = np.ndarray(shape=0).astype(int)
1✔
902
    if dependency is not None:
1✔
903
        if type(dependency) in [list, np.array]:
1✔
904
            ids, qids = [], []
1✔
905
            for curdep in dependency:
1✔
906
                ids.append(curdep['INTID'])
1✔
907
                if still_a_dependency(curdep):
1✔
908
                    # ids.append(curdep['INTID'])
909
                    qids.append(curdep['LATEST_QID'])
1✔
910
            prow['INT_DEP_IDS'] = np.array(ids, dtype=int)
1✔
911
            prow['LATEST_DEP_QID'] = np.array(qids, dtype=int)
1✔
912
        elif type(dependency) in [dict, OrderedDict, Table.Row]:
1✔
913
            prow['INT_DEP_IDS'] = np.array([dependency['INTID']], dtype=int)
1✔
914
            if still_a_dependency(dependency):
1✔
915
                prow['LATEST_DEP_QID'] = np.array([dependency['LATEST_QID']], dtype=int)
1✔
916
    return prow
1✔
917

918
def still_a_dependency(dependency):
1✔
919
    """
920
    Defines the criteria for which a dependency is deemed complete (and therefore no longer a dependency).
921

922
     Args:
923
        dependency, Table.Row or dict. Processing row corresponding to the required input for the job in prow.
924
            This must contain keyword accessible values for 'STATUS', and 'LATEST_QID'.
925

926
    Returns:
927
        bool: False if the criteria indicate that the dependency is completed and no longer a blocking factor (ie no longer
928
        a genuine dependency). Returns True if the dependency is still a blocking factor such that the slurm
929
        scheduler needs to be aware of the pending job.
930

931
    """
932
    return dependency['LATEST_QID'] > 0 and dependency['STATUS'] != 'COMPLETED'
1✔
933

934
def get_type_and_tile(erow):
1✔
935
    """
936
    Trivial function to return the OBSTYPE and the TILEID from an exposure table row
937

938
    Args:
939
        erow, Table.Row or dict. Must contain 'OBSTYPE' and 'TILEID' as keywords.
940

941
    Returns:
942
        tuple (str, str), corresponding to the OBSTYPE and TILEID values of the input erow.
943
    """
944
    return str(erow['OBSTYPE']).lower(), erow['TILEID']
×
945

946

947
#############################################
948
#########   Table manipulators   ############
949
#############################################
950
def parse_previous_tables(etable, ptable, night):
1✔
951
    """
952
    This takes in the exposure and processing tables and regenerates all the working memory variables needed for the
953
    daily processing script.
954

955
    Used by the daily processing to define most of its state-ful variables into working memory.
956
    If the processing table is empty, these are simply declared and returned for use.
957
    If the code had previously run and exited (or crashed), however, this will all the code to
958
    re-establish itself by redefining these values.
959

960
    Args:
961
        etable, Table, Exposure table of all exposures that have been dealt with thus far.
962
        ptable, Table, Processing table of all exposures that have been processed.
963
        night, str or int, the night the data was taken.
964

965
    Returns:
966
        tuple: A tuple containing:
967

968
        * arcs, list of dicts, list of the individual arc jobs used for the psfnight (NOT all
969
          the arcs, if multiple sets existed)
970
        * flats, list of dicts, list of the individual flat jobs used for the nightlyflat (NOT
971
          all the flats, if multiple sets existed)
972
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
973
          (if currently processing that tile)
974
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'badcol', 'psfnight'
975
          and 'nightlyflat'. Each key corresponds to a Table.Row or
976
          None. The table.Row() values are for the corresponding
977
          calibration job.
978
        * curtype, None, the obstype of the current job being run. Always None as first new job will define this.
979
        * lasttype, str or None, the obstype of the last individual exposure row to be processed.
980
        * curtile, None, the tileid of the current job (if science). Otherwise None. Always None as first
981
          new job will define this.
982
        * lasttile, str or None, the tileid of the last job (if science). Otherwise None.
983
        * internal_id, int, an internal identifier unique to each job. Increments with each new job. This
984
          is the latest unassigned value.
985
    """
986
    log = get_logger()
×
987
    arcs, flats, sciences = [], [], []
×
988
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
×
989
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None,
990
                 'accounted_for': dict()}
991
    curtype,lasttype = None,None
×
992
    curtile,lasttile = None,None
×
993

994
    if len(ptable) > 0:
×
995
        prow = ptable[-1]
×
996
        internal_id = int(prow['INTID'])+1
×
997
        lasttype,lasttile = get_type_and_tile(ptable[-1])
×
998
        jobtypes = ptable['JOBDESC']
×
999

1000
        if 'nightlybias' in jobtypes:
×
1001
            calibjobs['nightlybias'] = table_row_to_dict(ptable[jobtypes=='nightlybias'][0])
×
1002
            log.info("Located nightlybias job in exposure table: {}".format(calibjobs['nightlybias']))
×
1003

1004
        if 'ccdcalib' in jobtypes:
×
1005
            calibjobs['ccdcalib'] = table_row_to_dict(ptable[jobtypes=='ccdcalib'][0])
×
1006
            log.info("Located ccdcalib job in exposure table: {}".format(calibjobs['ccdcalib']))
×
1007

1008
        if 'psfnight' in jobtypes:
×
1009
            calibjobs['psfnight'] = table_row_to_dict(ptable[jobtypes=='psfnight'][0])
×
1010
            log.info("Located joint fit psfnight job in exposure table: {}".format(calibjobs['psfnight']))
×
1011
        elif lasttype == 'arc':
×
1012
            seqnum = 10
×
1013
            for row in ptable[::-1]:
×
1014
                erow = etable[etable['EXPID']==row['EXPID'][0]]
×
1015
                if row['OBSTYPE'].lower() == 'arc' and int(erow['SEQNUM'])<seqnum:
×
1016
                    arcs.append(table_row_to_dict(row))
×
1017
                    seqnum = int(erow['SEQNUM'])
×
1018
                else:
1019
                    break
×
1020
            ## Because we work backword to fill in, we need to reverse them to get chronological order back
1021
            arcs = arcs[::-1]
×
1022

1023
        if 'nightlyflat' in jobtypes:
×
1024
            calibjobs['nightlyflat'] = table_row_to_dict(ptable[jobtypes=='nightlyflat'][0])
×
1025
            log.info("Located joint fit nightlyflat job in exposure table: {}".format(calibjobs['nightlyflat']))
×
1026
        elif lasttype == 'flat':
×
1027
            for row in ptable[::-1]:
×
1028
                erow = etable[etable['EXPID']==row['EXPID'][0]]
×
1029
                if row['OBSTYPE'].lower() == 'flat' and int(erow['SEQTOT']) < 5:
×
1030
                    if float(erow['EXPTIME']) > 100.:
×
1031
                        flats.append(table_row_to_dict(row))
×
1032
                else:
1033
                    break
×
1034
            flats = flats[::-1]
×
1035

1036
        if lasttype.lower() == 'science':
×
1037
            for row in ptable[::-1]:
×
1038
                if row['OBSTYPE'].lower() == 'science' and row['TILEID'] == lasttile and \
×
1039
                   row['JOBDESC'] == 'prestdstar' and row['LASTSTEP'] != 'skysub':
1040
                    sciences.append(table_row_to_dict(row))
×
1041
                else:
1042
                    break
×
1043
            sciences = sciences[::-1]
×
1044
    else:
1045
        internal_id = night_to_starting_iid(night)
×
1046

1047
    return arcs,flats,sciences, \
×
1048
           calibjobs, \
1049
           curtype, lasttype, \
1050
           curtile, lasttile,\
1051
           internal_id
1052

1053
def generate_calibration_dict(ptable, files_to_link=None):
1✔
1054
    """
1055
    This takes in a processing table and regenerates the working memory calibration
1056
    dictionary for dependency tracking. Used by the daily processing to define 
1057
    most of its state-ful variables into working memory.
1058
    If the processing table is empty, these are simply declared and returned for use.
1059
    If the code had previously run and exited (or crashed), however, this will all the code to
1060
    re-establish itself by redefining these values.
1061

1062
    Args:
1063
        ptable, Table, Processing table of all exposures that have been processed.
1064
        files_to_link, set, Set of filenames that the linkcal job will link.
1065

1066
    Returns:
1067
        calibjobs, dict. Dictionary containing 'nightlybias', 'badcol', 'ccdcalib',
1068
            'psfnight', 'nightlyflat', 'linkcal', and 'completed'. Each key corresponds to a
1069
            Table.Row or None. The table.Row() values are for the corresponding
1070
            calibration job.
1071
    """
1072
    log = get_logger()
1✔
1073
    job_to_file_map = get_jobdesc_to_file_map()
1✔
1074
    accounted_for = {'biasnight': False, 'badcolumns': False,
1✔
1075
                     'ctecorrnight': False, 'psfnight': False,
1076
                     'fiberflatnight': False}
1077
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
1✔
1078
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None}
1079

1080
    ptable_jobtypes = ptable['JOBDESC']
1✔
1081

1082
    for jobtype in calibjobs.keys():
1✔
1083
        if jobtype in ptable_jobtypes:
1✔
1084
            calibjobs[jobtype] = table_row_to_dict(ptable[ptable_jobtypes==jobtype][0])
×
1085
            log.info(f"Located {jobtype} job in exposure table: {calibjobs[jobtype]}")
×
1086
            if jobtype == 'linkcal':
×
1087
                if files_to_link is not None and len(files_to_link) > 0:
×
1088
                    log.info(f"Assuming existing linkcal job processed "
×
1089
                             + f"{files_to_link} since given in override file.")
1090
                    accounted_for = update_accounted_for_with_linking(accounted_for,
×
1091
                                                                  files_to_link)
1092
                else:
1093
                    err = f"linkcal job exists but no files given: {files_to_link=}"
×
1094
                    log.error(err)
×
1095
                    raise ValueError(err)
×
1096
            elif jobtype == 'ccdcalib':
×
1097
                possible_ccd_files = set(['biasnight', 'badcolumns', 'ctecorrnight'])
×
1098
                if files_to_link is None:
×
1099
                    files_accounted_for = possible_ccd_files
×
1100
                else:
1101
                    files_accounted_for = possible_ccd_files.difference(files_to_link)
×
1102
                    ccd_files_linked = possible_ccd_files.intersection(files_to_link)
×
1103
                    log.info(f"Assuming existing ccdcalib job processed "
×
1104
                             + f"{files_accounted_for} since {ccd_files_linked} "
1105
                             + f"are linked.")
1106
                for fil in files_accounted_for:
×
1107
                    accounted_for[fil] = True
×
1108
            else:
1109
                accounted_for[job_to_file_map[jobtype]] = True
×
1110

1111
    calibjobs['accounted_for'] = accounted_for
1✔
1112
    return calibjobs
1✔
1113

1114
def update_accounted_for_with_linking(accounted_for, files_to_link):
1✔
1115
    """
1116
    This takes in a dictionary summarizing the calibration files accounted for
1117
     and updates it based on the files_to_link, which are assumed to have
1118
     already been linked such that those files already exist on disk and
1119
     don't need ot be generated.
1120

1121
    Parameters
1122
    ----------
1123
        accounted_for: dict
1124
            Dictionary containing 'biasnight', 'badcolumns', 'ctecorrnight',
1125
            'psfnight', and 'fiberflatnight'. Each value is True if file is
1126
            accounted for and False if it is not.
1127
        files_to_link: set
1128
            Set of filenames that the linkcal job will link.
1129

1130
    Returns
1131
    -------
1132
        accounted_for: dict
1133
            Dictionary containing 'biasnight', 'badcolumns', 'ctecorrnight',
1134
            'psfnight', and 'fiberflatnight'. Each value is True if file is
1135
            accounted for and False if it is not.
1136
    """
1137
    log = get_logger()
1✔
1138
    
1139
    for fil in files_to_link:
1✔
1140
        if fil in accounted_for:
1✔
1141
            accounted_for[fil] = True
1✔
1142
        else:
1143
            err = f"{fil} doesn't match an expected filetype: "
×
1144
            err += f"{accounted_for.keys()}"
×
1145
            log.error(err)
×
1146
            raise ValueError(err)
×
1147

1148
    return accounted_for
1✔
1149

1150
def all_calibs_submitted(accounted_for, do_cte_flats):
1✔
1151
    """
1152
    Function that returns the boolean logic to determine if the necessary
1153
    calibration jobs have been submitted for calibration.
1154

1155
    Args:
1156
        accounted_for, dict, Dictionary with keys corresponding to the calibration
1157
            filenames and values of True or False.
1158
        do_cte_flats, bool, whether ctecorrnight files are expected or not.
1159

1160
    Returns:
1161
        bool, True if all necessary calibrations have been submitted or handled, False otherwise.
1162
    """
1163
    test_dict = accounted_for.copy()
1✔
1164
    if not do_cte_flats:
1✔
1165
        test_dict.pop('ctecorrnight')
1✔
1166

1167
    return np.all(list(test_dict.values()))
1✔
1168

1169
def update_and_recursively_submit(proc_table, submits=0, max_resubs=100,
1✔
1170
                                  resubmission_states=None,
1171
                                  no_resub_failed=False, ptab_name=None,
1172
                                  dry_run_level=0, reservation=None,
1173
                                  expids=None, tileids=None):
1174
    """
1175
    Given an processing table, this loops over job rows and resubmits failed jobs (as defined by resubmission_states).
1176
    Before submitting a job, it checks the dependencies for failures. If a dependency needs to be resubmitted, it recursively
1177
    follows dependencies until it finds the first job without a failed dependency and resubmits that. Then resubmits the
1178
    other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.
1179

1180
    Args:
1181
        proc_table, Table, the processing table with a row per job.
1182
        submits, int, the number of submissions made to the queue. Used for saving files and in not overloading the scheduler.
1183
        max_resubs, int, the number of times a job should be resubmitted before giving up. Default is very high at 100.
1184
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
1185
            possible Slurm scheduler state, where you wish for jobs with that
1186
            outcome to be resubmitted
1187
        no_resub_failed: bool. Set to True if you do NOT want to resubmit
1188
            jobs with Slurm status 'FAILED' by default. Default is False.
1189
        ptab_name, str, the full pathname where the processing table should be saved.
1190
        dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0.
1191
            0 which runs the code normally.
1192
            1 writes all files but doesn't submit any jobs to Slurm.
1193
            2 writes tables but doesn't write scripts or submit anything.
1194
            3 Doesn't write or submit anything but queries Slurm normally for job status.
1195
            4 Doesn't write, submit jobs, or query Slurm.
1196
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
1197
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
1198
        expids: list of ints. The exposure ids to resubmit (along with the jobs they depend on).
1199
        tileids: list of ints. The tile ids to resubmit (along with the jobs they depend on).
1200

1201
    Returns:
1202
        tuple: A tuple containing:
1203

1204
        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
1205
          been updated for those jobs that needed to be resubmitted.
1206
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
1207
          the number of submissions made from this function call plus the input submits value.
1208

1209
    Note:
1210
        This modifies the inputs of both proc_table and submits and returns them.
1211
    """
1212
    log = get_logger()
1✔
1213
    if tileids is not None and expids is not None:
1✔
NEW
1214
        msg = f"Provided both expids and tilesids. Please only provide one."
×
NEW
1215
        log.critical(msg)
×
1216
        raise AssertionError(msg)
1217
    elif tileids is not None:
1✔
NEW
1218
        msg = f"Only resubmitting the following tileids and the jobs they depend on: {tileids=}"
×
NEW
1219
        log.info(msg)
×
1220
    elif expids is not None:
1✔
NEW
1221
        msg = f"Only resubmitting the following expids and the jobs they depend on: {expids=}"
×
NEW
1222
        log.info(msg)
×
1223

1224
    if resubmission_states is None:
1✔
1225
        resubmission_states = get_resubmission_states(no_resub_failed=no_resub_failed)
1✔
1226

1227
    log.info(f"Resubmitting jobs with current states in the following: {resubmission_states}")
1✔
1228
    proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level)
1✔
1229

1230
    log.info("Updated processing table queue information:")
1✔
1231
    cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID',
1✔
1232
            'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS']
1233
    log.info(np.array(cols))
1✔
1234
    for row in proc_table:
1✔
1235
        log.info(np.array(row[cols]))
1✔
1236

1237
    ## If expids or tileids are given, subselect to the processing table rows
1238
    ## that included those exposures or tiles otherwise just list all indices
1239
    ## NOTE: Other rows can still be submitted if the selected rows depend on them
1240
    ## we hand the entire table to recursive_submit_failed(), which will walk the
1241
    ## entire dependency tree as necessary.
1242
    if expids is not None:
1✔
NEW
1243
        select_ptab_rows = np.where([np.any(np.isin(prow_eids, expids)) for prow_eids in proc_table['EXPID']])[0]
×
1244
    elif tileids is not None:
1✔
NEW
1245
        select_ptab_rows = np.where(np.isin(proc_table['TILEID'], tileids))[0]
×
1246
    else:
1247
        select_ptab_rows = np.arange(len(proc_table))
1✔
1248

1249
    log.info("\n")
1✔
1250
    id_to_row_map = {row['INTID']: rown for rown, row in enumerate(proc_table)}
1✔
1251
    ## Loop over all requested rows and resubmit those that have failed
1252
    for rown in select_ptab_rows:
1✔
1253
        if proc_table['STATUS'][rown] in resubmission_states:
1✔
1254
            proc_table, submits = recursive_submit_failed(rown=rown, proc_table=proc_table,
1✔
1255
                                                          submits=submits, max_resubs=max_resubs,
1256
                                                          id_to_row_map=id_to_row_map,
1257
                                                          ptab_name=ptab_name,
1258
                                                          resubmission_states=resubmission_states,
1259
                                                          reservation=reservation,
1260
                                                          dry_run_level=dry_run_level)
1261

1262
    proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level)
1✔
1263

1264
    return proc_table, submits
1✔
1265

1266
def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, max_resubs=100, ptab_name=None,
1✔
1267
                            resubmission_states=None, reservation=None, dry_run_level=0):
1268
    """
1269
    Given a row of a processing table and the full processing table, this resubmits the given job.
1270
    Before submitting a job, it checks the dependencies for failures in the processing table. If a dependency needs to
1271
    be resubmitted, it recursively follows dependencies until it finds the first job without a failed dependency and
1272
    resubmits that. Then resubmits the other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.
1273

1274
    Args:
1275
        rown, Table.Row, the row of the processing table that you want to resubmit.
1276
        proc_table, Table, the processing table with a row per job.
1277
        submits, int, the number of submissions made to the queue. Used for saving files and in not overloading the scheduler.
1278
        id_to_row_map, dict, lookup dictionary where the keys are internal ids (INTID's) and the values are the row position
1279
            in the processing table.
1280
        max_resubs, int, the number of times a job should be resubmitted before giving up. Default is very high at 100.
1281
        ptab_name, str, the full pathname where the processing table should be saved.
1282
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
1283
            possible Slurm scheduler state, where you wish for jobs with that
1284
            outcome to be resubmitted
1285
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
1286
        dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0.
1287
            0 which runs the code normally.
1288
            1 writes all files but doesn't submit any jobs to Slurm.
1289
            2 writes tables but doesn't write scripts or submit anything.
1290
            3 Doesn't write or submit anything but queries Slurm normally for job status.
1291
            4 Doesn't write, submit jobs, or query Slurm.
1292
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
1293

1294
    Returns:
1295
        tuple: A tuple containing:
1296

1297
        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
1298
          been updated for those jobs that needed to be resubmitted.
1299
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
1300
          the number of submissions made from this function call plus the input submits value.
1301

1302
    Note:
1303
        This modifies the inputs of both proc_table and submits and returns them.
1304
    """
1305
    log = get_logger()
1✔
1306
    row = proc_table[rown]
1✔
1307
    log.info(f"Identified row {row['INTID']} as needing resubmission.")
1✔
1308
    log.info(f"\t{row['INTID']}: Tileid={row['TILEID']}, Expid(s)={row['EXPID']}, Jobdesc={row['JOBDESC']}")
1✔
1309
    if len(proc_table['ALL_QIDS'][rown]) > max_resubs:
1✔
1310
        log.warning(f"Tileid={row['TILEID']}, Expid(s)={row['EXPID']}, "
×
1311
                    + f"Jobdesc={row['JOBDESC']} has already been submitted "
1312
                    + f"{max_resubs+1} times. Not resubmitting.")
1313
        proc_table['STATUS'][rown] = "MAX_RESUB"
×
1314
        return proc_table, submits
×
1315
    if resubmission_states is None:
1✔
1316
        resubmission_states = get_resubmission_states()
×
1317
    ideps = proc_table['INT_DEP_IDS'][rown]
1✔
1318
    if ideps is None or len(ideps)==0:
1✔
1319
        proc_table['LATEST_DEP_QID'][rown] = np.ndarray(shape=0).astype(int)
×
1320
    else:
1321
        all_valid_states = list(resubmission_states.copy())
1✔
1322
        good_states = ['RUNNING','PENDING','SUBMITTED','COMPLETED']
1✔
1323
        all_valid_states.extend(good_states)
1✔
1324
        othernight_idep_qid_lookup = {}
1✔
1325
        for idep in np.sort(np.atleast_1d(ideps)):
1✔
1326
            if idep not in id_to_row_map:
1✔
1327
                if idep // 1000 != row['INTID'] // 1000:
1✔
1328
                    log.debug("Internal ID: %d not in id_to_row_map. "
1✔
1329
                             + "This is expected since it is from another day. ", idep)
1330
                    reference_night = 20000000 + (idep // 1000)
1✔
1331
                    reftab = read_minimal_full_proctab_cols(nights=[reference_night])
1✔
1332
                    if reftab is None:
1✔
1333
                        msg = f"The dependency is from night={reference_night}" \
×
1334
                              + f" but read_minimal_full_proctab_cols couldn't" \
1335
                              + f" locate that processing table, this is a " \
1336
                              +  f"fatal error."
1337
                        log.critical(msg)
×
1338
                        raise ValueError(msg)
×
1339
                    reftab = update_from_queue(reftab, dry_run_level=dry_run_level)
1✔
1340
                    entry = reftab[reftab['INTID'] == idep][0]
1✔
1341
                    if entry['STATUS'] not in good_states:
1✔
1342
                        msg = f"Internal ID: {idep} not in id_to_row_map. " \
1✔
1343
                              + f"Since the dependency is from night={reference_night} " \
1344
                              + f"and that job isn't in a good state this is an " \
1345
                              + f"error we can't overcome."
1346
                        log.error(msg)
1✔
1347
                        proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
1✔
1348
                        return proc_table, submits
1✔
1349
                    else:
1350
                        ## otherwise all is good, just update the cache to use this
1351
                        ## in the next stage
1352
                        othernight_idep_qid_lookup[idep] = entry['LATEST_QID']
1✔
1353
                        update_full_ptab_cache(reftab)
1✔
1354
                else:
1355
                    msg = f"Internal ID: {idep} not in id_to_row_map. " \
×
1356
                         + f"Since the dependency is from the same night" \
1357
                         + f" and we can't find it, this is a fatal error."
1358
                    log.critical(msg)
×
1359
                    raise ValueError(msg)
×
1360
            elif proc_table['STATUS'][id_to_row_map[idep]] not in all_valid_states:
1✔
1361
                log.error(f"Proc INTID: {proc_table['INTID'][rown]} depended on" +
×
1362
                            f" INTID {proc_table['INTID'][id_to_row_map[idep]]}" +
1363
                            f" but that exposure has state" +
1364
                            f" {proc_table['STATUS'][id_to_row_map[idep]]} that" +
1365
                            f" isn't in the list of resubmission states." +
1366
                            f" Exiting this job's resubmission attempt.")
1367
                proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
×
1368
                return proc_table, submits
×
1369
        qdeps = []
1✔
1370
        for idep in np.sort(np.atleast_1d(ideps)):
1✔
1371
            if idep in id_to_row_map:
1✔
1372
                if proc_table['STATUS'][id_to_row_map[idep]] in resubmission_states:
1✔
1373
                    proc_table, submits = recursive_submit_failed(id_to_row_map[idep],
×
1374
                                                                  proc_table, submits,
1375
                                                                  id_to_row_map,
1376
                                                                  reservation=reservation,
1377
                                                                  dry_run_level=dry_run_level)
1378
                ## Now that we've resubmitted the dependency if necessary,
1379
                ## add the most recent QID to the list
1380
                qdeps.append(proc_table['LATEST_QID'][id_to_row_map[idep]])
1✔
1381
            else:
1382
                ## Since we verified above that the cross night QID is still
1383
                ## either pending or successful, add that to the list of QID's
1384
                qdeps.append(othernight_idep_qid_lookup[idep])
1✔
1385

1386
        qdeps = np.atleast_1d(qdeps)
1✔
1387
        if len(qdeps) > 0:
1✔
1388
            proc_table['LATEST_DEP_QID'][rown] = qdeps
1✔
1389
        else:
1390
            log.error(f"number of qdeps should be 1 or more: Rown {rown}, ideps {ideps}")
×
1391

1392
    proc_table[rown] = submit_batch_script(proc_table[rown], reservation=reservation,
1✔
1393
                                           strictly_successful=True, dry_run=dry_run_level)
1394
    submits += 1
1✔
1395

1396
    if dry_run_level < 3:
1✔
1397
        if ptab_name is None:
×
1398
            write_table(proc_table, tabletype='processing', overwrite=True)
×
1399
        else:
1400
            write_table(proc_table, tablename=ptab_name, overwrite=True)
×
1401
        sleep_and_report(0.1 + 0.1*(submits % 10 == 0),
×
1402
                         message_suffix=f"after submitting job to queue and writing proctable")
1403
    return proc_table, submits
1✔
1404

1405

1406
#########################################
1407
########     Joint fit     ##############
1408
#########################################
1409
def joint_fit(ptable, prows, internal_id, queue, reservation, descriptor, z_submit_types=None,
1✔
1410
              dry_run=0, strictly_successful=False, check_for_outputs=True, resubmit_partial_complete=True,
1411
              system_name=None):
1412
    """
1413
    DEPRECATED
1414
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
1415
    joint fitting job given by descriptor. If the joint fitting job is standard star fitting, the post standard star fits
1416
    for all the individual exposures also created and submitted. The returned ptable has all of these rows added to the
1417
    table given as input.
1418

1419
    Args:
1420
        ptable (Table): The processing table where each row is a processed job.
1421
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
1422
            inputs to the joint fit.
1423
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
1424
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
1425
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
1426
        descriptor (str): Description of the joint fitting job. Can either be 'science' or 'stdstarfit', 'arc' or 'psfnight',
1427
            or 'flat' or 'nightlyflat'.
1428
        z_submit_types (list of str, optional): The "group" types of redshifts that should be submitted with each
1429
            exposure. If not specified or None, then no redshifts are submitted.
1430
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
1431
            0 which runs the code normally.
1432
            1 writes all files but doesn't submit any jobs to Slurm.
1433
            2 writes tables but doesn't write scripts or submit anything.
1434
            3 Doesn't write or submit anything but queries Slurm normally for job status.
1435
            4 Doesn't write, submit jobs, or query Slurm.
1436
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
1437
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
1438
            less desirable because e.g. the sciences can run with SVN default calibrations rather
1439
            than failing completely from failed calibrations. Default is False.
1440
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
1441
            data products for the script being submitted. If all files exist and this is True,
1442
            then the script will not be submitted. If some files exist and this is True, only the
1443
            subset of the cameras without the final data products will be generated and submitted.
1444
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
1445
            jobs with some prior data are pruned using PROCCAMWORD to only process the
1446
            remaining cameras not found to exist.
1447
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
1448

1449
    Returns:
1450
        tuple: A tuple containing:
1451

1452
        * ptable, Table. The same processing table as input except with added rows for the joint fit job and, in the case
1453
          of a stdstarfit, the poststdstar science exposure jobs.
1454
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
1455
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1456
    """
1457
    log = get_logger()
×
1458
    if len(prows) < 1:
×
1459
        return ptable, None, internal_id
×
1460

1461
    if descriptor is None:
×
1462
        return ptable, None
×
1463
    elif descriptor == 'arc':
×
1464
        descriptor = 'psfnight'
×
1465
    elif descriptor == 'flat':
×
1466
        descriptor = 'nightlyflat'
×
1467
    elif descriptor == 'science':
×
1468
        if z_submit_types is None or len(z_submit_types) == 0:
×
1469
            descriptor = 'stdstarfit'
×
1470

1471
    if descriptor not in ['psfnight', 'nightlyflat', 'science','stdstarfit']:
×
1472
        return ptable, None, internal_id
×
1473

1474
    log.info(" ")
×
1475
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")
×
1476

1477
    if descriptor == 'science':
×
1478
        joint_prow, internal_id = make_joint_prow(prows, descriptor='stdstarfit', internal_id=internal_id)
×
1479
    else:
1480
        joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
×
1481
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
×
1482
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1483
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1484
    ptable.add_row(joint_prow)
×
1485

1486
    if descriptor in ['science','stdstarfit']:
×
1487
        if descriptor == 'science':
×
1488
            zprows = []
×
1489
        log.info(" ")
×
1490
        log.info(f"Submitting individual science exposures now that joint fitting of standard stars is submitted.\n")
×
1491
        for row in prows:
×
1492
            if row['LASTSTEP'] == 'stdstarfit':
×
1493
                continue
×
1494
            row['JOBDESC'] = 'poststdstar'
×
1495

1496
            # poststdstar job can't process cameras not included in its stdstar joint fit
1497
            stdcamword = joint_prow['PROCCAMWORD']
×
1498
            thiscamword = row['PROCCAMWORD']
×
1499
            proccamword = camword_intersection([stdcamword, thiscamword])
×
1500
            if proccamword != thiscamword:
×
1501
                dropcams = difference_camwords(thiscamword, proccamword)
×
1502
                assert dropcams != ''  #- i.e. if they differ, we should be dropping something
×
1503
                log.warning(f"Dropping exp {row['EXPID']} poststdstar cameras {dropcams} since they weren't included in stdstar fit {stdcamword}")
×
1504
                row['PROCCAMWORD'] = proccamword
×
1505

1506
            row['INTID'] = internal_id
×
1507
            internal_id += 1
×
1508
            row['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
×
1509
            row = assign_dependency(row, joint_prow)
×
1510
            row = create_and_submit(row, queue=queue, reservation=reservation, dry_run=dry_run,
×
1511
                                    strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1512
                                    resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1513
            ptable.add_row(row)
×
1514
            if descriptor == 'science' and row['LASTSTEP'] == 'all':
×
1515
                zprows.append(row)
×
1516

1517
    ## Now run redshifts
1518
    if descriptor == 'science' and len(zprows) > 0 and z_submit_types is not None:
×
1519
        prow_selection = (  (ptable['OBSTYPE'] == 'science')
×
1520
                          & (ptable['LASTSTEP'] == 'all')
1521
                          & (ptable['JOBDESC'] == 'poststdstar')
1522
                          & (ptable['TILEID'] == int(zprows[0]['TILEID'])) )
1523
        nightly_zprows = []
×
1524
        if np.sum(prow_selection) == len(zprows):
×
1525
            nightly_zprows = zprows.copy()
×
1526
        else:
1527
            for prow in ptable[prow_selection]:
×
1528
                nightly_zprows.append(table_row_to_dict(prow))
×
1529

1530
        for zsubtype in z_submit_types:
×
1531
            if zsubtype == 'perexp':
×
1532
                for zprow in zprows:
×
1533
                    log.info(" ")
×
1534
                    log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
×
1535
                    joint_prow, internal_id = make_joint_prow([zprow], descriptor=zsubtype, internal_id=internal_id)
×
1536
                    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
×
1537
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1538
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1539
                    ptable.add_row(joint_prow)
×
1540
            else:
1541
                log.info(" ")
×
1542
                log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {nightly_zprows[0]['TILEID']}.")
×
1543
                expids = [prow['EXPID'][0] for prow in nightly_zprows]
×
1544
                log.info(f"Expids: {expids}.\n")
×
1545
                joint_prow, internal_id = make_joint_prow(nightly_zprows, descriptor=zsubtype, internal_id=internal_id)
×
1546
                joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
×
1547
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1548
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1549
                ptable.add_row(joint_prow)
×
1550

1551
    if descriptor in ['psfnight', 'nightlyflat']:
×
1552
        log.info(f"Setting the calibration exposures as calibrators in the processing table.\n")
×
1553
        ptable = set_calibrator_flag(prows, ptable)
×
1554

1555
    return ptable, joint_prow, internal_id
×
1556

1557
def joint_cal_fit(descriptor, ptable, prows, internal_id, queue, reservation,
1✔
1558
                  dry_run=0, strictly_successful=False, check_for_outputs=True,
1559
                  resubmit_partial_complete=True, system_name=None):
1560
    """
1561
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
1562
    joint fitting job given by descriptor. If the joint fitting job is standard star fitting, the post standard star fits
1563
    for all the individual exposures also created and submitted. The returned ptable has all of these rows added to the
1564
    table given as input.
1565

1566
    Args:
1567
        descriptor (str): Description of the joint fitting job. Can either be 'science' or 'stdstarfit', 'arc' or 'psfnight',
1568
            or 'flat' or 'nightlyflat'.
1569
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
1570
            inputs to the joint fit.
1571
        ptable (Table): The processing table where each row is a processed job.
1572
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
1573
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
1574
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
1575
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
1576
            0 which runs the code normally.
1577
            1 writes all files but doesn't submit any jobs to Slurm.
1578
            2 writes tables but doesn't write scripts or submit anything.
1579
            3 Doesn't write or submit anything but queries Slurm normally for job status.
1580
            4 Doesn't write, submit jobs, or query Slurm.
1581
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
1582
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
1583
            less desirable because e.g. the sciences can run with SVN default calibrations rather
1584
            than failing completely from failed calibrations. Default is False.
1585
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
1586
            data products for the script being submitted. If all files exist and this is True,
1587
            then the script will not be submitted. If some files exist and this is True, only the
1588
            subset of the cameras without the final data products will be generated and submitted.
1589
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
1590
            jobs with some prior data are pruned using PROCCAMWORD to only process the
1591
            remaining cameras not found to exist.
1592
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
1593

1594
    Returns:
1595
        tuple: A tuple containing:
1596

1597
        * ptable, Table. The same processing table as input except with added rows for the joint fit job and, in the case
1598
          of a stdstarfit, the poststdstar science exposure jobs.
1599
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
1600
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1601
    """
1602
    log = get_logger()
×
1603
    if len(prows) < 1:
×
1604
        return ptable, None, internal_id
×
1605

1606
    if descriptor is None:
×
1607
        return ptable, None
×
1608
    elif descriptor == 'arc':
×
1609
        descriptor = 'psfnight'
×
1610
    elif descriptor == 'flat':
×
1611
        descriptor = 'nightlyflat'
×
1612

1613
    if descriptor not in ['psfnight', 'nightlyflat']:
×
1614
        return ptable, None, internal_id
×
1615

1616
    log.info(" ")
×
1617
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")
×
1618

1619
    joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
×
1620
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
×
1621
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1622
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1623
    ptable.add_row(joint_prow)
×
1624

1625
    if descriptor in ['psfnight', 'nightlyflat']:
×
1626
        log.info(f"Setting the calibration exposures as calibrators in the processing table.\n")
×
1627
        ptable = set_calibrator_flag(prows, ptable)
×
1628

1629
    return ptable, joint_prow, internal_id
×
1630

1631

1632
#########################################
1633
########     Redshifts     ##############
1634
#########################################
1635
def submit_redshifts(ptable, prows, tnight, internal_id, queue, reservation,
1✔
1636
              dry_run=0, strictly_successful=False,
1637
              check_for_outputs=True, resubmit_partial_complete=True,
1638
              z_submit_types=None, system_name=None):
1639
    """
1640
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
1641
    tilenight job given by descriptor. The returned ptable has all of these rows added to the
1642
    table given as input.
1643

1644
    Args:
1645
        ptable (Table): The processing table where each row is a processed job.
1646
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
1647
        tnight (Table.Row): The processing table row of the tilenight job on which the redshifts depend.
1648
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
1649
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
1650
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
1651
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
1652
            0 which runs the code normally.
1653
            1 writes all files but doesn't submit any jobs to Slurm.
1654
            2 writes tables but doesn't write scripts or submit anything.
1655
            3 Doesn't write or submit anything but queries Slurm normally for job status.
1656
            4 Doesn't write, submit jobs, or query Slurm.
1657
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
1658
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
1659
            less desirable because e.g. the sciences can run with SVN default calibrations rather
1660
            than failing completely from failed calibrations. Default is False.
1661
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
1662
            data products for the script being submitted. If all files exist and this is True,
1663
            then the script will not be submitted. If some files exist and this is True, only the
1664
            subset of the cameras without the final data products will be generated and submitted.
1665
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
1666
            jobs with some prior data are pruned using PROCCAMWORD to only process the
1667
            remaining cameras not found to exist.
1668
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
1669
            exposure. If not specified or None, then no redshifts are submitted.
1670
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
1671

1672
    Returns:
1673
        tuple: A tuple containing:
1674

1675
        * ptable, Table. The same processing table as input except with added rows for the joint fit job.
1676
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1677
    """
1678
    log = get_logger()
1✔
1679
    if len(prows) < 1 or z_submit_types == None:
1✔
1680
        return ptable, internal_id
1✔
1681

1682
    log.info(" ")
1✔
1683
    log.info(f"Running redshifts.\n")
1✔
1684

1685
    ## Now run redshifts
1686
    zprows = []
1✔
1687
    for row in prows:
1✔
1688
        if row['LASTSTEP'] == 'all':
1✔
1689
            zprows.append(row)
1✔
1690

1691
    if len(zprows) > 0:
1✔
1692
        for zsubtype in z_submit_types:
1✔
1693
            log.info(" ")
1✔
1694
            log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {zprows[0]['TILEID']}.")
1✔
1695
            if zsubtype == 'perexp':
1✔
1696
                for zprow in zprows:
×
1697
                    log.info(f"EXPID: {zprow['EXPID']}.\n")
×
1698
                    redshift_prow = make_redshift_prow([zprow], tnight, descriptor=zsubtype, internal_id=internal_id)
×
1699
                    internal_id += 1
×
1700
                    redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
×
1701
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1702
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1703
                    ptable.add_row(redshift_prow)
×
1704
            elif zsubtype == 'cumulative':
1✔
1705
                tileids = np.unique([prow['TILEID'] for prow in zprows])
1✔
1706
                if len(tileids) > 1:
1✔
1707
                    msg = f"Error, more than one tileid provided for cumulative redshift job: {tileids}"
×
1708
                    log.critical(msg)
×
1709
                    raise ValueError(msg)
×
1710
                nights = np.unique([prow['NIGHT'] for prow in zprows])
1✔
1711
                if len(nights) > 1:
1✔
1712
                    msg = f"Error, more than one night provided for cumulative redshift job: {nights}"
×
1713
                    log.critical(msg)
×
1714
                    raise ValueError(msg)
×
1715
                tileid, night = tileids[0], nights[0]
1✔
1716
                ## For cumulative redshifts, get any existing processing rows for tile
1717
                matched_prows = read_minimal_tilenight_proctab_cols(tileids=tileids)
1✔
1718
                ## Identify the processing rows that should be assigned as dependecies
1719
                ## tnight should be first such that the new job inherits the other metadata from it
1720
                tnights = [tnight]
1✔
1721
                if matched_prows is not None:
1✔
1722
                    matched_prows = matched_prows[matched_prows['NIGHT'] <= night]
1✔
1723
                    for prow in matched_prows:
1✔
1724
                        if prow['INTID'] != tnight['INTID']:
1✔
1725
                            tnights.append(prow)
1✔
1726
                log.info(f"Internal Processing IDs: {[prow['INTID'] for prow in tnights]}.\n")
1✔
1727
                ## Identify all exposures that should go into the fit
1728
                expids = [prow['EXPID'][0] for prow in zprows]
1✔
1729
                ## note we can actually get the full list of exposures, but for now
1730
                ## we'll stay consistent with old processing where we only list exposures
1731
                ## from the current night
1732
                ## For cumulative redshifts, get valid expids from exptables
1733
                #matched_erows = read_minimal_science_exptab_cols(tileids=tileids)
1734
                #matched_erows = matched_erows[matched_erows['NIGHT']<=night]
1735
                #expids = list(set([prow['EXPID'][0] for prow in zprows])+set(matched_erows['EXPID']))
1736
                log.info(f"Expids: {expids}.\n")
1✔
1737
                redshift_prow, internal_id = make_joint_prow(tnights, descriptor=zsubtype, internal_id=internal_id)
1✔
1738
                redshift_prow['EXPID'] = expids
1✔
1739
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
1✔
1740
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1741
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1742
                ptable.add_row(redshift_prow)
1✔
1743
            else: # pernight
1744
                expids = [prow['EXPID'][0] for prow in zprows]
×
1745
                log.info(f"Expids: {expids}.\n")
×
1746
                redshift_prow = make_redshift_prow(zprows, tnight, descriptor=zsubtype, internal_id=internal_id)
×
1747
                internal_id += 1
×
1748
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
×
1749
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1750
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1751
                ptable.add_row(redshift_prow)
×
1752

1753
    return ptable, internal_id
1✔
1754

1755
#########################################
1756
########     Tilenight     ##############
1757
#########################################
1758
def submit_tilenight(ptable, prows, calibjobs, internal_id, queue, reservation,
1✔
1759
              dry_run=0, strictly_successful=False, resubmit_partial_complete=True,
1760
              system_name=None, use_specter=False, extra_job_args=None):
1761
    """
1762
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
1763
    tilenight job given by descriptor. The returned ptable has all of these rows added to the
1764
    table given as input.
1765

1766
    Args:
1767
        ptable (Table): The processing table where each row is a processed job.
1768
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
1769
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
1770
            and 'nightlyflat'. Each key corresponds to a Table.Row or
1771
            None. The table.Row() values are for the corresponding
1772
            calibration job.
1773
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
1774
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
1775
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
1776
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
1777
            0 which runs the code normally.
1778
            1 writes all files but doesn't submit any jobs to Slurm.
1779
            2 writes tables but doesn't write scripts or submit anything.
1780
            3 Doesn't write or submit anything but queries Slurm normally for job status.
1781
            4 Doesn't write, submit jobs, or query Slurm.
1782
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
1783
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
1784
            less desirable because e.g. the sciences can run with SVN default calibrations rather
1785
            than failing completely from failed calibrations. Default is False.
1786
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
1787
            jobs with some prior data are pruned using PROCCAMWORD to only process the
1788
            remaining cameras not found to exist.
1789
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
1790
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
1791
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
1792
            information used for a specific type of job. Examples include
1793
            laststeps for for tilenight, etc.
1794

1795
    Returns:
1796
        tuple: A tuple containing:
1797

1798
        * ptable, Table. The same processing table as input except with added rows for the joint fit job.
1799
        * tnight_prow, dict. Row of a processing table corresponding to the tilenight job.
1800
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1801
    """
1802
    log = get_logger()
1✔
1803
    if len(prows) < 1:
1✔
1804
        return ptable, None, internal_id
×
1805

1806
    log.info(" ")
1✔
1807
    log.info(f"Running tilenight.\n")
1✔
1808

1809
    tnight_prow = make_tnight_prow(prows, calibjobs, internal_id=internal_id)
1✔
1810
    internal_id += 1
1✔
1811
    tnight_prow = create_and_submit(tnight_prow, queue=queue, reservation=reservation, dry_run=dry_run,
1✔
1812
                                   strictly_successful=strictly_successful, check_for_outputs=False,
1813
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name,
1814
                                   use_specter=use_specter, extra_job_args=extra_job_args)
1815
    ptable.add_row(tnight_prow)
1✔
1816

1817
    return ptable, tnight_prow, internal_id
1✔
1818

1819
## wrapper functions for joint fitting
1820
def science_joint_fit(ptable, sciences, internal_id, queue='realtime', reservation=None,
1✔
1821
                      z_submit_types=None, dry_run=0, strictly_successful=False,
1822
                      check_for_outputs=True, resubmit_partial_complete=True,
1823
                      system_name=None):
1824
    """
1825
    Wrapper function for desiproc.workflow.processing.joint_fit specific to the stdstarfit joint fit and redshift fitting.
1826

1827
    All variables are the same except:
1828

1829
        Arg 'sciences' is mapped to the prows argument of joint_fit.
1830
        The joint_fit argument descriptor is pre-defined as 'science'.
1831
    """
1832
    return joint_fit(ptable=ptable, prows=sciences, internal_id=internal_id, queue=queue, reservation=reservation,
×
1833
                     descriptor='science', z_submit_types=z_submit_types, dry_run=dry_run,
1834
                     strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
1835
                     resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
1836

1837

1838
def flat_joint_fit(ptable, flats, internal_id, queue='realtime',
1✔
1839
                   reservation=None, dry_run=0, strictly_successful=False,
1840
                   check_for_outputs=True, resubmit_partial_complete=True,
1841
                   system_name=None):
1842
    """
1843
    Wrapper function for desiproc.workflow.processing.joint_fit specific to the nightlyflat joint fit.
1844

1845
    All variables are the same except:
1846

1847
        Arg 'flats' is mapped to the prows argument of joint_fit.
1848
        The joint_fit argument descriptor is pre-defined as 'nightlyflat'.
1849
    """
1850
    return joint_fit(ptable=ptable, prows=flats, internal_id=internal_id, queue=queue, reservation=reservation,
×
1851
                     descriptor='nightlyflat', dry_run=dry_run, strictly_successful=strictly_successful,
1852
                     check_for_outputs=check_for_outputs, resubmit_partial_complete=resubmit_partial_complete,
1853
                     system_name=system_name)
1854

1855

1856
def arc_joint_fit(ptable, arcs, internal_id, queue='realtime',
1✔
1857
                  reservation=None, dry_run=0, strictly_successful=False,
1858
                  check_for_outputs=True, resubmit_partial_complete=True,
1859
                  system_name=None):
1860
    """
1861
    Wrapper function for desiproc.workflow.processing.joint_fit specific to the psfnight joint fit.
1862

1863
    All variables are the same except:
1864

1865
        Arg 'arcs' is mapped to the prows argument of joint_fit.
1866
        The joint_fit argument descriptor is pre-defined as 'psfnight'.
1867
    """
1868
    return joint_fit(ptable=ptable, prows=arcs, internal_id=internal_id, queue=queue, reservation=reservation,
×
1869
                     descriptor='psfnight', dry_run=dry_run, strictly_successful=strictly_successful,
1870
                     check_for_outputs=check_for_outputs, resubmit_partial_complete=resubmit_partial_complete,
1871
                     system_name=system_name)
1872

1873
def make_joint_prow(prows, descriptor, internal_id):
1✔
1874
    """
1875
    Given an input list or array of processing table rows and a descriptor, this creates a joint fit processing job row.
1876
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
1877
    input prows).
1878

1879
    Args:
1880
        prows, list or array of dicts. The rows corresponding to the individual exposure jobs that are
1881
            inputs to the joint fit.
1882
        descriptor, str. Description of the joint fitting job. Can either be 'stdstarfit', 'psfnight', or 'nightlyflat'.
1883
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1884

1885
    Returns:
1886
        dict: Row of a processing table corresponding to the joint fit job.
1887
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1888
    """
1889
    log = get_logger()
1✔
1890
    first_row = table_row_to_dict(prows[0])
1✔
1891
    joint_prow = first_row.copy()
1✔
1892

1893
    joint_prow['INTID'] = internal_id
1✔
1894
    internal_id += 1
1✔
1895
    joint_prow['JOBDESC'] = descriptor
1✔
1896
    joint_prow['LATEST_QID'] = -99
1✔
1897
    joint_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
1✔
1898
    joint_prow['SUBMIT_DATE'] = -99
1✔
1899
    joint_prow['STATUS'] = 'UNSUBMITTED'
1✔
1900
    joint_prow['SCRIPTNAME'] = ''
1✔
1901
    joint_prow['EXPID'] = np.unique(np.concatenate([currow['EXPID'] for currow in prows])).astype(int)
1✔
1902

1903
    ## Assign the PROCCAMWORD based on the descriptor and the input exposures
1904
    ## UPDATE 2024-04-24: badamps are now included in arc/flat joint fits,
1905
    ## so grab all PROCCAMWORDs instead of filtering out BADAMP cameras
1906
    ## For flats we want any camera that exists in all 12 exposures
1907
    ## For arcs we want any camera that exists in at least 3 exposures
1908
    pcamwords = [prow['PROCCAMWORD'] for prow in prows]
1✔
1909
    if descriptor in 'stdstarfit':
1✔
1910
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
×
1911
                                                  full_spectros_only=True)
1912
    elif descriptor in ['pernight', 'cumulative']:
1✔
1913
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
1✔
1914
                                                  full_spectros_only=False)
1915
    elif descriptor == 'nightlyflat':
1✔
1916
        joint_prow['PROCCAMWORD'] = camword_intersection(pcamwords,
1✔
1917
                                                         full_spectros_only=False)
1918
    elif descriptor == 'psfnight':
1✔
1919
        ## Count number of exposures each camera is present for
1920
        camcheck = {}
1✔
1921
        for camword in pcamwords:
1✔
1922
            for cam in decode_camword(camword):
1✔
1923
                if cam in camcheck:
1✔
1924
                    camcheck[cam] += 1
1✔
1925
                else:
1926
                    camcheck[cam] = 1
1✔
1927
        ## if exists in 3 or more exposures, then include it
1928
        goodcams = []
1✔
1929
        for cam,camcount in camcheck.items():
1✔
1930
            if camcount >= 3:
1✔
1931
                goodcams.append(cam)
1✔
1932
        joint_prow['PROCCAMWORD'] = create_camword(goodcams)
1✔
1933
    else:
1934
        log.warning("Warning asked to produce joint proc table row for unknown"
×
1935
                    + f" job description {descriptor}")
1936

1937
    joint_prow = assign_dependency(joint_prow, dependency=prows)
1✔
1938
    return joint_prow, internal_id
1✔
1939

1940
def make_exposure_prow(erow, int_id, calibjobs, jobdesc=None):
1✔
1941
    prow = erow_to_prow(erow)
1✔
1942
    prow['INTID'] = int_id
1✔
1943
    int_id += 1
1✔
1944
    if jobdesc is None:
1✔
1945
        prow['JOBDESC'] = prow['OBSTYPE']
1✔
1946
    else:
1947
        prow['JOBDESC'] = jobdesc
1✔
1948
    prow = define_and_assign_dependency(prow, calibjobs)
1✔
1949
    return prow, int_id
1✔
1950

1951
def make_tnight_prow(prows, calibjobs, internal_id):
1✔
1952
    """
1953
    Given an input list or array of processing table rows and a descriptor, this creates a joint fit processing job row.
1954
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
1955
    input prows).
1956

1957
    Args:
1958
        prows, list or array of dicts. Unsumbitted rows corresponding to the individual prestdstar jobs that are
1959
            the first steps of tilenight.
1960
        calibjobs, dict. Dictionary containing keys that each corresponds to a Table.Row or
1961
            None, with each table.Row() value corresponding to a calibration job
1962
            on which the tilenight job depends.
1963
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1964

1965
    Returns:
1966
        dict: Row of a processing table corresponding to the tilenight job.
1967
    """
1968
    first_row = table_row_to_dict(prows[0])
1✔
1969
    joint_prow = first_row.copy()
1✔
1970

1971
    joint_prow['INTID'] = internal_id
1✔
1972
    joint_prow['JOBDESC'] = 'tilenight'
1✔
1973
    joint_prow['LATEST_QID'] = -99
1✔
1974
    joint_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
1✔
1975
    joint_prow['SUBMIT_DATE'] = -99
1✔
1976
    joint_prow['STATUS'] = 'UNSUBMITTED'
1✔
1977
    joint_prow['SCRIPTNAME'] = ''
1✔
1978
    joint_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)
1✔
1979

1980
    joint_prow = define_and_assign_dependency(joint_prow, calibjobs, use_tilenight=True)
1✔
1981

1982
    return joint_prow
1✔
1983

1984
def make_redshift_prow(prows, tnights, descriptor, internal_id):
1✔
1985
    """
1986
    Given an input list or array of processing table rows and a descriptor, this creates a joint fit processing job row.
1987
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
1988
    input prows).
1989

1990
    Args:
1991
        prows, list or array of dicts. Unsumbitted rows corresponding to the individual prestdstar jobs that are
1992
            the first steps of tilenight.
1993
        tnights, list or array of Table.Row objects. Rows corresponding to the tilenight jobs on which the redshift job depends.
1994
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
1995

1996
    Returns:
1997
        dict: Row of a processing table corresponding to the tilenight jobs.
1998
    """
1999
    first_row = table_row_to_dict(prows[0])
×
2000
    redshift_prow = first_row.copy()
×
2001

2002
    redshift_prow['INTID'] = internal_id
×
2003
    redshift_prow['JOBDESC'] = descriptor
×
2004
    redshift_prow['LATEST_QID'] = -99
×
2005
    redshift_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
×
2006
    redshift_prow['SUBMIT_DATE'] = -99
×
2007
    redshift_prow['STATUS'] = 'UNSUBMITTED'
×
2008
    redshift_prow['SCRIPTNAME'] = ''
×
2009
    redshift_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)
×
2010

2011
    redshift_prow = assign_dependency(redshift_prow,dependency=tnights)
×
2012

2013
    return redshift_prow
×
2014

2015
def checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs,
1✔
2016
                                  lasttype, internal_id, z_submit_types=None, dry_run=0,
2017
                                  queue='realtime', reservation=None, strictly_successful=False,
2018
                                  check_for_outputs=True, resubmit_partial_complete=True,
2019
                                  system_name=None):
2020
    """
2021
    Takes all the state-ful data from daily processing and determines whether a joint fit needs to be submitted. Places
2022
    the decision criteria into a single function for easier maintainability over time. These are separate from the
2023
    new standard manifest*.json method of indicating a calibration sequence is complete. That is checked independently
2024
    elsewhere and doesn't interact with this.
2025

2026
    Args:
2027
        ptable (Table): Processing table of all exposures that have been processed.
2028
        arcs (list of dict): list of the individual arc jobs to be used for the psfnight (NOT all
2029
            the arcs, if multiple sets existed). May be empty if none identified yet.
2030
        flats (list of dict): list of the individual flat jobs to be used for the nightlyflat (NOT
2031
            all the flats, if multiple sets existed). May be empty if none identified yet.
2032
        sciences (list of dict): list of the most recent individual prestdstar science exposures
2033
            (if currently processing that tile). May be empty if none identified yet.
2034
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
2035
            and 'nightlyflat'. Each key corresponds to a Table.Row or
2036
            None. The table.Row() values are for the corresponding
2037
            calibration job.
2038
        lasttype (str or None): the obstype of the last individual exposure row to be processed.
2039
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
2040
            is the smallest unassigned value.
2041
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
2042
            exposure. If not specified or None, then no redshifts are submitted.
2043
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
2044
            0 which runs the code normally.
2045
            1 writes all files but doesn't submit any jobs to Slurm.
2046
            2 writes tables but doesn't write scripts or submit anything.
2047
            3 Doesn't write or submit anything but queries Slurm normally for job status.
2048
            4 Doesn't write, submit jobs, or query Slurm.
2049
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
2050
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
2051
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
2052
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
2053
            less desirable because e.g. the sciences can run with SVN default calibrations rather
2054
            than failing completely from failed calibrations. Default is False.
2055
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
2056
            data products for the script being submitted. If all files exist and this is True,
2057
            then the script will not be submitted. If some files exist and this is True, only the
2058
            subset of the cameras without the final data products will be generated and submitted.
2059
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
2060
            jobs with some prior data are pruned using PROCCAMWORD to only process the
2061
            remaining cameras not found to exist.
2062
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, permutter-gpu
2063

2064
    Returns:
2065
        tuple: A tuple containing:
2066

2067
        * ptable, Table, Processing table of all exposures that have been processed.
2068
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
2069
          and 'nightlyflat'. Each key corresponds to a Table.Row or
2070
          None. The table.Row() values are for the corresponding
2071
          calibration job.
2072
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
2073
          (if currently processing that tile). May be empty if none identified yet or
2074
          we just submitted them for processing.
2075
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented upward from
2076
          from the input such that it represents the smallest unused ID.
2077
    """
2078
    if lasttype == 'science' and len(sciences) > 0:
×
2079
        log = get_logger()
×
2080
        skysubonly = np.array([sci['LASTSTEP'] == 'skysub' for sci in sciences])
×
2081
        if np.all(skysubonly):
×
2082
            log.error("Identified all exposures in joint fitting request as skysub-only. Not submitting")
×
2083
            sciences = []
×
2084
            return ptable, calibjobs, sciences, internal_id
×
2085

2086
        if np.any(skysubonly):
×
2087
            log.error("Identified skysub-only exposures in joint fitting request")
×
2088
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
×
2089
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))
×
2090
            sciences = (np.array(sciences,dtype=object)[~skysubonly]).tolist()
×
2091
            log.info("Removed skysub only exposures in joint fitting:")
×
2092
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
×
2093
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))
×
2094

2095
        from collections import Counter
×
2096
        tiles = np.array([sci['TILEID'] for sci in sciences])
×
2097
        counts = Counter(tiles)
×
2098
        if len(counts.most_common()) > 1:
×
2099
            log.error("Identified more than one tile in a joint fitting request")
×
2100
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
×
2101
            log.info("Tileid's: {}".format(tiles))
×
2102
            log.info("Returning without joint fitting any of these exposures.")
×
2103
            # most_common, nmost_common = counts.most_common()[0]
2104
            # if most_common == -99:
2105
            #     most_common, nmost_common = counts.most_common()[1]
2106
            # log.warning(f"Given multiple tiles to jointly fit: {counts}. "+
2107
            #             "Only processing the most common non-default " +
2108
            #             f"tile: {most_common} with {nmost_common} exposures")
2109
            # sciences = (np.array(sciences,dtype=object)[tiles == most_common]).tolist()
2110
            # log.info("Tiles and exposure id's being submitted for joint fitting:")
2111
            # log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
2112
            # log.info("Tileid's: {}".format([row['TILEID'] for row in sciences]))
2113
            sciences = []
×
2114
            return ptable, calibjobs, sciences, internal_id
×
2115

2116
        ptable, tilejob, internal_id = science_joint_fit(ptable, sciences, internal_id, z_submit_types=z_submit_types,
×
2117
                                                         dry_run=dry_run, queue=queue, reservation=reservation,
2118
                                                         strictly_successful=strictly_successful,
2119
                                                         check_for_outputs=check_for_outputs,
2120
                                                         resubmit_partial_complete=resubmit_partial_complete,
2121
                                                         system_name=system_name)
2122
        if tilejob is not None:
×
2123
            sciences = []
×
2124

2125
    elif lasttype == 'flat' and calibjobs['nightlyflat'] is None and len(flats) == 12:
×
2126
        ## Note here we have an assumption about the number of expected flats being greater than 11
2127
        ptable, calibjobs['nightlyflat'], internal_id \
×
2128
            = flat_joint_fit(ptable, flats, internal_id, dry_run=dry_run, queue=queue,
2129
                             reservation=reservation, strictly_successful=strictly_successful,
2130
                             check_for_outputs=check_for_outputs,
2131
                             resubmit_partial_complete=resubmit_partial_complete,
2132
                             system_name=system_name
2133
                            )
2134

2135
    elif lasttype == 'arc' and calibjobs['psfnight'] is None and len(arcs) == 5:
×
2136
        ## Note here we have an assumption about the number of expected arcs being greater than 4
2137
        ptable, calibjobs['psfnight'], internal_id \
×
2138
            = arc_joint_fit(ptable, arcs, internal_id, dry_run=dry_run, queue=queue,
2139
                            reservation=reservation, strictly_successful=strictly_successful,
2140
                            check_for_outputs=check_for_outputs,
2141
                            resubmit_partial_complete=resubmit_partial_complete,
2142
                            system_name=system_name
2143
                            )
2144
    return ptable, calibjobs, sciences, internal_id
×
2145

2146
def submit_tilenight_and_redshifts(ptable, sciences, calibjobs, internal_id, dry_run=0,
1✔
2147
                                  queue='realtime', reservation=None, strictly_successful=False,
2148
                                  check_for_outputs=True, resubmit_partial_complete=True,
2149
                                  system_name=None,use_specter=False, extra_job_args=None):
2150
    """
2151
    Takes all the state-ful data from daily processing and determines whether a tilenight job needs to be submitted.
2152

2153
    Args:
2154
        ptable (Table): Processing table of all exposures that have been processed.
2155
        sciences (list of dict): list of the most recent individual prestdstar science exposures
2156
            (if currently processing that tile). May be empty if none identified yet.
2157
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
2158
            is the smallest unassigned value.
2159
        dry_run (int, optional): If nonzero, this is a simulated run. Default is 0.
2160
            0 which runs the code normally.
2161
            1 writes all files but doesn't submit any jobs to Slurm.
2162
            2 writes tables but doesn't write scripts or submit anything.
2163
            3 Doesn't write or submit anything but queries Slurm normally for job status.
2164
            4 Doesn't write, submit jobs, or query Slurm.
2165
            5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
2166
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
2167
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
2168
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
2169
            less desirable because e.g. the sciences can run with SVN default calibrations rather
2170
            than failing completely from failed calibrations. Default is False.
2171
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
2172
            data products for the script being submitted. If all files exist and this is True,
2173
            then the script will not be submitted. If some files exist and this is True, only the
2174
            subset of the cameras without the final data products will be generated and submitted.
2175
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
2176
            jobs with some prior data are pruned using PROCCAMWORD to only process the
2177
            remaining cameras not found to exist.
2178
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, permutter-gpu
2179
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
2180
        extra_job_args (dict, optional): Dictionary with key-value pairs that specify additional
2181
            information used for a specific type of job. Examples include
2182
            laststeps for tilenight, z_submit_types for redshifts, etc.
2183

2184
    Returns:
2185
        tuple: A tuple containing:
2186

2187
        * ptable, Table, Processing table of all exposures that have been processed.
2188
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
2189
          (if currently processing that tile). May be empty if none identified yet or
2190
          we just submitted them for processing.
2191
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented upward from
2192
          from the input such that it represents the smallest unused ID.
2193
    """
2194
    ptable, tnight, internal_id = submit_tilenight(ptable, sciences, calibjobs, internal_id,
1✔
2195
                                             queue=queue, reservation=reservation,
2196
                                             dry_run=dry_run, strictly_successful=strictly_successful,
2197
                                             resubmit_partial_complete=resubmit_partial_complete,
2198
                                             system_name=system_name,use_specter=use_specter,
2199
                                             extra_job_args=extra_job_args)
2200

2201
    z_submit_types = None
1✔
2202
    if 'z_submit_types'  in extra_job_args:
1✔
2203
        z_submit_types = extra_job_args['z_submit_types']
1✔
2204
        
2205
    ptable, internal_id = submit_redshifts(ptable, sciences, tnight, internal_id,
1✔
2206
                                    queue=queue, reservation=reservation,
2207
                                    dry_run=dry_run, strictly_successful=strictly_successful,
2208
                                    check_for_outputs=check_for_outputs,
2209
                                    resubmit_partial_complete=resubmit_partial_complete,
2210
                                    z_submit_types=z_submit_types,
2211
                                    system_name=system_name)
2212

2213
    if tnight is not None:
1✔
2214
        sciences = []
1✔
2215

2216
    return ptable, sciences, internal_id
1✔
2217

2218
def set_calibrator_flag(prows, ptable):
1✔
2219
    """
2220
    Sets the "CALIBRATOR" column of a procesing table row to 1 (integer representation of True)
2221
     for all input rows. Used within joint fitting code to flag the exposures that were input
2222
     to the psfnight or nightlyflat for later reference.
2223

2224
    Args:
2225
        prows, list or array of Table.Rows or dicts. The rows corresponding to the individual exposure jobs that are
2226
            inputs to the joint fit.
2227
        ptable, Table. The processing table where each row is a processed job.
2228

2229
    Returns:
2230
        Table: The same processing table as input except with added rows for the joint fit job and, in the case
2231
        of a stdstarfit, the poststdstar science exposure jobs.
2232
    """
2233
    for prow in prows:
1✔
2234
        ptable['CALIBRATOR'][ptable['INTID'] == prow['INTID']] = 1
1✔
2235
    return ptable
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc