
desihub / desispec / build 11924036485

20 Nov 2024 12:07AM UTC coverage: 30.072% (-0.09%) from 30.16%

Pull Request #2411: save the trace shift offsets in the psf file (via github)

Commits by segasai:
  fixes following the review
  update QA code
  expand comments
  uppercase the column names

1 of 31 new or added lines in 3 files covered. (3.23%)

1675 existing lines in 20 files now uncovered.

14637 of 48673 relevant lines covered (30.07%)

0.3 hits per line

Source File

/py/desispec/workflow/processing.py (57.27% covered)
"""
desispec.workflow.processing
============================

"""
import sys, os, glob
import json
from astropy.io import fits
from astropy.table import Table, join
import numpy as np

import time, datetime
from collections import OrderedDict
import subprocess

from desispec.scripts.link_calibnight import derive_include_exclude
from desispec.scripts.tile_redshifts import generate_tile_redshift_scripts
from desispec.workflow.redshifts import get_ztile_script_pathname, \
    get_ztile_relpath, \
    get_ztile_script_suffix
from desispec.workflow.exptable import read_minimal_science_exptab_cols
from desispec.workflow.queue import get_resubmission_states, update_from_queue, \
    queue_info_from_qids, get_queue_states_from_qids, update_queue_state_cache, \
    get_non_final_states
from desispec.workflow.timing import what_night_is_it
from desispec.workflow.desi_proc_funcs import get_desi_proc_batch_file_pathname, \
    create_desi_proc_batch_script, \
    get_desi_proc_batch_file_path, \
    get_desi_proc_tilenight_batch_file_pathname, \
    create_desi_proc_tilenight_batch_script, create_linkcal_batch_script
from desispec.workflow.batch import parse_reservation
from desispec.workflow.utils import pathjoin, sleep_and_report, \
    load_override_file
from desispec.workflow.tableio import write_table, load_table
from desispec.workflow.proctable import table_row_to_dict, erow_to_prow, \
    read_minimal_tilenight_proctab_cols, read_minimal_full_proctab_cols, \
    update_full_ptab_cache, default_prow, get_default_qid
from desiutil.log import get_logger

from desispec.io import findfile, specprod_root
from desispec.io.util import decode_camword, create_camword, \
    difference_camwords, \
    camword_to_spectros, camword_union, camword_intersection, parse_badamps

#################################################
############## Misc Functions ###################
#################################################
def night_to_starting_iid(night=None):
    """
    Creates an internal ID for a given night. The resulting integer is a 9 digit number.
    The digits are YYMMDDxxx, where YY is the years since 2000, and MM and DD are the month and day.
    xxx starts at 000 and is incremented for up to 1000 unique job IDs on a given night.

    Args:
        night (str or int): YYYYMMDD of the night to get the starting internal ID for.

    Returns:
        int: 9 digit number consisting of YYMMDD000. YY is years after 2000, MMDD is month and day.
        000 being the starting job number (0).
    """
    if night is None:
        night = what_night_is_it()
    night = int(night)
    internal_id = (night - 20000000) * 1000
    return internal_id
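
# Illustrative example (night value is hypothetical): for night 20241120 the
# starting internal ID is (20241120 - 20000000) * 1000 = 241120000, leaving
# room for job IDs 241120000 through 241120999 on that night.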

class ProcessingParams():
    def __init__(self, dry_run_level=0, queue='realtime',
                 reservation=None, strictly_successful=True,
                 check_for_outputs=True,
                 resubmit_partial_complete=True,
                 system_name='perlmutter', use_specter=True):

        self.dry_run_level = dry_run_level
        self.system_name = system_name
        self.queue = queue
        self.reservation = reservation
        self.strictly_successful = strictly_successful
        self.check_for_outputs = check_for_outputs
        self.resubmit_partial_complete = resubmit_partial_complete
        self.use_specter = use_specter

#################################################
############ Script Functions ###################
#################################################
def batch_script_name(prow):
    """
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, JOBDESC, PROCCAMWORD defined)
    and determines the script file pathname as defined by desi_proc's helper functions.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.

    Returns:
        str: The complete pathname to the script file, as it is defined within the desi_proc ecosystem.
    """
    expids = prow['EXPID']
    if len(expids) == 0:
        expids = None
    if prow['JOBDESC'] == 'tilenight':
        pathname = get_desi_proc_tilenight_batch_file_pathname(night=prow['NIGHT'], tileid=prow['TILEID'])
    else:
        pathname = get_desi_proc_batch_file_pathname(night=prow['NIGHT'], exp=expids,
                                                     jobdesc=prow['JOBDESC'], cameras=prow['PROCCAMWORD'])
    scriptfile = pathname + '.slurm'
    return scriptfile

def get_jobdesc_to_file_map():
    """
    Returns a mapping of job descriptions to the filenames of the output files.

    Args:
        None

    Returns:
        dict: Dictionary with lowercase job descriptions as keys, mapped to the
            filename of their expected outputs.
    """
    return {'prestdstar': 'sframe',
            'stdstarfit': 'stdstars',
            'poststdstar': 'cframe',
            'nightlybias': 'biasnight',
            # 'ccdcalib': 'badcolumns',
            'badcol': 'badcolumns',
            'arc': 'fitpsf',
            'flat': 'fiberflat',
            'psfnight': 'psfnight',
            'nightlyflat': 'fiberflatnight',
            'spectra': 'spectra_tile',
            'coadds': 'coadds_tile',
            'redshift': 'redrock_tile'}

def get_file_to_jobdesc_map():
    """
    Returns a mapping of output filenames to job descriptions.

    Args:
        None

    Returns:
        dict: Dictionary with the filenames of the expected outputs as keys,
            mapped to the lowercase job descriptions.
    """
    job_to_file_map = get_jobdesc_to_file_map()
    job_to_file_map.pop('badcol')  # these files can also be in a ccdcalib job
    job_to_file_map.pop('nightlybias')  # these files can also be in a ccdcalib job
    return {value: key for key, value in job_to_file_map.items()}
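
# Illustrative note: the two maps are inverses for the unambiguous job types,
# e.g. get_file_to_jobdesc_map()['fitpsf'] == 'arc'; 'badcol' and 'nightlybias'
# are dropped from the reverse map because their outputs can also be produced
# by a 'ccdcalib' job.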

def check_for_outputs_on_disk(prow, resubmit_partial_complete=True):
    """
    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
        the change in job status after creating and submitting the job for processing.
    """
    prow['STATUS'] = 'UNKNOWN'
    log = get_logger()

    if prow['JOBDESC'] in ['linkcal', 'ccdcalib']:
        log.info(f"jobdesc={prow['JOBDESC']} has indeterminate outputs, so "
                 + "not checking for files on disk.")
        return prow

    job_to_file_map = get_jobdesc_to_file_map()

    night = prow['NIGHT']
    if prow['JOBDESC'] in ['cumulative', 'pernight-v0', 'pernight', 'perexp']:
        filetype = 'redrock_tile'
    else:
        filetype = job_to_file_map[prow['JOBDESC']]
    orig_camword = prow['PROCCAMWORD']

    ## if spectro based, look for spectros, else look for cameras
    if prow['JOBDESC'] in ['stdstarfit', 'spectra', 'coadds', 'redshift']:
        ## Spectrograph based
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        ## Suppress outputs about using tile based files in findfile if only looking for stdstarfits
        if prow['JOBDESC'] == 'stdstarfit':
            tileid = None
        else:
            tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(findfile(filetype=filetype, night=night, expid=expid, spectrograph=spectro, tile=tileid)):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'], existing_camword)
    elif prow['JOBDESC'] in ['cumulative', 'pernight-v0', 'pernight', 'perexp']:
        ## Spectrograph based
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        redux_dir = specprod_root()
        outdir = os.path.join(redux_dir, get_ztile_relpath(tileid, group=prow['JOBDESC'], night=night, expid=expid))
        suffix = get_ztile_script_suffix(tileid, group=prow['JOBDESC'], night=night, expid=expid)
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(os.path.join(outdir, f"redrock-{spectro}-{suffix}.fits")):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'], existing_camword)
    else:
        ## Otherwise camera based
        cameras = decode_camword(prow['PROCCAMWORD'])
        n_desired = len(cameras)
        if len(prow['EXPID']) > 0:
            expid = prow['EXPID'][0]
        else:
            expid = None
        if len(prow['EXPID']) > 1 and prow['JOBDESC'] not in ['psfnight', 'nightlyflat']:
            log.warning(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']}. This job type only makes " +
                        f"sense with a single exposure. Proceeding with {expid}.")
        missing_cameras = []
        for cam in cameras:
            if not os.path.exists(findfile(filetype=filetype, night=night, expid=expid, camera=cam)):
                missing_cameras.append(cam)
        completed = (len(missing_cameras) == 0)
        if not completed and resubmit_partial_complete and len(missing_cameras) < n_desired:
            prow['PROCCAMWORD'] = create_camword(missing_cameras)

    if completed:
        prow['STATUS'] = 'COMPLETED'
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"the desired {n_desired} {filetype}'s. Not submitting this job.")
    elif resubmit_partial_complete and orig_camword != prow['PROCCAMWORD']:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"some {filetype}'s. Submitting smaller camword={prow['PROCCAMWORD']}.")
    elif not resubmit_partial_complete:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} doesn't have all " +
                 f"{filetype}'s and resubmit_partial_complete=False. " +
                 f"Submitting full camword={prow['PROCCAMWORD']}.")
    else:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} has no " +
                 f"existing {filetype}'s. Submitting full camword={prow['PROCCAMWORD']}.")
    return prow
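
# Illustrative example (camwords are hypothetical): a 'flat' job over
# PROCCAMWORD='a0' whose b0 and r0 fiberflat files already exist on disk would,
# with resubmit_partial_complete=True, be pruned to PROCCAMWORD='z0' before
# submission.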

def create_and_submit(prow, queue='realtime', reservation=None, dry_run=0,
                      joint=False, strictly_successful=False,
                      check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None, use_specter=False,
                      extra_job_args=None):
    """
    Wrapper script that takes a processing table row and three modifier keywords, creates a submission script for the
    compute nodes, and then submits that script to the Slurm scheduler with appropriate dependencies.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not
            submitted. If dry_run=2, the scripts will be neither written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        joint (bool, optional): Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
            run with desi_proc_joint_fit. Default is False.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include refnight
            and include/exclude lists for linkcal, laststeps for tilenight, etc.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
        the change in job status after creating and submitting the job for processing.

    Note:
        This may modify the input. Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not persist. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    orig_prow = prow.copy()
    if check_for_outputs:
        prow = check_for_outputs_on_disk(prow, resubmit_partial_complete)
        if prow['STATUS'].upper() == 'COMPLETED':
            return prow

    prow = create_batch_script(prow, queue=queue, dry_run=dry_run, joint=joint,
                               system_name=system_name, use_specter=use_specter,
                               extra_job_args=extra_job_args)
    prow = submit_batch_script(prow, reservation=reservation, dry_run=dry_run,
                               strictly_successful=strictly_successful)

    ## If resubmitted partial, the PROCCAMWORD and SCRIPTNAME will correspond
    ## to the pruned values. But we want to retain the full job's values,
    ## so get those from the original job.
    if resubmit_partial_complete:
        prow['PROCCAMWORD'] = orig_prow['PROCCAMWORD']
        prow['SCRIPTNAME'] = orig_prow['SCRIPTNAME']
    return prow

def desi_link_calibnight_command(prow, refnight, include=None):
    """
    Wrapper script that takes a processing table row (or dictionary with
    REFNIGHT, NIGHT, PROCCAMWORD defined) and determines the proper command
    line call to link data defined by the input row/dict.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions
            for 'NIGHT', 'REFNIGHT', and 'PROCCAMWORD'.
        refnight (str or int): The night with a valid set of calibrations
            from which the links will be created.
        include (list): The filetypes to include in the linking.

    Returns:
        str: The proper command to be submitted to desi_link_calibnight
            to process the job defined by the prow values.
    """
    cmd = 'desi_link_calibnight'
    cmd += f' --refnight={refnight}'
    cmd += f' --newnight={prow["NIGHT"]}'
    cmd += f' --cameras={prow["PROCCAMWORD"]}'
    if include is not None:
        cmd += ' --include=' + ','.join(list(include))
    return cmd
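
# Illustrative example (values are hypothetical): a prow with NIGHT=20241120
# and PROCCAMWORD='a0123456789', with refnight=20241119 and
# include=['psfnight'], yields:
#   desi_link_calibnight --refnight=20241119 --newnight=20241120 --cameras=a0123456789 --include=psfnight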

def desi_proc_command(prow, system_name, use_specter=False, queue=None):
    """
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, OBSTYPE, JOBDESC, PROCCAMWORD defined)
    and determines the proper command line call to process the data defined by the input row/dict.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).

    Returns:
        str: The proper command to be submitted to desi_proc to process the job defined by the prow values.
    """
    cmd = 'desi_proc'
    cmd += ' --batch'
    cmd += ' --nosubmit'
    if queue is not None:
        cmd += f' -q {queue}'
    if prow['OBSTYPE'].lower() == 'science':
        if prow['JOBDESC'] == 'prestdstar':
            cmd += ' --nostdstarfit --nofluxcalib'
        elif prow['JOBDESC'] == 'poststdstar':
            cmd += ' --noprestdstarfit --nostdstarfit'

        if use_specter:
            cmd += ' --use-specter'
    elif prow['JOBDESC'] in ['flat', 'prestdstar'] and use_specter:
        cmd += ' --use-specter'
    pcamw = str(prow['PROCCAMWORD'])
    cmd += f" --cameras={pcamw} -n {prow['NIGHT']}"
    if len(prow['EXPID']) > 0:
        ## If ccdcalib job without a dark exposure, don't assign the flat expid
        ## since it would incorrectly process the flat using desi_proc
        if prow['OBSTYPE'].lower() != 'flat' or prow['JOBDESC'] != 'ccdcalib':
            cmd += f" -e {prow['EXPID'][0]}"
    if prow['BADAMPS'] != '':
        cmd += ' --badamps={}'.format(prow['BADAMPS'])
    return cmd
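
# Illustrative example (values are hypothetical): an 'arc' prow with
# NIGHT=20241120, EXPID=[12345], PROCCAMWORD='a0', and BADAMPS='', built with
# queue='realtime', yields:
#   desi_proc --batch --nosubmit -q realtime --cameras=a0 -n 20241120 -e 12345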

def desi_proc_joint_fit_command(prow, queue=None):
    """
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, OBSTYPE, PROCCAMWORD defined)
    and determines the proper command line call to process the data defined by the input row/dict.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'OBSTYPE', and 'PROCCAMWORD'.
        queue (str): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).

    Returns:
        str: The proper command to be submitted to desi_proc_joint_fit
            to process the job defined by the prow values.
    """
    cmd = 'desi_proc_joint_fit'
    cmd += ' --batch'
    cmd += ' --nosubmit'
    if queue is not None:
        cmd += f' -q {queue}'

    descriptor = prow['OBSTYPE'].lower()

    night = prow['NIGHT']
    specs = str(prow['PROCCAMWORD'])
    expid_str = ','.join([str(eid) for eid in prow['EXPID']])

    cmd += f' --obstype {descriptor}'
    cmd += f' --cameras={specs} -n {night}'
    if len(expid_str) > 0:
        cmd += f' -e {expid_str}'
    return cmd


def create_batch_script(prow, queue='realtime', dry_run=0, joint=False,
                        system_name=None, use_specter=False, extra_job_args=None):
    """
    Wrapper script that takes a processing table row and three modifier keywords and creates a submission script for the
    compute nodes.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted.
            If dry_run=2, the scripts will be neither written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        joint (bool, optional): Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
            run with desi_proc_joint_fit when not using tilenight. Default is False.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include refnight
            and include/exclude lists for linkcal, laststeps for tilenight, etc.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with an updated value for 'SCRIPTNAME'.

    Note:
        This may modify the input. Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not persist. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()

    if extra_job_args is None:
        extra_job_args = {}

    if prow['JOBDESC'] in ['perexp', 'pernight', 'pernight-v0', 'cumulative']:
        if dry_run > 1:
            scriptpathname = get_ztile_script_pathname(tileid=prow['TILEID'], group=prow['JOBDESC'],
                                                       night=prow['NIGHT'], expid=prow['EXPID'][0])

            log.info("Output file would have been: {}".format(scriptpathname))
        else:
            #- run zmtl for cumulative redshifts but not others
            run_zmtl = (prow['JOBDESC'] == 'cumulative')
            no_afterburners = False
            log.info(f"entering tileredshiftscript: {prow}")
            scripts, failed_scripts = generate_tile_redshift_scripts(tileid=prow['TILEID'], group=prow['JOBDESC'],
                                                                     nights=[prow['NIGHT']], expids=prow['EXPID'],
                                                                     batch_queue=queue, system_name=system_name,
                                                                     run_zmtl=run_zmtl,
                                                                     no_afterburners=no_afterburners,
                                                                     nosubmit=True)
            if len(failed_scripts) > 0:
                log.error(f"Redshifts failed for group={prow['JOBDESC']}, night={prow['NIGHT']}, " +
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned failed scriptname is {failed_scripts}")
            elif len(scripts) > 1:
                log.error(f"More than one redshift script returned for group={prow['JOBDESC']}, night={prow['NIGHT']}, " +
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned scriptnames were {scripts}")
            elif len(scripts) == 0:
                msg = f'No scripts were generated for {prow=}'
                log.critical(prow)
                raise ValueError(msg)
            else:
                scriptpathname = scripts[0]

    elif prow['JOBDESC'] == 'linkcal':
        refnight, include, exclude = -99, None, None
        if 'refnight' in extra_job_args:
            refnight = extra_job_args['refnight']
        if 'include' in extra_job_args:
            include = extra_job_args['include']
        if 'exclude' in extra_job_args:
            exclude = extra_job_args['exclude']
        include, exclude = derive_include_exclude(include, exclude)
        ## Fiberflatnights shouldn't be generated with psfs from the same time,
        ## so we shouldn't link psfs without also linking fiberflatnight.
        ## However, this should be checked at a higher level. If set here,
        ## go ahead and do it
        # if 'psfnight' in include and not 'fiberflatnight' in include:
        #     err = "Must link fiberflatnight if linking psfnight"
        #     log.error(err)
        #     raise ValueError(err)
        if dry_run > 1:
            scriptpathname = batch_script_name(prow)
            log.info("Output file would have been: {}".format(scriptpathname))
            cmd = desi_link_calibnight_command(prow, refnight, include)
            log.info("Command to be run: {}".format(cmd.split()))
        else:
            if refnight == -99:
                err = f'For {prow=} asked to link calibration but not given' \
                      + ' a valid refnight'
                log.error(err)
                raise ValueError(err)

            cmd = desi_link_calibnight_command(prow, refnight, include)
            log.info(f"Running: {cmd.split()}")
            scriptpathname = create_linkcal_batch_script(newnight=prow['NIGHT'],
                                                         cameras=prow['PROCCAMWORD'],
                                                         queue=queue,
                                                         cmd=cmd,
                                                         system_name=system_name)
    else:
        if prow['JOBDESC'] != 'tilenight':
            nightlybias, nightlycte, cte_expids = False, False, None
            if 'nightlybias' in extra_job_args:
                nightlybias = extra_job_args['nightlybias']
            elif prow['JOBDESC'].lower() == 'nightlybias':
                nightlybias = True
            if 'nightlycte' in extra_job_args:
                nightlycte = extra_job_args['nightlycte']
            if 'cte_expids' in extra_job_args:
                cte_expids = extra_job_args['cte_expids']
            ## run known joint jobs as joint even if unspecified
            ## in the future we can eliminate the need for "joint"
            if joint or prow['JOBDESC'].lower() in ['psfnight', 'nightlyflat']:
                cmd = desi_proc_joint_fit_command(prow, queue=queue)
                ## For consistency with how we edit the other commands, do them
                ## here, but a future TODO would be to move these into the
                ## command generation itself
                if 'extra_cmd_args' in extra_job_args:
                    cmd += ' ' + ' '.join(np.atleast_1d(extra_job_args['extra_cmd_args']))
            else:
                cmd = desi_proc_command(prow, system_name, use_specter, queue=queue)
                if nightlybias:
                    cmd += ' --nightlybias'
                if nightlycte:
                    cmd += ' --nightlycte'
                    if cte_expids is not None:
                        cmd += ' --cte-expids '
                        cmd += ','.join(np.atleast_1d(cte_expids).astype(str))
        if dry_run > 1:
            scriptpathname = batch_script_name(prow)
            log.info("Output file would have been: {}".format(scriptpathname))
            if prow['JOBDESC'] != 'tilenight':
                log.info("Command to be run: {}".format(cmd.split()))
        else:
            expids = prow['EXPID']
            if len(expids) == 0:
                expids = None

            if prow['JOBDESC'] == 'tilenight':
                log.info("Creating tilenight script for tile {}".format(prow['TILEID']))
                if 'laststeps' in extra_job_args:
                    laststeps = extra_job_args['laststeps']
                else:
                    err = f'{prow=} job did not specify last steps to tilenight'
                    log.error(err)
                    raise ValueError(err)
                ncameras = len(decode_camword(prow['PROCCAMWORD']))
                scriptpathname = create_desi_proc_tilenight_batch_script(night=prow['NIGHT'], exp=expids,
                                                                         tileid=prow['TILEID'],
                                                                         ncameras=ncameras,
                                                                         queue=queue,
                                                                         mpistdstars=True,
                                                                         use_specter=use_specter,
                                                                         system_name=system_name,
                                                                         laststeps=laststeps)
            else:
                if expids is not None and len(expids) > 1:
                    expids = expids[:1]
                log.info("Running: {}".format(cmd.split()))
                scriptpathname = create_desi_proc_batch_script(night=prow['NIGHT'], exp=expids,
                                                               cameras=prow['PROCCAMWORD'],
                                                               jobdesc=prow['JOBDESC'],
                                                               queue=queue, cmdline=cmd,
                                                               use_specter=use_specter,
                                                               system_name=system_name,
                                                               nightlybias=nightlybias,
                                                               nightlycte=nightlycte,
                                                               cte_expids=cte_expids)
    log.info("Outfile is: {}".format(scriptpathname))
    prow['SCRIPTNAME'] = os.path.basename(scriptpathname)
    return prow
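
# Illustrative sketch (values are hypothetical): extra_job_args is a plain dict
# keyed by job type, e.g.
#   extra_job_args = {'refnight': 20241119, 'include': ['psfnight', 'fiberflatnight']}
# for a linkcal job, or
#   extra_job_args = {'laststeps': ['all']}
# for a tilenight job (the 'laststeps' value here is made up for illustration).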

_fake_qid = int(time.time() - 1.7e9)
def _get_fake_qid():
    """
    Return fake slurm queue jobid to use for dry-run testing
    """
    # Note: not implemented as a yield generator so that this returns a
    # genuine int, not a generator object
    global _fake_qid
    _fake_qid += 1
    return _fake_qid

def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=False):
    """
    Wrapper script that takes a processing table row and three modifier keywords and submits the scripts to the Slurm
    scheduler.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not
            submitted. If dry_run=2, the scripts will be neither written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with updated values for the queue-related
        columns ('LATEST_QID', 'ALL_QIDS', 'STATUS', and 'SUBMIT_DATE').

    Note:
        This may modify the input. Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not persist. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()
    dep_qids = prow['LATEST_DEP_QID']
    dep_list, dep_str = '', ''

    ## With desi_proc_night we now either resubmit failed jobs or exit, so this
    ## should no longer be necessary in the normal workflow.
    # workaround for sbatch --dependency bug not tracking jobs correctly
    # see NERSC TICKET INC0203024
    failed_dependency = False
    if len(dep_qids) > 0 and not dry_run:
        non_final_states = get_non_final_states()
        state_dict = get_queue_states_from_qids(dep_qids, dry_run=dry_run, use_cache=True)
        still_depids = []
        for depid in dep_qids:
            if depid in state_dict.keys():
                if state_dict[int(depid)] == 'COMPLETED':
                    log.info(f"removing completed jobid {depid}")
                elif state_dict[int(depid)] not in non_final_states:
                    failed_dependency = True
                    log.info("Found a dependency in a bad final state="
                             + f"{state_dict[int(depid)]} for depjobid={depid},"
                             + " not submitting this job.")
                    still_depids.append(depid)
                else:
                    still_depids.append(depid)
            else:
                still_depids.append(depid)
        dep_qids = np.array(still_depids)

    if len(dep_qids) > 0:
        jobtype = prow['JOBDESC']
        if strictly_successful:
            depcond = 'afterok'
        elif jobtype in ['arc', 'psfnight', 'prestdstar', 'stdstarfit']:
            ## (though psfnight and stdstarfit will require some inputs, otherwise they'll go up in flames)
            depcond = 'afterany'
        else:
            ## if 'flat', 'nightlyflat', 'poststdstar', or any type of redshift, require strict success of inputs
            depcond = 'afterok'

        dep_str = f'--dependency={depcond}:'

        if np.isscalar(dep_qids):
            dep_list = str(dep_qids).strip(' \t')
            if dep_list == '':
                dep_str = ''
            else:
                dep_str += dep_list
        else:
            if len(dep_qids) > 1:
                dep_list = ':'.join(np.array(dep_qids).astype(str))
                dep_str += dep_list
            elif len(dep_qids) == 1 and dep_qids[0] not in [None, 0]:
                dep_str += str(dep_qids[0])
            else:
                dep_str = ''

    # script = f'{jobname}.slurm'
    # script_path = pathjoin(batchdir, script)
    if prow['JOBDESC'] in ['pernight-v0', 'pernight', 'perexp', 'cumulative']:
        script_path = get_ztile_script_pathname(tileid=prow['TILEID'], group=prow['JOBDESC'],
                                                night=prow['NIGHT'], expid=np.min(prow['EXPID']))
        jobname = os.path.basename(script_path)
    else:
        batchdir = get_desi_proc_batch_file_path(night=prow['NIGHT'])
        jobname = batch_script_name(prow)
        script_path = pathjoin(batchdir, jobname)

    batch_params = ['sbatch', '--parsable']
    if dep_str != '':
        batch_params.append(f'{dep_str}')

    reservation = parse_reservation(reservation, prow['JOBDESC'])
    if reservation is not None:
        batch_params.append(f'--reservation={reservation}')

    batch_params.append(f'{script_path}')
    submitted = True
    ## If dry_run, give it a fake QID.
    ## If a dependency has failed, don't even try to submit the job because
    ## Slurm will refuse; instead just mark it as unsubmitted.
    if dry_run:
        current_qid = _get_fake_qid()
    elif not failed_dependency:
        #- sbatch sometimes fails; try several times before giving up
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                current_qid = subprocess.check_output(batch_params, stderr=subprocess.STDOUT, text=True)
                current_qid = int(current_qid.strip(' \t\n'))
                break
            except subprocess.CalledProcessError as err:
                log.error(f'{jobname} submission failure at {datetime.datetime.now()}')
                log.error(f'{jobname}   {batch_params}')
                log.error(f'{jobname}   {err.output=}')
                if attempt < max_attempts - 1:
                    log.info('Sleeping 60 seconds then retrying')
                    time.sleep(60)
        else:  #- for/else runs only if the loop never breaks (i.e. all attempts failed)
            msg = f'{jobname} submission failed {max_attempts} times.' \
                  + ' setting as unsubmitted and moving on'
            log.error(msg)
            current_qid = get_default_qid()
            submitted = False
    else:
        current_qid = get_default_qid()
        submitted = False

    ## Update prow with new information
    prow['LATEST_QID'] = current_qid

    ## If we didn't submit, don't say we did and don't add to ALL_QIDS
    if submitted:
        log.info(batch_params)
        log.info(f'Submitted {jobname} with dependencies {dep_str} and '
                 + f'reservation={reservation}. Returned qid: {current_qid}')

        ## Update prow with new information
        prow['ALL_QIDS'] = np.append(prow['ALL_QIDS'], current_qid)
        prow['STATUS'] = 'SUBMITTED'
        prow['SUBMIT_DATE'] = int(time.time())
    else:
        log.info(f"Would have submitted: {batch_params}")
        prow['STATUS'] = 'UNSUBMITTED'

        ## Update the Slurm jobid cache of job states
        update_queue_state_cache(qid=prow['LATEST_QID'], state=prow['STATUS'])

    return prow
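
# Illustrative example (QIDs are hypothetical): with strictly_successful=True
# and LATEST_DEP_QID=[1001, 1002], the job is submitted as
#   sbatch --parsable --dependency=afterok:1001:1002 <script_path>
# and the Slurm jobid printed by --parsable becomes prow['LATEST_QID'].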


#############################################
##########   Row Manipulations   ############
#############################################
def define_and_assign_dependency(prow, calibjobs, use_tilenight=False,
                                 refnight=None):
    """
    Given an input processing row and possible calibjobs, this defines the
    JOBDESC keyword and assigns the dependency appropriate for the job type of
    prow.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for
            'OBSTYPE'. A row must have column names for
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_QID'.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'badcol',
            'psfnight', 'nightlyflat', and 'linkcal'. Each key corresponds to a
            Table.Row or None. The Table.Row values are for the corresponding
            calibration job. Each value that isn't None must contain
            'INTID' and 'LATEST_QID'. If None, it assumes the
            dependency doesn't exist and no dependency is assigned.
        use_tilenight (bool): Default is False. If True, use desi_proc_tilenight
            for prestdstar, stdstar, and poststdstar steps for
            science exposures.
        refnight (int): The reference night for linking jobs.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except
        with updated values for 'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_QID'.

    Note:
        This may modify the input. Table.Row objects are generally copied
        on modification, so the change to the input object in memory may or may
        not persist. As of writing, a row from a table given to this function
        will not change during the execution of this function (but can be
        overwritten explicitly with the returned row if desired).
    """
    if prow['OBSTYPE'] in ['science', 'twiflat']:
        if calibjobs['nightlyflat'] is not None:
            dependency = calibjobs['nightlyflat']
        elif calibjobs['psfnight'] is not None:
            dependency = calibjobs['psfnight']
        elif calibjobs['ccdcalib'] is not None:
            dependency = calibjobs['ccdcalib']
        elif calibjobs['nightlybias'] is not None:
            dependency = calibjobs['nightlybias']
        elif calibjobs['badcol'] is not None:
            dependency = calibjobs['badcol']
        else:
            dependency = calibjobs['linkcal']
        if not use_tilenight:
            prow['JOBDESC'] = 'prestdstar'
    elif prow['OBSTYPE'] == 'flat':
        if calibjobs['psfnight'] is not None:
            dependency = calibjobs['psfnight']
        elif calibjobs['ccdcalib'] is not None:
            dependency = calibjobs['ccdcalib']
        elif calibjobs['nightlybias'] is not None:
            dependency = calibjobs['nightlybias']
        elif calibjobs['badcol'] is not None:
            dependency = calibjobs['badcol']
        else:
            dependency = calibjobs['linkcal']
    elif prow['OBSTYPE'] == 'arc':
        if calibjobs['ccdcalib'] is not None:
            dependency = calibjobs['ccdcalib']
        elif calibjobs['nightlybias'] is not None:
            dependency = calibjobs['nightlybias']
        elif calibjobs['badcol'] is not None:
            dependency = calibjobs['badcol']
        else:
            dependency = calibjobs['linkcal']
    elif prow['JOBDESC'] in ['badcol', 'nightlybias', 'ccdcalib']:
        dependency = calibjobs['linkcal']
    elif prow['OBSTYPE'] == 'dark':
        if calibjobs['ccdcalib'] is not None:
            dependency = calibjobs['ccdcalib']
        elif calibjobs['nightlybias'] is not None:
            dependency = calibjobs['nightlybias']
        elif calibjobs['badcol'] is not None:
            dependency = calibjobs['badcol']
        else:
            dependency = calibjobs['linkcal']
    elif prow['JOBDESC'] == 'linkcal' and refnight is not None:
        dependency = None
        ## For link cals only, enable cross-night dependencies if available
        refproctable = findfile('proctable', night=refnight)
        if os.path.exists(refproctable):
            ptab = load_table(tablename=refproctable, tabletype='proctable')
            ## This isn't perfect because we may depend on jobs that aren't
            ## actually being linked.
            ## It also allows us to proceed even if jobs don't exist yet.
            deps = []
            for job in ['nightlybias', 'ccdcalib', 'psfnight', 'nightlyflat']:
                if job in ptab['JOBDESC']:
                    ## add the reference row to the dependencies
                    deps.append(ptab[ptab['JOBDESC'] == job][0])
            if len(deps) > 0:
                dependency = deps
    else:
        dependency = None

    prow = assign_dependency(prow, dependency)

    return prow
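
# Illustrative note: for a science exposure the dependency falls back in order
# nightlyflat -> psfnight -> ccdcalib -> nightlybias -> badcol -> linkcal, so
# e.g. a science row on a night with only a ccdcalib job depends on ccdcalib.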

def assign_dependency(prow, dependency):
    """
    Given an input processing row and a dependency (or list of dependencies),
    this assigns the dependency columns appropriate for the job type of prow.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'OBSTYPE'. A row must have column names for
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_QID'.
        dependency (NoneType or scalar/list/array of Table.Row or dict): Processing row corresponding to the required input
            for the job in prow. This must contain keyword
            accessible values for 'INTID' and 'LATEST_QID'.
            If None, it assumes the dependency doesn't exist
            and no dependency is assigned.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with updated values for
        'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_QID'.

    Note:
        This may modify the input. Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not persist. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    prow['INT_DEP_IDS'] = np.ndarray(shape=0).astype(int)
    prow['LATEST_DEP_QID'] = np.ndarray(shape=0).astype(int)
    if dependency is not None:
        ## note: np.ndarray here (not np.array, which is a function, so
        ## type(dependency) could never match it) so that array-valued
        ## dependencies are actually handled
        if type(dependency) in [list, np.ndarray]:
            ids, qids = [], []
            for curdep in dependency:
                ids.append(curdep['INTID'])
                if still_a_dependency(curdep):
                    qids.append(curdep['LATEST_QID'])
            prow['INT_DEP_IDS'] = np.array(ids, dtype=int)
            prow['LATEST_DEP_QID'] = np.array(qids, dtype=int)
        elif type(dependency) in [dict, OrderedDict, Table.Row]:
            prow['INT_DEP_IDS'] = np.array([dependency['INTID']], dtype=int)
            if still_a_dependency(dependency):
                prow['LATEST_DEP_QID'] = np.array([dependency['LATEST_QID']], dtype=int)
    return prow

def still_a_dependency(dependency):
    """
    Defines the criteria for which a dependency is deemed complete (and therefore no longer a dependency).

    Args:
        dependency (Table.Row or dict): Processing row corresponding to the required input for the job in prow.
            This must contain keyword accessible values for 'STATUS' and 'LATEST_QID'.

    Returns:
        bool: False if the criteria indicate that the dependency is completed and no longer a blocking factor (i.e. no longer
        a genuine dependency). Returns True if the dependency is still a blocking factor such that the Slurm
        scheduler needs to be aware of the pending job.

    """
    return dependency['LATEST_QID'] > 0 and dependency['STATUS'] != 'COMPLETED'

def get_type_and_tile(erow):
    """
    Trivial function to return the OBSTYPE and the TILEID from an exposure table row.

    Args:
        erow (Table.Row or dict): Must contain 'OBSTYPE' and 'TILEID' as keywords.

    Returns:
        tuple (str, str), corresponding to the OBSTYPE and TILEID values of the input erow.
    """
    return str(erow['OBSTYPE']).lower(), erow['TILEID']


#############################################
#########   Table manipulators   ############
#############################################
def parse_previous_tables(etable, ptable, night):
    """
    This takes in the exposure and processing tables and regenerates all the working memory variables needed for the
    daily processing script.

    Used by the daily processing to define most of its state-ful variables into working memory.
    If the processing table is empty, these are simply declared and returned for use.
    If the code had previously run and exited (or crashed), however, this allows the code to
    re-establish itself by redefining these values.

    Args:
        etable (Table): Exposure table of all exposures that have been dealt with thus far.
        ptable (Table): Processing table of all exposures that have been processed.
        night (str or int): The night the data was taken.

    Returns:
        tuple: A tuple containing:

        * arcs, list of dicts, list of the individual arc jobs used for the psfnight (NOT all
          the arcs, if multiple sets existed)
        * flats, list of dicts, list of the individual flat jobs used for the nightlyflat (NOT
          all the flats, if multiple sets existed)
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile)
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'badcol', 'psfnight'
          and 'nightlyflat'. Each key corresponds to a Table.Row or
          None. The Table.Row values are for the corresponding
          calibration job.
        * curtype, None, the obstype of the current job being run. Always None as the first new job will define this.
        * lasttype, str or None, the obstype of the last individual exposure row to be processed.
        * curtile, None, the tileid of the current job (if science). Otherwise None. Always None as the first
          new job will define this.
        * lasttile, str or None, the tileid of the last job (if science). Otherwise None.
        * internal_id, int, an internal identifier unique to each job. Increments with each new job. This
          is the latest unassigned value.
    """
    log = get_logger()
    arcs, flats, sciences = [], [], []
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None,
                 'accounted_for': dict()}
    curtype, lasttype = None, None
    curtile, lasttile = None, None

    if len(ptable) > 0:
        prow = ptable[-1]
        internal_id = int(prow['INTID']) + 1
        lasttype, lasttile = get_type_and_tile(ptable[-1])
        jobtypes = ptable['JOBDESC']

        if 'nightlybias' in jobtypes:
            calibjobs['nightlybias'] = table_row_to_dict(ptable[jobtypes == 'nightlybias'][0])
            log.info("Located nightlybias job in exposure table: {}".format(calibjobs['nightlybias']))

        if 'ccdcalib' in jobtypes:
            calibjobs['ccdcalib'] = table_row_to_dict(ptable[jobtypes == 'ccdcalib'][0])
            log.info("Located ccdcalib job in exposure table: {}".format(calibjobs['ccdcalib']))

        if 'psfnight' in jobtypes:
            calibjobs['psfnight'] = table_row_to_dict(ptable[jobtypes == 'psfnight'][0])
            log.info("Located joint fit psfnight job in exposure table: {}".format(calibjobs['psfnight']))
        elif lasttype == 'arc':
            seqnum = 10
            for row in ptable[::-1]:
                erow = etable[etable['EXPID'] == row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'arc' and int(erow['SEQNUM']) < seqnum:
                    arcs.append(table_row_to_dict(row))
                    seqnum = int(erow['SEQNUM'])
                else:
                    break
            ## Because we work backward to fill in, we need to reverse them to get chronological order back
            arcs = arcs[::-1]

        if 'nightlyflat' in jobtypes:
            calibjobs['nightlyflat'] = table_row_to_dict(ptable[jobtypes == 'nightlyflat'][0])
            log.info("Located joint fit nightlyflat job in exposure table: {}".format(calibjobs['nightlyflat']))
        elif lasttype == 'flat':
            for row in ptable[::-1]:
                erow = etable[etable['EXPID'] == row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'flat' and int(erow['SEQTOT']) < 5:
                    if float(erow['EXPTIME']) > 100.:
                        flats.append(table_row_to_dict(row))
                else:
                    break
            flats = flats[::-1]

        if lasttype.lower() == 'science':
            for row in ptable[::-1]:
                if row['OBSTYPE'].lower() == 'science' and row['TILEID'] == lasttile and \
                   row['JOBDESC'] == 'prestdstar' and row['LASTSTEP'] != 'skysub':
                    sciences.append(table_row_to_dict(row))
                else:
                    break
            sciences = sciences[::-1]
    else:
        internal_id = night_to_starting_iid(night)

    return arcs, flats, sciences, \
           calibjobs, \
           curtype, lasttype, \
           curtile, lasttile, \
           internal_id

1041
def generate_calibration_dict(ptable, files_to_link=None):
    """
    This takes in a processing table and regenerates the working memory calibration
    dictionary for dependency tracking. Used by the daily processing to define
    most of its stateful variables in working memory.
    If the processing table is empty, these are simply declared and returned for use.
    If the code had previously run and exited (or crashed), however, this allows the code to
    re-establish itself by redefining these values.

    Args:
        ptable, Table, Processing table of all exposures that have been processed.
        files_to_link, set, Set of filenames that the linkcal job will link.

    Returns:
        calibjobs, dict. Dictionary containing 'nightlybias', 'badcol', 'ccdcalib',
            'psfnight', 'nightlyflat', 'linkcal', and 'accounted_for'. Each job key
            corresponds to a Table.Row or None. The table.Row() values are for the
            corresponding calibration job.
    """
    log = get_logger()
    job_to_file_map = get_jobdesc_to_file_map()
    accounted_for = {'biasnight': False, 'badcolumns': False,
                     'ctecorrnight': False, 'psfnight': False,
                     'fiberflatnight': False}
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None}

    ptable_jobtypes = ptable['JOBDESC']

    for jobtype in calibjobs.keys():
        if jobtype in ptable_jobtypes:
            calibjobs[jobtype] = table_row_to_dict(ptable[ptable_jobtypes==jobtype][0])
            log.info(f"Located {jobtype} job in processing table: {calibjobs[jobtype]}")
            if jobtype == 'linkcal':
                if files_to_link is not None and len(files_to_link) > 0:
                    log.info(f"Assuming existing linkcal job processed "
                             + f"{files_to_link} since given in override file.")
                    accounted_for = update_accounted_for_with_linking(accounted_for,
                                                                  files_to_link)
                else:
                    err = f"linkcal job exists but no files given: {files_to_link=}"
                    log.error(err)
                    raise ValueError(err)
            elif jobtype == 'ccdcalib':
                possible_ccd_files = set(['biasnight', 'badcolumns', 'ctecorrnight'])
                if files_to_link is None:
                    files_accounted_for = possible_ccd_files
                else:
                    files_accounted_for = possible_ccd_files.difference(files_to_link)
                    ccd_files_linked = possible_ccd_files.intersection(files_to_link)
                    log.info(f"Assuming existing ccdcalib job processed "
                             + f"{files_accounted_for} since {ccd_files_linked} "
                             + f"are linked.")
                for fil in files_accounted_for:
                    accounted_for[fil] = True
            else:
                accounted_for[job_to_file_map[jobtype]] = True

    calibjobs['accounted_for'] = accounted_for
    return calibjobs

def update_accounted_for_with_linking(accounted_for, files_to_link):
    """
    This takes in a dictionary summarizing the calibration files accounted for
    and updates it based on files_to_link, which are assumed to have
    already been linked such that those files already exist on disk and
    don't need to be generated.

    Parameters
    ----------
        accounted_for: dict
            Dictionary containing 'biasnight', 'badcolumns', 'ctecorrnight',
            'psfnight', and 'fiberflatnight'. Each value is True if the file is
            accounted for and False if it is not.
        files_to_link: set
            Set of filenames that the linkcal job will link.

    Returns
    -------
        accounted_for: dict
            Dictionary containing 'biasnight', 'badcolumns', 'ctecorrnight',
            'psfnight', and 'fiberflatnight'. Each value is True if the file is
            accounted for and False if it is not.
    """
    log = get_logger()

    for fil in files_to_link:
        if fil in accounted_for:
            accounted_for[fil] = True
        else:
            err = f"{fil} doesn't match an expected filetype: "
            err += f"{accounted_for.keys()}"
            log.error(err)
            raise ValueError(err)

    return accounted_for

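## Illustrative sketch (assumed values, not pipeline state): linking the
## nightly bias and bad-column files marks them as accounted for without
## running the corresponding jobs.
# accounted_for = {'biasnight': False, 'badcolumns': False,
#                  'ctecorrnight': False, 'psfnight': False,
#                  'fiberflatnight': False}
# accounted_for = update_accounted_for_with_linking(
#     accounted_for, files_to_link={'biasnight', 'badcolumns'})
# accounted_for['biasnight'], accounted_for['psfnight']  # -> (True, False)
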
def all_calibs_submitted(accounted_for, do_cte_flats):
    """
    Returns the boolean logic determining whether all necessary calibration
    jobs have been submitted for the night.

    Args:
        accounted_for, dict, Dictionary with keys corresponding to the calibration
            filenames and values of True or False.
        do_cte_flats, bool, whether ctecorrnight files are expected or not.

    Returns:
        bool, True if all necessary calibrations have been submitted or handled, False otherwise.
    """
    test_dict = accounted_for.copy()
    if not do_cte_flats:
        test_dict.pop('ctecorrnight')

    return np.all(list(test_dict.values()))

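## Illustrative sketch (assumed values): when CTE flats are not expected,
## a missing ctecorrnight entry does not block calibration completion.
# accounted_for = {'biasnight': True, 'badcolumns': True,
#                  'ctecorrnight': False, 'psfnight': True,
#                  'fiberflatnight': True}
# all_calibs_submitted(accounted_for, do_cte_flats=False)  # -> True
# all_calibs_submitted(accounted_for, do_cte_flats=True)   # -> False
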
def update_and_recursively_submit(proc_table, submits=0, resubmission_states=None,
                                  no_resub_failed=False, ptab_name=None,
                                  dry_run=0, reservation=None):
    """
    Given a processing table, this loops over job rows and resubmits failed jobs (as defined by resubmission_states).
    Before submitting a job, it checks the dependencies for failures. If a dependency needs to be resubmitted, it recursively
    follows dependencies until it finds the first job without a failed dependency and resubmits that. Then it resubmits the
    other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.

    Args:
        proc_table, Table, the processing table with a row per job.
        submits, int, the number of submissions made to the queue. Used for saving files and for not overloading the scheduler.
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
            possible Slurm scheduler state, where you wish for jobs with that
            outcome to be resubmitted.
        no_resub_failed: bool. Set to True if you do NOT want to resubmit
            jobs with Slurm status 'FAILED' by default. Default is False.
        ptab_name, str, the full pathname where the processing table should be saved.
        dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.

    Returns:
        tuple: A tuple containing:

        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
          been updated for those jobs that needed to be resubmitted.
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
          the number of submissions made from this function call plus the input submits value.

    Note:
        This modifies the inputs of both proc_table and submits and returns them.
    """
    log = get_logger()
    if resubmission_states is None:
        resubmission_states = get_resubmission_states(no_resub_failed=no_resub_failed)

    log.info(f"Resubmitting jobs with current states in the following: {resubmission_states}")
    proc_table = update_from_queue(proc_table, dry_run=dry_run)
    log.info("Updated processing table queue information:")
    cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID',
            'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS']
    log.info(np.array(cols))
    for row in proc_table:
        log.info(np.array(row[cols]))
    log.info("\n")
    id_to_row_map = {row['INTID']: rown for rown, row in enumerate(proc_table)}
    for rown in range(len(proc_table)):
        if proc_table['STATUS'][rown] in resubmission_states:
            proc_table, submits = recursive_submit_failed(rown, proc_table, submits,
                                                          id_to_row_map, ptab_name,
                                                          resubmission_states,
                                                          reservation, dry_run)
    proc_table = update_from_queue(proc_table, dry_run=dry_run)
    return proc_table, submits

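## Illustrative usage (ptab_name is a hypothetical pathname): refresh queue
## state for a night's processing table and resubmit anything that landed in
## a resubmission state.
# ptable = load_table(tablename=ptab_name, tabletype='processing')
# ptable, submits = update_and_recursively_submit(ptable, submits=0,
#                                                 ptab_name=ptab_name, dry_run=0)
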
def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=None,
                            resubmission_states=None, reservation=None, dry_run=0):
    """
    Given a row of a processing table and the full processing table, this resubmits the given job.
    Before submitting a job, it checks the dependencies for failures in the processing table. If a dependency needs to
    be resubmitted, it recursively follows dependencies until it finds the first job without a failed dependency and
    resubmits that. Then it resubmits the other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.

    Args:
        rown, int, the index of the processing table row that you want to resubmit.
        proc_table, Table, the processing table with a row per job.
        submits, int, the number of submissions made to the queue. Used for saving files and for not overloading the scheduler.
        id_to_row_map, dict, lookup dictionary where the keys are internal ids (INTID's) and the values are the row position
            in the processing table.
        ptab_name, str, the full pathname where the processing table should be saved.
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
            possible Slurm scheduler state, where you wish for jobs with that
            outcome to be resubmitted.
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).

    Returns:
        tuple: A tuple containing:

        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
          been updated for those jobs that needed to be resubmitted.
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
          the number of submissions made from this function call plus the input submits value.

    Note:
        This modifies the inputs of both proc_table and submits and returns them.
    """
    log = get_logger()
    row = proc_table[rown]
    log.info(f"Identified row {row['INTID']} as needing resubmission.")
    log.info(f"{row['INTID']}: Expid(s): {row['EXPID']}  Job: {row['JOBDESC']}")
    if resubmission_states is None:
        resubmission_states = get_resubmission_states()
    ideps = proc_table['INT_DEP_IDS'][rown]
    if ideps is None or len(ideps) == 0:
        proc_table['LATEST_DEP_QID'][rown] = np.ndarray(shape=0).astype(int)
    else:
        all_valid_states = list(resubmission_states.copy())
        good_states = ['RUNNING','PENDING','SUBMITTED','COMPLETED']
        all_valid_states.extend(good_states)
        othernight_idep_qid_lookup = {}
        for idep in np.sort(np.atleast_1d(ideps)):
            if idep not in id_to_row_map:
                if idep // 1000 != row['INTID'] // 1000:
                    log.info(f"Internal ID: {idep} not in id_to_row_map. "
                             + "This is expected since it's from another day. ")
                    reference_night = 20000000 + (idep // 1000)
                    reftab = read_minimal_full_proctab_cols(nights=[reference_night])
                    if reftab is None:
                        msg = f"The dependency is from night={reference_night}" \
                              + f" but read_minimal_full_proctab_cols couldn't" \
                              + f" locate that processing table; this is a " \
                              + f"fatal error."
                        log.critical(msg)
                        raise ValueError(msg)
                    reftab = update_from_queue(reftab, dry_run=dry_run)
                    entry = reftab[reftab['INTID'] == idep][0]
                    if entry['STATUS'] not in good_states:
                        msg = f"Internal ID: {idep} not in id_to_row_map. " \
                              + f"Since the dependency is from night={reference_night} " \
                              + f"and that job isn't in a good state, this is an " \
                              + f"error we can't overcome."
                        log.error(msg)
                        proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
                        return proc_table, submits
                    else:
                        ## otherwise all is good, just update the cache to use this
                        ## in the next stage
                        othernight_idep_qid_lookup[idep] = entry['LATEST_QID']
                        update_full_ptab_cache(reftab)
                else:
                    msg = f"Internal ID: {idep} not in id_to_row_map. " \
                         + f"Since the dependency is from the same night" \
                         + f" and we can't find it, this is a fatal error."
                    log.critical(msg)
                    raise ValueError(msg)
            elif proc_table['STATUS'][id_to_row_map[idep]] not in all_valid_states:
                log.error(f"Proc INTID: {proc_table['INTID'][rown]} depended on" +
                            f" INTID {proc_table['INTID'][id_to_row_map[idep]]}" +
                            f" but that exposure has state" +
                            f" {proc_table['STATUS'][id_to_row_map[idep]]} that" +
                            f" isn't in the list of resubmission states." +
                            f" Exiting this job's resubmission attempt.")
                proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
                return proc_table, submits
        qdeps = []
        for idep in np.sort(np.atleast_1d(ideps)):
            if idep in id_to_row_map:
                if proc_table['STATUS'][id_to_row_map[idep]] in resubmission_states:
                    proc_table, submits = recursive_submit_failed(id_to_row_map[idep],
                                                                  proc_table, submits,
                                                                  id_to_row_map,
                                                                  reservation=reservation,
                                                                  dry_run=dry_run)
                ## Now that we've resubmitted the dependency if necessary,
                ## add the most recent QID to the list
                qdeps.append(proc_table['LATEST_QID'][id_to_row_map[idep]])
            else:
                ## Since we verified above that the cross-night QID is still
                ## either pending or successful, add that to the list of QIDs
                qdeps.append(othernight_idep_qid_lookup[idep])

        qdeps = np.atleast_1d(qdeps)
        if len(qdeps) > 0:
            proc_table['LATEST_DEP_QID'][rown] = qdeps
        else:
            log.error(f"number of qdeps should be 1 or more: Rown {rown}, ideps {ideps}")

    proc_table[rown] = submit_batch_script(proc_table[rown], reservation=reservation,
                                           strictly_successful=True, dry_run=dry_run)
    submits += 1

    if not dry_run:
        if ptab_name is None:
            write_table(proc_table, tabletype='processing', overwrite=True)
        else:
            write_table(proc_table, tablename=ptab_name, overwrite=True)
        sleep_and_report(0.1 + 0.1*(submits % 10 == 0),
                         message_suffix=f"after submitting job to queue and writing proctable")
    return proc_table, submits

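## Note (illustrative arithmetic, mirroring night_to_starting_iid): internal
## IDs encode the night, so integer division recovers the night of a
## cross-night dependency as done above.
# idep = 241120005                   # hypothetical INTID from night 20241120
# 20000000 + (idep // 1000)          # -> 20241120
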
#########################################
########     Joint fit     ##############
#########################################
def joint_fit(ptable, prows, internal_id, queue, reservation, descriptor, z_submit_types=None,
              dry_run=0, strictly_successful=False, check_for_outputs=True, resubmit_partial_complete=True,
              system_name=None):
    """
    DEPRECATED
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    joint fitting job given by descriptor. If the joint fitting job is standard star fitting, the post standard star fits
    for all the individual exposures are also created and submitted. The returned ptable has all of these rows added to the
    table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        descriptor (str): Description of the joint fitting job. Can either be 'science' or 'stdstarfit', 'arc' or 'psfnight',
            or 'flat' or 'nightlyflat'.
        z_submit_types (list of str, optional): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with added rows for the joint fit job and, in the case
          of a stdstarfit, the poststdstar science exposure jobs.
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    if descriptor is None:
        return ptable, None, internal_id
    elif descriptor == 'arc':
        descriptor = 'psfnight'
    elif descriptor == 'flat':
        descriptor = 'nightlyflat'
    elif descriptor == 'science':
        if z_submit_types is None or len(z_submit_types) == 0:
            descriptor = 'stdstarfit'

    if descriptor not in ['psfnight', 'nightlyflat', 'science', 'stdstarfit']:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")

    if descriptor == 'science':
        joint_prow, internal_id = make_joint_prow(prows, descriptor='stdstarfit', internal_id=internal_id)
    else:
        joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
    ptable.add_row(joint_prow)

    if descriptor in ['science', 'stdstarfit']:
        if descriptor == 'science':
            zprows = []
        log.info(" ")
        log.info(f"Submitting individual science exposures now that joint fitting of standard stars is submitted.\n")
        for row in prows:
            if row['LASTSTEP'] == 'stdstarfit':
                continue
            row['JOBDESC'] = 'poststdstar'

            # poststdstar job can't process cameras not included in its stdstar joint fit
            stdcamword = joint_prow['PROCCAMWORD']
            thiscamword = row['PROCCAMWORD']
            proccamword = camword_intersection([stdcamword, thiscamword])
            if proccamword != thiscamword:
                dropcams = difference_camwords(thiscamword, proccamword)
                assert dropcams != ''  #- i.e. if they differ, we should be dropping something
                log.warning(f"Dropping exp {row['EXPID']} poststdstar cameras {dropcams} since they weren't included in stdstar fit {stdcamword}")
                row['PROCCAMWORD'] = proccamword

            row['INTID'] = internal_id
            internal_id += 1
            row['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
            row = assign_dependency(row, joint_prow)
            row = create_and_submit(row, queue=queue, reservation=reservation, dry_run=dry_run,
                                    strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
            ptable.add_row(row)
            if descriptor == 'science' and row['LASTSTEP'] == 'all':
                zprows.append(row)

    ## Now run redshifts
    if descriptor == 'science' and len(zprows) > 0 and z_submit_types is not None:
        prow_selection = (  (ptable['OBSTYPE'] == 'science')
                          & (ptable['LASTSTEP'] == 'all')
                          & (ptable['JOBDESC'] == 'poststdstar')
                          & (ptable['TILEID'] == int(zprows[0]['TILEID'])) )
        nightly_zprows = []
        if np.sum(prow_selection) == len(zprows):
            nightly_zprows = zprows.copy()
        else:
            for prow in ptable[prow_selection]:
                nightly_zprows.append(table_row_to_dict(prow))

        for zsubtype in z_submit_types:
            if zsubtype == 'perexp':
                for zprow in zprows:
                    log.info(" ")
                    log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
                    joint_prow, internal_id = make_joint_prow([zprow], descriptor=zsubtype, internal_id=internal_id)
                    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                    ptable.add_row(joint_prow)
            else:
                log.info(" ")
                log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {nightly_zprows[0]['TILEID']}.")
                expids = [prow['EXPID'][0] for prow in nightly_zprows]
                log.info(f"Expids: {expids}.\n")
                joint_prow, internal_id = make_joint_prow(nightly_zprows, descriptor=zsubtype, internal_id=internal_id)
                joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(joint_prow)

    if descriptor in ['psfnight', 'nightlyflat']:
        log.info(f"Setting the calibration exposures as calibrators in the processing table.\n")
        ptable = set_calibrator_flag(prows, ptable)

    return ptable, joint_prow, internal_id

def joint_cal_fit(descriptor, ptable, prows, internal_id, queue, reservation,
                  dry_run=0, strictly_successful=False, check_for_outputs=True,
                  resubmit_partial_complete=True, system_name=None):
    """
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    joint calibration fitting job given by descriptor ('psfnight' or 'nightlyflat'). The returned ptable has the new
    row added to the table given as input.

    Args:
        descriptor (str): Description of the joint fitting job. Can either be 'arc' or 'psfnight',
            or 'flat' or 'nightlyflat'.
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        ptable (Table): The processing table where each row is a processed job.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with an added row for the joint fit job.
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    if descriptor is None:
        return ptable, None, internal_id
    elif descriptor == 'arc':
        descriptor = 'psfnight'
    elif descriptor == 'flat':
        descriptor = 'nightlyflat'

    if descriptor not in ['psfnight', 'nightlyflat']:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")

    joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
    ptable.add_row(joint_prow)

    if descriptor in ['psfnight', 'nightlyflat']:
        log.info(f"Setting the calibration exposures as calibrators in the processing table.\n")
        ptable = set_calibrator_flag(prows, ptable)

    return ptable, joint_prow, internal_id

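## Illustrative call (assumed inputs): once a complete arc sequence has been
## processed, joint-fit the accumulated arc rows into a psfnight job.
# ptable, psfnight_prow, internal_id = joint_cal_fit(
#     'arc', ptable, arcs, internal_id, queue='realtime', reservation=None)
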
#########################################
########     Redshifts     ##############
#########################################
def submit_redshifts(ptable, prows, tnight, internal_id, queue, reservation,
              dry_run=0, strictly_successful=False,
              check_for_outputs=True, resubmit_partial_complete=True,
              z_submit_types=None, system_name=None):
    """
    Given a set of prows, this generates processing table rows, creates batch scripts, and submits the
    redshift jobs requested by z_submit_types for the tile processed by the given tilenight job. The returned
    ptable has all of these rows added to the table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
        tnight (Table.Row): The processing table row of the tilenight job on which the redshifts depend.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with added rows for the redshift jobs.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1 or z_submit_types is None:
        return ptable, internal_id

    log.info(" ")
    log.info(f"Running redshifts.\n")

    ## Now run redshifts
    zprows = []
    for row in prows:
        if row['LASTSTEP'] == 'all':
            zprows.append(row)

    if len(zprows) > 0:
        for zsubtype in z_submit_types:
            log.info(" ")
            log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {zprows[0]['TILEID']}.")
            if zsubtype == 'perexp':
                for zprow in zprows:
                    log.info(f"EXPID: {zprow['EXPID']}.\n")
                    redshift_prow = make_redshift_prow([zprow], tnight, descriptor=zsubtype, internal_id=internal_id)
                    internal_id += 1
                    redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                    ptable.add_row(redshift_prow)
            elif zsubtype == 'cumulative':
                tileids = np.unique([prow['TILEID'] for prow in zprows])
                if len(tileids) > 1:
                    msg = f"Error, more than one tileid provided for cumulative redshift job: {tileids}"
                    log.critical(msg)
                    raise ValueError(msg)
                nights = np.unique([prow['NIGHT'] for prow in zprows])
                if len(nights) > 1:
                    msg = f"Error, more than one night provided for cumulative redshift job: {nights}"
                    log.critical(msg)
                    raise ValueError(msg)
                tileid, night = tileids[0], nights[0]
                ## For cumulative redshifts, get any existing processing rows for the tile
                matched_prows = read_minimal_tilenight_proctab_cols(tileids=tileids)
                ## Identify the processing rows that should be assigned as dependencies
                ## tnight should be first such that the new job inherits the other metadata from it
                tnights = [tnight]
                if matched_prows is not None:
                    matched_prows = matched_prows[matched_prows['NIGHT'] <= night]
                    for prow in matched_prows:
                        if prow['INTID'] != tnight['INTID']:
                            tnights.append(prow)
                log.info(f"Internal Processing IDs: {[prow['INTID'] for prow in tnights]}.\n")
                ## Identify all exposures that should go into the fit
                expids = [prow['EXPID'][0] for prow in zprows]
                ## note we can actually get the full list of exposures, but for now
                ## we'll stay consistent with old processing where we only list exposures
                ## from the current night
                ## For cumulative redshifts, get valid expids from exptables
                #matched_erows = read_minimal_science_exptab_cols(tileids=tileids)
                #matched_erows = matched_erows[matched_erows['NIGHT']<=night]
                #expids = list(set([prow['EXPID'][0] for prow in zprows])+set(matched_erows['EXPID']))
                log.info(f"Expids: {expids}.\n")
                redshift_prow, internal_id = make_joint_prow(tnights, descriptor=zsubtype, internal_id=internal_id)
                redshift_prow['EXPID'] = expids
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(redshift_prow)
            else: # pernight
                expids = [prow['EXPID'][0] for prow in zprows]
                log.info(f"Expids: {expids}.\n")
                redshift_prow = make_redshift_prow(zprows, tnight, descriptor=zsubtype, internal_id=internal_id)
                internal_id += 1
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(redshift_prow)

    return ptable, internal_id

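## Illustrative call (assumed inputs): submit cumulative redshifts for a
## tile's science rows once its tilenight row (tnight_prow) exists.
# ptable, internal_id = submit_redshifts(ptable, sciences, tnight_prow,
#                                        internal_id, queue='realtime',
#                                        reservation=None,
#                                        z_submit_types=['cumulative'])
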
#########################################
########     Tilenight     ##############
#########################################
def submit_tilenight(ptable, prows, calibjobs, internal_id, queue, reservation,
              dry_run=0, strictly_successful=False, resubmit_partial_complete=True,
              system_name=None, use_specter=False, extra_job_args=None):
    """
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the
    tilenight job for those exposures. The returned ptable has this row added to the
    table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The table.Row() values are for the corresponding
            calibration job.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include
            laststeps for tilenight, etc.

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with an added row for the tilenight job.
        * tnight_prow, dict. Row of a processing table corresponding to the tilenight job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Running tilenight.\n")

    tnight_prow = make_tnight_prow(prows, calibjobs, internal_id=internal_id)
    internal_id += 1
    tnight_prow = create_and_submit(tnight_prow, queue=queue, reservation=reservation, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=False,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name,
                                   use_specter=use_specter, extra_job_args=extra_job_args)
    ptable.add_row(tnight_prow)

    return ptable, tnight_prow, internal_id

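## Illustrative call (assumed inputs; the 'laststeps' key is an example of
## extra_job_args, not an exhaustive spec): submit one tilenight job for the
## accumulated science rows of the current tile.
# ptable, tnight_prow, internal_id = submit_tilenight(
#     ptable, sciences, calibjobs, internal_id, queue='realtime',
#     reservation=None, extra_job_args={'laststeps': ['all', 'skysub']})
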
## wrapper functions for joint fitting
def science_joint_fit(ptable, sciences, internal_id, queue='realtime', reservation=None,
                      z_submit_types=None, dry_run=0, strictly_successful=False,
                      check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None):
    """
    Wrapper function for desispec.workflow.processing.joint_fit specific to the stdstarfit joint fit and redshift fitting.

    All variables are the same except::

        Arg 'sciences' is mapped to the prows argument of joint_fit.
        The joint_fit argument descriptor is pre-defined as 'science'.
    """
    return joint_fit(ptable=ptable, prows=sciences, internal_id=internal_id, queue=queue, reservation=reservation,
                     descriptor='science', z_submit_types=z_submit_types, dry_run=dry_run,
                     strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                     resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)


def flat_joint_fit(ptable, flats, internal_id, queue='realtime',
                   reservation=None, dry_run=0, strictly_successful=False,
                   check_for_outputs=True, resubmit_partial_complete=True,
                   system_name=None):
    """
    Wrapper function for desispec.workflow.processing.joint_fit specific to the nightlyflat joint fit.

    All variables are the same except::

        Arg 'flats' is mapped to the prows argument of joint_fit.
        The joint_fit argument descriptor is pre-defined as 'nightlyflat'.
    """
    return joint_fit(ptable=ptable, prows=flats, internal_id=internal_id, queue=queue, reservation=reservation,
                     descriptor='nightlyflat', dry_run=dry_run, strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs, resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)


def arc_joint_fit(ptable, arcs, internal_id, queue='realtime',
                  reservation=None, dry_run=0, strictly_successful=False,
                  check_for_outputs=True, resubmit_partial_complete=True,
                  system_name=None):
    """
    Wrapper function for desispec.workflow.processing.joint_fit specific to the psfnight joint fit.

    All variables are the same except::

        Arg 'arcs' is mapped to the prows argument of joint_fit.
        The joint_fit argument descriptor is pre-defined as 'psfnight'.
    """
    return joint_fit(ptable=ptable, prows=arcs, internal_id=internal_id, queue=queue, reservation=reservation,
                     descriptor='psfnight', dry_run=dry_run, strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs, resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)

def make_joint_prow(prows, descriptor, internal_id):
    """
    Given an input list or array of processing table rows and a descriptor, this creates a joint fit processing job row.
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
    input prows).

    Args:
        prows, list or array of dicts. The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        descriptor, str. Description of the joint fitting job. Can be 'stdstarfit', 'psfnight', 'nightlyflat',
            'pernight', or 'cumulative'.
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).

    Returns:
        tuple: A tuple containing:

        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    first_row = table_row_to_dict(prows[0])
    joint_prow = first_row.copy()

    joint_prow['INTID'] = internal_id
    internal_id += 1
    joint_prow['JOBDESC'] = descriptor
    joint_prow['LATEST_QID'] = -99
    joint_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
    joint_prow['SUBMIT_DATE'] = -99
    joint_prow['STATUS'] = 'UNSUBMITTED'
    joint_prow['SCRIPTNAME'] = ''
    joint_prow['EXPID'] = np.unique(np.concatenate([currow['EXPID'] for currow in prows])).astype(int)

    ## Assign the PROCCAMWORD based on the descriptor and the input exposures
    ## UPDATE 2024-04-24: badamps are now included in arc/flat joint fits,
    ## so grab all PROCCAMWORDs instead of filtering out BADAMP cameras
    ## For flats we want any camera that exists in all 12 exposures
    ## For arcs we want any camera that exists in at least 3 exposures
    pcamwords = [prow['PROCCAMWORD'] for prow in prows]
    if descriptor == 'stdstarfit':
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
                                                  full_spectros_only=True)
    elif descriptor in ['pernight', 'cumulative']:
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
                                                  full_spectros_only=False)
    elif descriptor == 'nightlyflat':
        joint_prow['PROCCAMWORD'] = camword_intersection(pcamwords,
                                                         full_spectros_only=False)
    elif descriptor == 'psfnight':
        ## Count the number of exposures each camera is present for
        camcheck = {}
        for camword in pcamwords:
            for cam in decode_camword(camword):
                if cam in camcheck:
                    camcheck[cam] += 1
                else:
                    camcheck[cam] = 1
        ## if a camera exists in 3 or more exposures, then include it
        goodcams = []
        for cam, camcount in camcheck.items():
            if camcount >= 3:
                goodcams.append(cam)
        joint_prow['PROCCAMWORD'] = create_camword(goodcams)
    else:
        log.warning("Warning: asked to produce joint proc table row for unknown"
                    + f" job description {descriptor}")

    joint_prow = assign_dependency(joint_prow, dependency=prows)
    return joint_prow, internal_id

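## Illustrative sketch (toy camwords, not real data): for psfnight, a camera
## is kept if it appears in at least 3 of the input PROCCAMWORDs.
# pcamwords = ['a0', 'a0', 'b0r0', 'a0']   # 'a0' expands to b0, r0, z0
# counts: b0 x4, r0 x4, z0 x3 -> goodcams = [b0, r0, z0] -> camword 'a0'
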
def make_exposure_prow(erow, int_id, calibjobs, jobdesc=None):
    """
    Converts an exposure table row into a processing table row, assigns it the
    given internal id (int_id), sets JOBDESC (defaulting to the exposure's
    OBSTYPE), and defines its dependencies on the calibration jobs in calibjobs.
    Returns the new processing row and the next unassigned internal id.
    """
    prow = erow_to_prow(erow)
    prow['INTID'] = int_id
    int_id += 1
    if jobdesc is None:
        prow['JOBDESC'] = prow['OBSTYPE']
    else:
        prow['JOBDESC'] = jobdesc
    prow = define_and_assign_dependency(prow, calibjobs)
    return prow, int_id

def make_tnight_prow(prows, calibjobs, internal_id):
    """
    Given an input list or array of processing table rows, this creates a tilenight processing job row.
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
    input calibjobs).

    Args:
        prows, list or array of dicts. Unsubmitted rows corresponding to the individual prestdstar jobs that are
            the first steps of tilenight.
        calibjobs, dict. Dictionary containing keys that each correspond to a Table.Row or
            None, with each table.Row() value corresponding to a calibration job
            on which the tilenight job depends.
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).

    Returns:
        dict: Row of a processing table corresponding to the tilenight job.
    """
    first_row = table_row_to_dict(prows[0])
    joint_prow = first_row.copy()

    joint_prow['INTID'] = internal_id
    joint_prow['JOBDESC'] = 'tilenight'
    joint_prow['LATEST_QID'] = -99
    joint_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
    joint_prow['SUBMIT_DATE'] = -99
    joint_prow['STATUS'] = 'UNSUBMITTED'
    joint_prow['SCRIPTNAME'] = ''
    joint_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

    joint_prow = define_and_assign_dependency(joint_prow, calibjobs, use_tilenight=True)

    return joint_prow

def make_redshift_prow(prows, tnights, descriptor, internal_id):
    """
    Given an input list or array of processing table rows and a redshift descriptor, this creates a redshift
    processing job row. It starts by copying the first input row, overwrites relevant columns, and defines
    the new dependencies (based on the input tnights).

    Args:
        prows, list or array of dicts. Unsubmitted rows corresponding to the individual prestdstar jobs that are
            the first steps of tilenight.
        tnights, list or array of Table.Row objects. Rows corresponding to the tilenight jobs on which the redshift job depends.
        descriptor, str. The type of redshift job (e.g. 'pernight', 'cumulative'), used as the job description.
        internal_id, int, the next internal id to be used for assignment (already incremented past the last id number used).

    Returns:
        dict: Row of a processing table corresponding to the redshift job.
    """
    first_row = table_row_to_dict(prows[0])
    redshift_prow = first_row.copy()

    redshift_prow['INTID'] = internal_id
    redshift_prow['JOBDESC'] = descriptor
    redshift_prow['LATEST_QID'] = -99
    redshift_prow['ALL_QIDS'] = np.array([], dtype=int)
    redshift_prow['SUBMIT_DATE'] = -99
    redshift_prow['STATUS'] = 'UNSUBMITTED'
    redshift_prow['SCRIPTNAME'] = ''
    redshift_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

    redshift_prow = assign_dependency(redshift_prow, dependency=tnights)

    return redshift_prow

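## Hedged usage sketch (hypothetical values): the descriptor is one of the
## redshift group types used elsewhere in this module, e.g. 'cumulative'.
#
#     zprow = make_redshift_prow(sciences, [tnight_prow], 'cumulative', internal_id)
#     internal_id += 1
#     ptable.add_row(zprow)
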
def checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs,
                                  lasttype, internal_id, z_submit_types=None, dry_run=0,
                                  queue='realtime', reservation=None, strictly_successful=False,
                                  check_for_outputs=True, resubmit_partial_complete=True,
                                  system_name=None):
    """
    Takes all the state-ful data from daily processing and determines whether a joint fit needs to be submitted. Places
    the decision criteria into a single function for easier maintainability over time. These are separate from the
    new standard manifest*.json method of indicating a calibration sequence is complete. That is checked independently
    elsewhere and doesn't interact with this.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        arcs (list of dict): list of the individual arc jobs to be used for the psfnight (NOT all
            the arcs, if multiple sets existed). May be empty if none identified yet.
        flats (list of dict): list of the individual flat jobs to be used for the nightlyflat (NOT
            all the flats, if multiple sets existed). May be empty if none identified yet.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The table.Row() values are for the corresponding
            calibration job.
        lasttype (str or None): the obstype of the last individual exposure row to be processed.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but
            not submitted. If dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table, Processing table of all exposures that have been processed.
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
          and 'nightlyflat'. Each key corresponds to a Table.Row or
          None. The table.Row() values are for the corresponding
          calibration job.
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented
          upward from the input such that it represents the smallest unused ID.
    """
    if lasttype == 'science' and len(sciences) > 0:
        log = get_logger()
        skysubonly = np.array([sci['LASTSTEP'] == 'skysub' for sci in sciences])
        if np.all(skysubonly):
            log.error("Identified all exposures in joint fitting request as skysub-only. Not submitting")
            sciences = []
            return ptable, calibjobs, sciences, internal_id

        if np.any(skysubonly):
            log.error("Identified skysub-only exposures in joint fitting request")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))
            sciences = (np.array(sciences, dtype=object)[~skysubonly]).tolist()
            log.info("Removed skysub only exposures in joint fitting:")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))

        from collections import Counter
        tiles = np.array([sci['TILEID'] for sci in sciences])
        counts = Counter(tiles)
        if len(counts.most_common()) > 1:
            log.error("Identified more than one tile in a joint fitting request")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("Tileid's: {}".format(tiles))
            log.info("Returning without joint fitting any of these exposures.")
            # most_common, nmost_common = counts.most_common()[0]
            # if most_common == -99:
            #     most_common, nmost_common = counts.most_common()[1]
            # log.warning(f"Given multiple tiles to jointly fit: {counts}. "+
            #             "Only processing the most common non-default " +
            #             f"tile: {most_common} with {nmost_common} exposures")
            # sciences = (np.array(sciences,dtype=object)[tiles == most_common]).tolist()
            # log.info("Tiles and exposure id's being submitted for joint fitting:")
            # log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            # log.info("Tileid's: {}".format([row['TILEID'] for row in sciences]))
            sciences = []
            return ptable, calibjobs, sciences, internal_id

        ptable, tilejob, internal_id = science_joint_fit(ptable, sciences, internal_id, z_submit_types=z_submit_types,
                                                         dry_run=dry_run, queue=queue, reservation=reservation,
                                                         strictly_successful=strictly_successful,
                                                         check_for_outputs=check_for_outputs,
                                                         resubmit_partial_complete=resubmit_partial_complete,
                                                         system_name=system_name)
        if tilejob is not None:
            sciences = []

    elif lasttype == 'flat' and calibjobs['nightlyflat'] is None and len(flats) == 12:
        ## Note: this hardcodes the assumption that a complete set has exactly 12 flats
        ptable, calibjobs['nightlyflat'], internal_id \
            = flat_joint_fit(ptable, flats, internal_id, dry_run=dry_run, queue=queue,
                             reservation=reservation, strictly_successful=strictly_successful,
                             check_for_outputs=check_for_outputs,
                             resubmit_partial_complete=resubmit_partial_complete,
                             system_name=system_name
                            )

    elif lasttype == 'arc' and calibjobs['psfnight'] is None and len(arcs) == 5:
        ## Note: this hardcodes the assumption that a complete set has exactly 5 arcs
        ptable, calibjobs['psfnight'], internal_id \
            = arc_joint_fit(ptable, arcs, internal_id, dry_run=dry_run, queue=queue,
                            reservation=reservation, strictly_successful=strictly_successful,
                            check_for_outputs=check_for_outputs,
                            resubmit_partial_complete=resubmit_partial_complete,
                            system_name=system_name
                            )
    return ptable, calibjobs, sciences, internal_id

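## Hedged sketch of the skysub-only filtering performed above, with
## hypothetical science rows:
#
#     sciences = [{'LASTSTEP': 'all'}, {'LASTSTEP': 'skysub'}]
#     skysubonly = np.array([sci['LASTSTEP'] == 'skysub' for sci in sciences])
#     sciences = (np.array(sciences, dtype=object)[~skysubonly]).tolist()
#     # -> [{'LASTSTEP': 'all'}]
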
def submit_tilenight_and_redshifts(ptable, sciences, calibjobs, internal_id, dry_run=0,
                                  queue='realtime', reservation=None, strictly_successful=False,
                                  check_for_outputs=True, resubmit_partial_complete=True,
                                  system_name=None, use_specter=False, extra_job_args=None):
    """
    Takes all the state-ful data from daily processing and determines whether a tilenight job needs to be submitted.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary of calibration job rows (Table.Row or None) on which
            the tilenight job depends.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but
            not submitted. If dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict, optional): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include
            laststeps for tilenight, z_submit_types for redshifts, etc.

    Returns:
        tuple: A tuple containing:

        * ptable, Table, Processing table of all exposures that have been processed.
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented
          upward from the input such that it represents the smallest unused ID.
    """
    ptable, tnight, internal_id = submit_tilenight(ptable, sciences, calibjobs, internal_id,
                                             queue=queue, reservation=reservation,
                                             dry_run=dry_run, strictly_successful=strictly_successful,
                                             resubmit_partial_complete=resubmit_partial_complete,
                                             system_name=system_name, use_specter=use_specter,
                                             extra_job_args=extra_job_args)

    z_submit_types = None
    if extra_job_args is not None and 'z_submit_types' in extra_job_args:
        z_submit_types = extra_job_args['z_submit_types']

    ptable, internal_id = submit_redshifts(ptable, sciences, tnight, internal_id,
                                    queue=queue, reservation=reservation,
                                    dry_run=dry_run, strictly_successful=strictly_successful,
                                    check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete,
                                    z_submit_types=z_submit_types,
                                    system_name=system_name)

    if tnight is not None:
        sciences = []

    return ptable, sciences, internal_id

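## Hedged usage sketch: extra_job_args is a plain dict; a caller opting into
## cumulative redshifts might pass (values here are hypothetical):
#
#     extra_job_args = {'laststeps': ['all'], 'z_submit_types': ['cumulative']}
#     ptable, sciences, internal_id = submit_tilenight_and_redshifts(
#         ptable, sciences, calibjobs, internal_id, dry_run=1,
#         extra_job_args=extra_job_args)
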
def set_calibrator_flag(prows, ptable):
    """
    Sets the "CALIBRATOR" column of a processing table row to 1 (integer representation of True)
    for all input rows. Used within joint fitting code to flag the exposures that were input
    to the psfnight or nightlyflat for later reference.

    Args:
        prows, list or array of Table.Rows or dicts. The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        ptable, Table. The processing table where each row is a processed job.

    Returns:
        Table: The same processing table as input except with the CALIBRATOR column set to 1
        for the rows whose INTID matches one of the input prows.
    """
    for prow in prows:
        ptable['CALIBRATOR'][ptable['INTID'] == prow['INTID']] = 1
    return ptable
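
## Hedged sketch with a hypothetical two-row table: only the row whose INTID
## matches an input prow gets flagged.
#
#     ptable = Table({'INTID': [100, 101], 'CALIBRATOR': [0, 0]})
#     ptable = set_calibrator_flag([{'INTID': 101}], ptable)
#     # ptable['CALIBRATOR'] -> [0, 1]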