• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

desihub / desispec / 10531518171

23 Aug 2024 07:50PM UTC coverage: 30.134%. First build
10531518171

push

github

akremin
fix pernight redshift intid bug

0 of 2 new or added lines in 1 file covered. (0.0%)

14586 of 48404 relevant lines covered (30.13%)

0.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.94
/py/desispec/workflow/processing.py
1
"""
2
desispec.workflow.processing
3
============================
4

5
"""
6
import sys, os, glob
1✔
7
import json
1✔
8
from astropy.io import fits
1✔
9
from astropy.table import Table, join
1✔
10
import numpy as np
1✔
11

12
import time, datetime
1✔
13
from collections import OrderedDict
1✔
14
import subprocess
1✔
15

16
from desispec.scripts.link_calibnight import derive_include_exclude
1✔
17
from desispec.scripts.tile_redshifts import generate_tile_redshift_scripts
1✔
18
from desispec.workflow.redshifts import get_ztile_script_pathname, \
1✔
19
    get_ztile_relpath, \
20
    get_ztile_script_suffix
21
from desispec.workflow.exptable import read_minimal_science_exptab_cols
1✔
22
from desispec.workflow.queue import get_resubmission_states, update_from_queue, \
1✔
23
    queue_info_from_qids, get_queue_states_from_qids, update_queue_state_cache
24
from desispec.workflow.timing import what_night_is_it
1✔
25
from desispec.workflow.desi_proc_funcs import get_desi_proc_batch_file_pathname, \
1✔
26
    create_desi_proc_batch_script, \
27
    get_desi_proc_batch_file_path, \
28
    get_desi_proc_tilenight_batch_file_pathname, \
29
    create_desi_proc_tilenight_batch_script, create_linkcal_batch_script
30
from desispec.workflow.batch import parse_reservation
1✔
31
from desispec.workflow.utils import pathjoin, sleep_and_report, \
1✔
32
    load_override_file
33
from desispec.workflow.tableio import write_table, load_table
1✔
34
from desispec.workflow.proctable import table_row_to_dict, erow_to_prow, \
1✔
35
    read_minimal_tilenight_proctab_cols, read_minimal_full_proctab_cols, \
36
    update_full_ptab_cache, default_prow, get_default_qid
37
from desiutil.log import get_logger
1✔
38

39
from desispec.io import findfile, specprod_root
1✔
40
from desispec.io.util import decode_camword, create_camword, \
1✔
41
    difference_camwords, \
42
    camword_to_spectros, camword_union, camword_intersection, parse_badamps
43

44

45
#################################################
46
############## Misc Functions ###################
47
#################################################
48
def night_to_starting_iid(night=None):
    """
    Creates an internal ID for a given night. The resulting integer is a 9 digit number.
    The digits are YYMMDDxxx where YY is the years since 2000, MM and DD are the month and day. xxx are 000,
    and are incremented for up to 1000 unique job ID's for a given night.

    Args:
        night (str or int, optional): YYYYMMDD of the night to get the starting internal ID for.
            If None, the current night is used.

    Returns:
        int: 9 digit number consisting of YYMMDD000. YY is years after 2000, MMDD is month and day.
        000 being the starting job number (0).
    """
    if night is None:
        ## Default to tonight's night value
        night = what_night_is_it()
    night = int(night)
    ## Strip the leading "20" of the year, then leave three trailing
    ## digits (000-999) for per-night job counters
    internal_id = (night - 20000000) * 1000
    return internal_id
66

67
class ProcessingParams():
    """
    Bundle of the options that control how processing jobs are created and
    submitted to the batch queue.
    """
    def __init__(self, dry_run_level=0, queue='realtime',
                 reservation=None, strictly_successful=True,
                 check_for_outputs=True,
                 resubmit_partial_complete=True,
                 system_name='perlmutter', use_specter=True):
        ## simulation level; 0 means a real (non-dry-run) submission
        self.dry_run_level = dry_run_level
        ## batch system configuration
        self.system_name = system_name
        self.queue = queue
        self.reservation = reservation
        ## dependency strictness and resubmission behavior
        self.strictly_successful = strictly_successful
        self.check_for_outputs = check_for_outputs
        self.resubmit_partial_complete = resubmit_partial_complete
        ## extraction code selection (specter vs gpu_specter)
        self.use_specter = use_specter
82

83
#################################################
84
############ Script Functions ###################
85
#################################################
86
def batch_script_name(prow):
    """
    Determine the batch script file pathname for a processing table row.

    Takes a processing table row (or dictionary with NIGHT, EXPID, JOBDESC,
    PROCCAMWORD defined) and determines the script file pathname as defined
    by desi_proc's helper functions.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions
            for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD' ('TILEID' is
            also read for tilenight jobs).

    Returns:
        str: The complete pathname to the script file, as it is defined
            within the desi_proc ecosystem.
    """
    ## An empty exposure list is passed through to the helpers as None
    exposures = prow['EXPID'] if len(prow['EXPID']) > 0 else None
    if prow['JOBDESC'] == 'tilenight':
        base = get_desi_proc_tilenight_batch_file_pathname(night=prow['NIGHT'],
                                                           tileid=prow['TILEID'])
    else:
        base = get_desi_proc_batch_file_pathname(night=prow['NIGHT'], exp=exposures,
                                                 jobdesc=prow['JOBDESC'],
                                                 cameras=prow['PROCCAMWORD'])
    return base + '.slurm'
107

108
def get_jobdesc_to_file_map():
    """
    Return the mapping from job descriptions to their expected output filetypes.

    Returns:
        dict: Keys are lowercase job descriptions; values are the filetype
            names of the expected output for each job.
    """
    ## Note: ccdcalib is intentionally absent; its outputs vary per job
    return dict(prestdstar='sframe',
                stdstarfit='stdstars',
                poststdstar='cframe',
                nightlybias='biasnight',
                badcol='badcolumns',
                arc='fitpsf',
                flat='fiberflat',
                psfnight='psfnight',
                nightlyflat='fiberflatnight',
                spectra='spectra_tile',
                coadds='coadds_tile',
                redshift='redrock_tile')
133

134
def get_file_to_jobdesc_map():
    """
    Return the inverse of get_jobdesc_to_file_map(): output filetype to
    lowercase job description.

    Returns:
        dict: Keys are the filetype names of expected outputs; values are the
            lowercase job descriptions that produce them.
    """
    jobdesc_to_file = get_jobdesc_to_file_map()
    ## badcolumns/biasnight files can also be produced by a ccdcalib job,
    ## so the inverse mapping would be ambiguous for them; drop both
    for ambiguous_jobdesc in ('badcol', 'nightlybias'):
        jobdesc_to_file.pop(ambiguous_jobdesc)
    return {filetype: jobdesc for jobdesc, filetype in jobdesc_to_file.items()}
151

152
def check_for_outputs_on_disk(prow, resubmit_partial_complete=True):
    """
    Check whether the expected output files for a processing job already exist
    on disk, updating the row's STATUS and (optionally) pruning PROCCAMWORD.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispect.workflow.proctable.get_processing_table_column_defs()
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
        the change in job status after creating and submitting the job for processing.
        STATUS becomes 'COMPLETED' if all expected outputs exist, otherwise it
        remains 'UNKNOWN' and PROCCAMWORD may be reduced to only the missing
        cameras/spectrographs.
    """
    ## Start from UNKNOWN; set to COMPLETED below only if everything exists
    prow['STATUS'] = 'UNKNOWN'
    log = get_logger()

    ## linkcal/ccdcalib jobs do not have a fixed output set, so a generic
    ## file-existence check is not possible here
    if prow['JOBDESC'] in ['linkcal', 'ccdcalib']:
        log.info(f"jobdesc={prow['JOBDESC']} has indeterminated outputs, so "
                + "not checking for files on disk.")
        return prow

    job_to_file_map = get_jobdesc_to_file_map()

    night = prow['NIGHT']
    ## All tile-grouped redshift jobs produce redrock files
    if prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
        filetype = 'redrock_tile'
    else:
        filetype = job_to_file_map[prow['JOBDESC']]
    ## Remember the input camword so we can detect pruning when logging below
    orig_camword = prow['PROCCAMWORD']

    ## if spectro based, look for spectros, else look for cameras
    if prow['JOBDESC'] in ['stdstarfit','spectra','coadds','redshift']:
        ## Spectrograph based
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        ## Suppress outputs about using tile based files in findfile if only looking for stdstarfits
        if prow['JOBDESC'] == 'stdstarfit':
            tileid = None
        else:
            tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(findfile(filetype=filetype, night=night, expid=expid, spectrograph=spectro, tile=tileid)):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            ## Drop the spectrographs that already have outputs from the camword
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'],existing_camword)
    elif prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
        ## Spectrograph based; redshift outputs live under the ztile directory
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        redux_dir = specprod_root()
        outdir = os.path.join(redux_dir,get_ztile_relpath(tileid,group=prow['JOBDESC'],night=night,expid=expid))
        suffix = get_ztile_script_suffix(tileid, group=prow['JOBDESC'], night=night, expid=expid)
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(os.path.join(outdir, f"redrock-{spectro}-{suffix}.fits")):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            ## Drop the spectrographs that already have outputs from the camword
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'],existing_camword)
    else:
        ## Otherwise camera based
        cameras = decode_camword(prow['PROCCAMWORD'])
        n_desired = len(cameras)
        if len(prow['EXPID']) > 0:
            expid = prow['EXPID'][0]
        else:
            expid = None
        ## psfnight/nightlyflat legitimately combine multiple exposures;
        ## anything else should be a single-exposure job
        if len(prow['EXPID']) > 1 and prow['JOBDESC'] not in ['psfnight','nightlyflat']:
            log.warning(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']}. This job type only makes " +
                     f"sense with a single exposure. Proceeding with {expid}.")
        missing_cameras = []
        for cam in cameras:
            if not os.path.exists(findfile(filetype=filetype, night=night, expid=expid, camera=cam)):
                missing_cameras.append(cam)
        completed = (len(missing_cameras) == 0)
        if not completed and resubmit_partial_complete and len(missing_cameras) < n_desired:
            ## Only reprocess the cameras whose outputs are missing
            prow['PROCCAMWORD'] = create_camword(missing_cameras)

    ## Report the outcome: fully complete, partially complete (pruned),
    ## or nothing found on disk
    if completed:
        prow['STATUS'] = 'COMPLETED'
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"the desired {n_desired} {filetype}'s. Not submitting this job.")
    elif resubmit_partial_complete and orig_camword != prow['PROCCAMWORD']:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"some {filetype}'s. Submitting smaller camword={prow['PROCCAMWORD']}.")
    elif not resubmit_partial_complete:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} doesn't have all " +
                 f"{filetype}'s and resubmit_partial_complete=False. "+
                 f"Submitting full camword={prow['PROCCAMWORD']}.")
    else:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} has no " +
                 f"existing {filetype}'s. Submitting full camword={prow['PROCCAMWORD']}.")
    return prow
253

254
def create_and_submit(prow, queue='realtime', reservation=None, dry_run=0,
                      joint=False, strictly_successful=False,
                      check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None, use_specter=False,
                      extra_job_args=None):
    """
    Wrapper script that takes a processing table row and three modifier keywords, creates a submission script for the
    compute nodes, and then submits that script to the Slurm scheduler with appropriate dependencies.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispect.workflow.proctable.get_processing_table_column_defs()
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        joint (bool, optional): Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
            run with desi_proc_joint_fit. Default is False.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include refnight
            and include/exclude lists for linkcals, laststeps for for tilenight, etc.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
        the change in job status after creating and submitting the job for processing.

    Note:
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    ## Keep a copy so pruned PROCCAMWORD/SCRIPTNAME can be restored below
    orig_prow = prow.copy()
    if check_for_outputs:
        prow = check_for_outputs_on_disk(prow, resubmit_partial_complete)
        ## Nothing to do if all outputs already exist
        if prow['STATUS'].upper() == 'COMPLETED':
            return prow

    prow = create_batch_script(prow, queue=queue, dry_run=dry_run, joint=joint,
                               system_name=system_name, use_specter=use_specter,
                               extra_job_args=extra_job_args)
    prow = submit_batch_script(prow, reservation=reservation, dry_run=dry_run,
                               strictly_successful=strictly_successful)

    ## If resubmitted partial, the PROCCAMWORD and SCRIPTNAME will correspond
    ## to the pruned values. But we want to
    ## retain the full job's value, so get those from the old job.
    if resubmit_partial_complete:
        prow['PROCCAMWORD'] = orig_prow['PROCCAMWORD']
        prow['SCRIPTNAME'] = orig_prow['SCRIPTNAME']
    return prow
317

318
def desi_link_calibnight_command(prow, refnight, include=None):
    """
    Build the desi_link_calibnight command line for the job defined by a
    processing table row (or dictionary with NIGHT and PROCCAMWORD defined).

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions
            for 'NIGHT' and 'PROCCAMWORD'.
        refnight (str or int): The night with a valid set of calibrations
            to link against.
        include (list, optional): The filetypes to include in the linking.

    Returns:
        str: The proper command to be submitted to desi_link_calibnight
            to process the job defined by the prow values.
    """
    args = ['desi_link_calibnight',
            f'--refnight={refnight}',
            f'--newnight={prow["NIGHT"]}',
            f'--cameras={prow["PROCCAMWORD"]}']
    if include is not None:
        args.append('--include=' + ','.join(list(include)))
    return ' '.join(args)
341

342
def desi_proc_command(prow, system_name, use_specter=False, queue=None):
    """
    Build the desi_proc command line for the job defined by a processing table
    row (or dictionary with NIGHT, EXPID, OBSTYPE, JOBDESC, PROCCAMWORD defined).

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for
            'NIGHT', 'EXPID', 'OBSTYPE', 'JOBDESC', 'PROCCAMWORD', and 'BADAMPS'.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu.
            NOTE(review): not currently used in building the command; kept for
            interface compatibility with callers.
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).

    Returns:
        str: The proper command to be submitted to desi_proc to process the job defined by the prow values.
    """
    parts = ['desi_proc', '--batch', '--nosubmit']
    if queue is not None:
        parts.append(f'-q {queue}')
    obstype = prow['OBSTYPE'].lower()
    jobdesc = prow['JOBDESC']
    if obstype == 'science':
        if jobdesc == 'prestdstar':
            parts.append('--nostdstarfit --nofluxcalib')
        elif jobdesc == 'poststdstar':
            parts.append('--noprestdstarfit --nostdstarfit')
        if use_specter:
            parts.append('--use-specter')
    elif use_specter and jobdesc in ['flat', 'prestdstar']:
        parts.append('--use-specter')
    parts.append(f"--cameras={str(prow['PROCCAMWORD'])} -n {prow['NIGHT']}")
    if len(prow['EXPID']) > 0:
        ## If ccdcalib job without a dark exposure, don't assign the flat expid
        ## since it would incorrectly process the flat using desi_proc
        if obstype != 'flat' or jobdesc != 'ccdcalib':
            parts.append(f"-e {prow['EXPID'][0]}")
    if prow['BADAMPS'] != '':
        parts.append('--badamps={}'.format(prow['BADAMPS']))
    return ' '.join(parts)
381

382
def desi_proc_joint_fit_command(prow, queue=None):
    """
    Build the desi_proc_joint_fit command line for the job defined by a
    processing table row (or dictionary with NIGHT, EXPID, OBSTYPE,
    PROCCAMWORD defined).

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions
            for 'NIGHT', 'EXPID', 'OBSTYPE', and 'PROCCAMWORD'.
        queue (str): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).

    Returns:
        str: The proper command to be submitted to desi_proc_joint_fit
            to process the job defined by the prow values.
    """
    parts = ['desi_proc_joint_fit', '--batch', '--nosubmit']
    if queue is not None:
        parts.append(f'-q {queue}')

    ## obstype selects the type of joint fit to perform
    descriptor = prow['OBSTYPE'].lower()
    parts.append(f'--obstype {descriptor}')
    parts.append(f"--cameras={str(prow['PROCCAMWORD'])} -n {prow['NIGHT']}")

    ## joint fits can span multiple exposures, so pass all of them
    expid_str = ','.join([str(eid) for eid in prow['EXPID']])
    if len(expid_str) > 0:
        parts.append(f'-e {expid_str}')
    return ' '.join(parts)
412

413

414
def create_batch_script(prow, queue='realtime', dry_run=0, joint=False,
                        system_name=None, use_specter=False, extra_job_args=None):
    """
    Wrapper script that takes a processing table row and three modifier keywords and creates a submission script for the
    compute nodes.

    Args:
        prow, Table.Row or dict. Must include keyword accessible definitions for processing_table columns found in
            desispect.workflow.proctable.get_processing_table_column_defs()
        queue, str. The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        dry_run, int. If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted.
            If dry_run=2, the scripts will not be written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        joint, bool. Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
            run with desi_proc_joint_fit when not using tilenight. Default is False.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter, bool, optional. Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include refnight
            and include/exclude lists for linkcal, laststeps for tilenight, etc.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated values for
        scriptname.

    Raises:
        ValueError: If no redshift scripts are generated, if a linkcal job has
            no valid refnight, or if a tilenight job does not specify laststeps.

    Note:
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()

    if extra_job_args is None:
        extra_job_args = {}

    ## Branch 1: tile-grouped redshift jobs use the ztile script machinery
    if prow['JOBDESC'] in ['perexp','pernight','pernight-v0','cumulative']:
        if dry_run > 1:
            scriptpathname = get_ztile_script_pathname(tileid=prow['TILEID'],group=prow['JOBDESC'],
                                                               night=prow['NIGHT'], expid=prow['EXPID'][0])

            log.info("Output file would have been: {}".format(scriptpathname))
        else:
            #- run zmtl for cumulative redshifts but not others
            run_zmtl = (prow['JOBDESC'] == 'cumulative')
            no_afterburners = False
            print(f"entering tileredshiftscript: {prow}")
            scripts, failed_scripts = generate_tile_redshift_scripts(tileid=prow['TILEID'], group=prow['JOBDESC'],
                                                                     nights=[prow['NIGHT']], expids=prow['EXPID'],
                                                                     batch_queue=queue, system_name=system_name,
                                                                     run_zmtl=run_zmtl,
                                                                     no_afterburners=no_afterburners,
                                                                     nosubmit=True)
            ## Exactly one script is expected; log failures or multiples,
            ## and raise only if nothing at all was generated
            if len(failed_scripts) > 0:
                log.error(f"Redshifts failed for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned failed scriptname is {failed_scripts}")
            elif len(scripts) > 1:
                log.error(f"More than one redshifts returned for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned scriptnames were {scripts}")
            elif len(scripts) == 0:
                msg = f'No scripts were generated for {prow=}'
                log.critical(prow)
                raise ValueError(msg)
            else:
                scriptpathname = scripts[0]

    ## Branch 2: calibration-linking jobs
    elif prow['JOBDESC'] == 'linkcal':
        ## refnight of -99 is a sentinel for "not provided"
        refnight, include, exclude = -99, None, None
        if 'refnight' in extra_job_args:
            refnight = extra_job_args['refnight']
        if 'include' in extra_job_args:
            include = extra_job_args['include']
        if 'exclude' in extra_job_args:
            exclude = extra_job_args['exclude']
        include, exclude = derive_include_exclude(include, exclude)
        ## Fiberflatnights shouldn't be generated with psfs from same time, so
        ## shouldn't link psfs without also linking fiberflatnight
        ## However, this should be checked at a higher level. If set here,
        ## go ahead and do it
        # if 'psfnight' in include and not 'fiberflatnight' in include:
        #     err = "Must link fiberflatnight if linking psfnight"
        #     log.error(err)
        #     raise ValueError(err)
        if dry_run > 1:
            scriptpathname = batch_script_name(prow)
            log.info("Output file would have been: {}".format(scriptpathname))
            cmd = desi_link_calibnight_command(prow, refnight, include)
            log.info("Command to be run: {}".format(cmd.split()))
        else:
            if refnight == -99:
                err = f'For {prow=} asked to link calibration but not given' \
                      + ' a valid refnight'
                log.error(err)
                raise ValueError(err)

            cmd = desi_link_calibnight_command(prow, refnight, include)
            log.info(f"Running: {cmd.split()}")
            scriptpathname = create_linkcal_batch_script(newnight=prow['NIGHT'],
                                                        cameras=prow['PROCCAMWORD'],
                                                        queue=queue,
                                                        cmd=cmd,
                                                        system_name=system_name)
    ## Branch 3: everything else (regular desi_proc jobs and tilenight)
    else:
        ## tilenight jobs build their own script; all others need a command
        ## line assembled first
        if prow['JOBDESC'] != 'tilenight':
            nightlybias, nightlycte, cte_expids = False, False, None
            if 'nightlybias' in extra_job_args:
                nightlybias = extra_job_args['nightlybias']
            elif prow['JOBDESC'].lower() == 'nightlybias':
                nightlybias = True
            if 'nightlycte' in extra_job_args:
                nightlycte = extra_job_args['nightlycte']
            if 'cte_expids' in extra_job_args:
                cte_expids = extra_job_args['cte_expids']
            ## run known joint jobs as joint even if unspecified
            ## in the future we can eliminate the need for "joint"
            if joint or prow['JOBDESC'].lower() in ['psfnight', 'nightlyflat']:
                cmd = desi_proc_joint_fit_command(prow, queue=queue)
                ## For consistency with how we edit the other commands, do them
                ## here, but future TODO would be to move these into the command
                ## generation itself
                if 'extra_cmd_args' in extra_job_args:
                    cmd += ' ' + ' '.join(np.atleast_1d(extra_job_args['extra_cmd_args']))
            else:
                cmd = desi_proc_command(prow, system_name, use_specter, queue=queue)
                if nightlybias:
                    cmd += ' --nightlybias'
                if nightlycte:
                    cmd += ' --nightlycte'
                    if cte_expids is not None:
                        cmd += ' --cte-expids '
                        cmd += ','.join(np.atleast_1d(cte_expids).astype(str))
        if dry_run > 1:
            scriptpathname = batch_script_name(prow)
            log.info("Output file would have been: {}".format(scriptpathname))
            ## tilenight has no assembled cmd to report
            if prow['JOBDESC'] != 'tilenight':
                log.info("Command to be run: {}".format(cmd.split()))
        else:
            ## An empty exposure list is passed through as None
            expids = prow['EXPID']
            if len(expids) == 0:
                expids = None

            if prow['JOBDESC'] == 'tilenight':
                log.info("Creating tilenight script for tile {}".format(prow['TILEID']))
                if 'laststeps' in extra_job_args:
                    laststeps = extra_job_args['laststeps']
                else:
                    err = f'{prow=} job did not specify last steps to tilenight'
                    log.error(err)
                    raise ValueError(err)
                ncameras = len(decode_camword(prow['PROCCAMWORD']))
                scriptpathname = create_desi_proc_tilenight_batch_script(
                                                               night=prow['NIGHT'], exp=expids,
                                                               tileid=prow['TILEID'],
                                                               ncameras=ncameras,
                                                               queue=queue,
                                                               mpistdstars=True,
                                                               use_specter=use_specter,
                                                               system_name=system_name,
                                                               laststeps=laststeps)
            else:
                ## Non-joint jobs only use the first exposure
                if expids is not None and len(expids) > 1:
                    expids = expids[:1]
                log.info("Running: {}".format(cmd.split()))
                scriptpathname = create_desi_proc_batch_script(night=prow['NIGHT'], exp=expids,
                                                               cameras=prow['PROCCAMWORD'],
                                                               jobdesc=prow['JOBDESC'],
                                                               queue=queue, cmdline=cmd,
                                                               use_specter=use_specter,
                                                               system_name=system_name,
                                                               nightlybias=nightlybias,
                                                               nightlycte=nightlycte,
                                                               cte_expids=cte_expids)
    log.info("Outfile is: {}".format(scriptpathname))
    ## Record only the basename; directory is implied by night/jobdesc
    prow['SCRIPTNAME'] = os.path.basename(scriptpathname)
    return prow
590

591
## Seed for fake Slurm jobids used in dry-run testing. Offsetting the current
## epoch time by 1.7e9 keeps the ids small-ish while remaining unique per session.
_fake_qid = int(time.time() - 1.7e9)

def _get_fake_qid():
    """
    Return a fake Slurm queue jobid for dry-run testing.

    Each call increments a module-level counter, so successive calls yield
    strictly increasing values. Implemented with a global rather than a
    generator so that the return value is a genuine int, usable anywhere a
    real Slurm jobid is expected.
    """
    global _fake_qid
    _fake_qid = _fake_qid + 1
    return _fake_qid
601

602
def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=False):
    """
    Wrapper script that takes a processing table row and three modifier keywords and submits the scripts to the Slurm
    scheduler.

    Args:
        prow, Table.Row or dict. Must include keyword accessible definitions for processing_table columns found in
            desispect.workflow.proctable.get_processing_table_column_defs()
        dry_run, int. If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful, bool. Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated values for
        scriptname.

    Note:
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()
    dep_qids = prow['LATEST_DEP_QID']
    dep_list, dep_str = '', ''

    ## With desi_proc_night we now either resubmit failed jobs or exit, so this
    ## should no longer be necessary in the normal workflow.
    # workaround for sbatch --dependency bug not tracking jobs correctly
    # see NERSC TICKET INC0203024
    if len(dep_qids) > 0 and not dry_run:
        ## Drop dependencies that Slurm already reports as COMPLETED so the
        ## submitted job is not blocked waiting on finished jobs.
        state_dict = get_queue_states_from_qids(dep_qids, dry_run=dry_run, use_cache=True)
        still_depids = []
        for depid in dep_qids:
            # NOTE(review): membership tests the raw depid but the lookup uses
            # int(depid) — assumes state_dict keys are ints; confirm upstream.
            if depid in state_dict.keys() and state_dict[int(depid)] == 'COMPLETED':
                log.info(f"removing completed jobid {depid}")
            else:
                still_depids.append(depid)
        dep_qids = np.array(still_depids)

    if len(dep_qids) > 0:
        jobtype = prow['JOBDESC']
        ## Choose the Slurm dependency condition: 'afterok' demands every
        ## dependency succeeded; 'afterany' only that it finished.
        if strictly_successful:
            depcond = 'afterok'
        elif jobtype in ['arc', 'psfnight', 'prestdstar', 'stdstarfit']:
            ## (though psfnight and stdstarfit will require some inputs otherwise they'll go up in flames)
            depcond = 'afterany'
        else:
            ## if 'flat','nightlyflat','poststdstar', or any type of redshift, require strict success of inputs
            depcond = 'afterok'

        dep_str = f'--dependency={depcond}:'

        if np.isscalar(dep_qids):
            dep_list = str(dep_qids).strip(' \t')
            if dep_list == '':
                dep_str = ''
            else:
                dep_str += dep_list
        else:
            if len(dep_qids)>1:
                # Slurm expects colon-separated jobids after the condition.
                dep_list = ':'.join(np.array(dep_qids).astype(str))
                dep_str += dep_list
            elif len(dep_qids) == 1 and dep_qids[0] not in [None, 0]:
                dep_str += str(dep_qids[0])
            else:
                dep_str = ''

    # script = f'{jobname}.slurm'
    # script_path = pathjoin(batchdir, script)
    ## Redshift (ztile) jobs use a different script path convention than
    ## per-exposure desi_proc jobs.
    if prow['JOBDESC'] in ['pernight-v0','pernight','perexp','cumulative']:
        script_path = get_ztile_script_pathname(tileid=prow['TILEID'],group=prow['JOBDESC'],
                                                        night=prow['NIGHT'], expid=np.min(prow['EXPID']))
        jobname = os.path.basename(script_path)
    else:
        batchdir = get_desi_proc_batch_file_path(night=prow['NIGHT'])
        jobname = batch_script_name(prow)
        script_path = pathjoin(batchdir, jobname)

    ## --parsable makes sbatch print just the jobid, which we parse below.
    batch_params = ['sbatch', '--parsable']
    if dep_str != '':
        batch_params.append(f'{dep_str}')

    reservation = parse_reservation(reservation, prow['JOBDESC'])
    if reservation is not None:
        batch_params.append(f'--reservation={reservation}')

    batch_params.append(f'{script_path}')
    submitted = True
    if dry_run:
        current_qid = _get_fake_qid()
    else:
        #- sbatch sometimes fails; try several times before giving up
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                current_qid = subprocess.check_output(batch_params, stderr=subprocess.STDOUT, text=True)
                current_qid = int(current_qid.strip(' \t\n'))
                break
            except subprocess.CalledProcessError as err:
                log.error(f'{jobname} submission failure at {datetime.datetime.now()}')
                log.error(f'{jobname}   {batch_params}')
                log.error(f'{jobname}   {err.output=}')
                if attempt < max_attempts - 1:
                    log.info('Sleeping 60 seconds then retrying')
                    time.sleep(60)
        else:  #- for/else happens if loop doesn't succeed
            msg = f'{jobname} submission failed {max_attempts} times.' \
                  + ' setting as unsubmitted and moving on'
            log.error(msg)
            current_qid = get_default_qid()
            submitted = False

    log.info(batch_params)

    ## Update prow with new information
    prow['LATEST_QID'] = current_qid

    ## If we didn't submit, don't say we did and don't add to ALL_QIDS
    if submitted:
        log.info(f'Submitted {jobname} with dependencies {dep_str} and '
                 + f'reservation={reservation}. Returned qid: {current_qid}')

        ## Update prow with new information
        prow['ALL_QIDS'] = np.append(prow['ALL_QIDS'],current_qid)
        prow['STATUS'] = 'SUBMITTED'
        prow['SUBMIT_DATE'] = int(time.time())
    else:
        prow['STATUS'] = 'UNSUBMITTED'

        ## Update the Slurm jobid cache of job states
        update_queue_state_cache(qid=prow['LATEST_QID'], state=prow['STATUS'])

    return prow
739

740

741
#############################################
742
##########   Row Manipulations   ############
743
#############################################
744
def define_and_assign_dependency(prow, calibjobs, use_tilenight=False,
                                 refnight=None):
    """
    Given input processing row and possible calibjobs, this defines the
    JOBDESC keyword and assigns the dependency appropriate for the job type of
    prow.

    Args:
        prow, Table.Row or dict. Must include keyword accessible definitions for
            'OBSTYPE'. A row must have column names for
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.
        calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The table.Row() values are for the corresponding
            calibration job. Each value that isn't None must contain
            'INTID', and 'LATEST_QID'. If None, it assumes the
            dependency doesn't exist and no dependency is assigned.
        use_tilenight, bool. Default is False. If True, use desi_proc_tilenight
            for prestdstar, stdstar,and poststdstar steps for
            science exposures.
        refnight, int. The reference night for linking jobs

    Returns:
        Table.Row or dict: The same prow type and keywords as input except
        with modified values updated values for 'JOBDESC', 'INT_DEP_IDS'. and 'LATEST_DEP_ID'.

    Note:
        This modifies the input. Though Table.Row objects are generally copied
        on modification, so the change to the input object in memory may or may
        not be changed. As of writing, a row from a table given to this function
        will not change during the execution of this function (but can be
        overwritten explicitly with the returned row if desired).
    """
    ## Each obstype depends on the most-derived calibration product available,
    ## falling back through earlier calibration stages and finally to linkcal.
    ## The repeated if/elif chains of the original are factored into
    ## _first_available_calibjob so the fallback order is stated once per obstype.
    if prow['OBSTYPE'] in ['science', 'twiflat']:
        dependency = _first_available_calibjob(
            calibjobs, ['nightlyflat', 'psfnight', 'ccdcalib', 'nightlybias', 'badcol'])
        if not use_tilenight:
            prow['JOBDESC'] = 'prestdstar'
    elif prow['OBSTYPE'] == 'flat':
        dependency = _first_available_calibjob(
            calibjobs, ['psfnight', 'ccdcalib', 'nightlybias', 'badcol'])
    elif prow['OBSTYPE'] == 'arc':
        dependency = _first_available_calibjob(
            calibjobs, ['ccdcalib', 'nightlybias', 'badcol'])
    elif prow['JOBDESC'] in ['badcol', 'nightlybias', 'ccdcalib']:
        ## CCD-level calibration jobs themselves depend only on a possible linkcal.
        dependency = calibjobs['linkcal']
    elif prow['OBSTYPE'] == 'dark':
        dependency = _first_available_calibjob(
            calibjobs, ['ccdcalib', 'nightlybias', 'badcol'])
    elif prow['JOBDESC'] == 'linkcal' and refnight is not None:
        dependency = None
        ## For link cals only, enable cross-night dependencies if available
        refproctable = findfile('proctable', night=refnight)
        if os.path.exists(refproctable):
            ptab = load_table(tablename=refproctable, tabletype='proctable')
            ## This isn't perfect because we may depend on jobs that aren't
            ## actually being linked
            ## Also allows us to proceed even if jobs don't exist yet
            deps = []
            for job in ['nightlybias', 'ccdcalib', 'psfnight', 'nightlyflat']:
                if job in ptab['JOBDESC']:
                    ## add prow to dependencies
                    deps.append(ptab[ptab['JOBDESC']==job][0])
            if len(deps) > 0:
                dependency = deps
    else:
        dependency = None

    prow = assign_dependency(prow, dependency)

    return prow


def _first_available_calibjob(calibjobs, ordered_jobs):
    """
    Return the first non-None calibration job row from calibjobs, checked in
    the order given by ordered_jobs; fall back to calibjobs['linkcal'] (which
    may itself be None) when none of the listed jobs exist.

    Args:
        calibjobs, dict. Maps job names to Table.Row/dict or None.
        ordered_jobs, list of str. Job names in decreasing priority order.

    Returns:
        Table.Row, dict, or None: the selected dependency row.
    """
    for job in ordered_jobs:
        if calibjobs[job] is not None:
            return calibjobs[job]
    return calibjobs['linkcal']
845

846

847
def assign_dependency(prow, dependency):
    """
    Given input processing row and a dependency (or list/array of dependencies),
    this records the internal IDs and latest Slurm queue IDs of the dependencies
    on the row.

    Args:
        prow, Table.Row or dict. Must include keyword accessible definitions for 'OBSTYPE'. A row must have column names for
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.
        dependency, NoneType or scalar/list/array of Table.Row, dict. Processing row corresponding to the required input
            for the job in prow. This must contain keyword
            accessible values for 'INTID', and 'LATEST_QID'.
            If None, it assumes the dependency doesn't exist
            and no dependency is assigned.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated values for
        'JOBDESC', 'INT_DEP_IDS'. and 'LATEST_DEP_QID'.

    Note:
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    ## Default: no dependencies (empty int arrays).
    prow['INT_DEP_IDS'] = np.ndarray(shape=0).astype(int)
    prow['LATEST_DEP_QID'] = np.ndarray(shape=0).astype(int)
    if dependency is not None:
        ## Bug fix: the original tested `type(dependency) in [list, np.array]`.
        ## np.array is a function, not a type, so an np.ndarray of dependencies
        ## matched neither branch and was silently dropped. isinstance also
        ## correctly accepts subclasses (e.g. OrderedDict is a dict).
        if isinstance(dependency, (list, np.ndarray)):
            ids, qids = [], []
            for curdep in dependency:
                ids.append(curdep['INTID'])
                ## Only still-pending jobs need Slurm-level dependency tracking.
                if still_a_dependency(curdep):
                    qids.append(curdep['LATEST_QID'])
            prow['INT_DEP_IDS'] = np.array(ids, dtype=int)
            prow['LATEST_DEP_QID'] = np.array(qids, dtype=int)
        elif isinstance(dependency, (dict, Table.Row)):
            prow['INT_DEP_IDS'] = np.array([dependency['INTID']], dtype=int)
            if still_a_dependency(dependency):
                prow['LATEST_DEP_QID'] = np.array([dependency['LATEST_QID']], dtype=int)
    return prow
887

888
def still_a_dependency(dependency):
    """
    Decide whether a dependency job is still a blocking factor.

     Args:
        dependency, Table.Row or dict. Processing row corresponding to the required input for the job in prow.
            This must contain keyword accessible values for 'STATUS', and 'LATEST_QID'.

    Returns:
        bool: True if the dependency is still pending, so the Slurm scheduler
        must be told about it. False if the job already COMPLETED or was never
        actually submitted (non-positive queue id), in which case it no longer
        blocks anything.
    """
    if dependency['STATUS'] == 'COMPLETED':
        return False
    return dependency['LATEST_QID'] > 0
903

904
def get_type_and_tile(erow):
    """
    Extract the observation type and the tile identifier from an exposure table row.

    Args:
        erow, Table.Row or dict. Must contain 'OBSTYPE' and 'TILEID' as keywords.

    Returns:
        tuple: (obstype, tileid) where obstype is the lower-cased string form of
        erow['OBSTYPE'] and tileid is erow['TILEID'] passed through unchanged.
    """
    obstype = str(erow['OBSTYPE']).lower()
    tileid = erow['TILEID']
    return obstype, tileid
915

916

917
#############################################
918
#########   Table manipulators   ############
919
#############################################
920
def parse_previous_tables(etable, ptable, night):
    """
    This takes in the exposure and processing tables and regenerates all the working memory variables needed for the
    daily processing script.

    Used by the daily processing to define most of its state-ful variables into working memory.
    If the processing table is empty, these are simply declared and returned for use.
    If the code had previously run and exited (or crashed), however, this allows the code to
    re-establish itself by redefining these values.

    Args:
        etable, Table, Exposure table of all exposures that have been dealt with thus far.
        ptable, Table, Processing table of all exposures that have been processed.
        night, str or int, the night the data was taken.

    Returns:
        tuple: A tuple containing:

        * arcs, list of dicts, list of the individual arc jobs used for the psfnight (NOT all
          the arcs, if multiple sets existed)
        * flats, list of dicts, list of the individual flat jobs used for the nightlyflat (NOT
          all the flats, if multiple sets existed)
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile)
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'badcol', 'psfnight'
          and 'nightlyflat'. Each key corresponds to a Table.Row or
          None. The table.Row() values are for the corresponding
          calibration job.
        * curtype, None, the obstype of the current job being run. Always None as first new job will define this.
        * lasttype, str or None, the obstype of the last individual exposure row to be processed.
        * curtile, None, the tileid of the current job (if science). Otherwise None. Always None as first
          new job will define this.
        * lasttile, str or None, the tileid of the last job (if science). Otherwise None.
        * internal_id, int, an internal identifier unique to each job. Increments with each new job. This
          is the latest unassigned value.
    """
    log = get_logger()
    arcs, flats, sciences = [], [], []
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None,
                 'accounted_for': dict()}
    curtype,lasttype = None,None
    curtile,lasttile = None,None

    if len(ptable) > 0:
        ## Resume from prior state: the next internal id follows the last row's.
        prow = ptable[-1]
        internal_id = int(prow['INTID'])+1
        lasttype,lasttile = get_type_and_tile(ptable[-1])
        jobtypes = ptable['JOBDESC']

        if 'nightlybias' in jobtypes:
            calibjobs['nightlybias'] = table_row_to_dict(ptable[jobtypes=='nightlybias'][0])
            log.info("Located nightlybias job in exposure table: {}".format(calibjobs['nightlybias']))

        if 'ccdcalib' in jobtypes:
            calibjobs['ccdcalib'] = table_row_to_dict(ptable[jobtypes=='ccdcalib'][0])
            log.info("Located ccdcalib job in exposure table: {}".format(calibjobs['ccdcalib']))

        if 'psfnight' in jobtypes:
            calibjobs['psfnight'] = table_row_to_dict(ptable[jobtypes=='psfnight'][0])
            log.info("Located joint fit psfnight job in exposure table: {}".format(calibjobs['psfnight']))
        elif lasttype == 'arc':
            ## psfnight not yet run: recover the trailing run of arc rows
            ## (walking backward while SEQNUM decreases) as pending inputs.
            seqnum = 10
            for row in ptable[::-1]:
                erow = etable[etable['EXPID']==row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'arc' and int(erow['SEQNUM'])<seqnum:
                    arcs.append(table_row_to_dict(row))
                    seqnum = int(erow['SEQNUM'])
                else:
                    break
            ## Because we work backword to fill in, we need to reverse them to get chronological order back
            arcs = arcs[::-1]

        if 'nightlyflat' in jobtypes:
            calibjobs['nightlyflat'] = table_row_to_dict(ptable[jobtypes=='nightlyflat'][0])
            log.info("Located joint fit nightlyflat job in exposure table: {}".format(calibjobs['nightlyflat']))
        elif lasttype == 'flat':
            ## nightlyflat not yet run: recover the trailing calibration flats
            ## (short sequences with exposure time > 100s) as pending inputs.
            for row in ptable[::-1]:
                erow = etable[etable['EXPID']==row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'flat' and int(erow['SEQTOT']) < 5:
                    if float(erow['EXPTIME']) > 100.:
                        flats.append(table_row_to_dict(row))
                else:
                    break
            flats = flats[::-1]

        if lasttype.lower() == 'science':
            ## Recover the prestdstar science exposures of the tile currently
            ## in progress (skysub-only exposures are excluded).
            for row in ptable[::-1]:
                if row['OBSTYPE'].lower() == 'science' and row['TILEID'] == lasttile and \
                   row['JOBDESC'] == 'prestdstar' and row['LASTSTEP'] != 'skysub':
                    sciences.append(table_row_to_dict(row))
                else:
                    break
            sciences = sciences[::-1]
    else:
        ## Fresh night: derive the starting internal id from the night itself.
        internal_id = night_to_starting_iid(night)

    return arcs,flats,sciences, \
           calibjobs, \
           curtype, lasttype, \
           curtile, lasttile,\
           internal_id
1022

1023
def generate_calibration_dict(ptable, files_to_link=None):
    """
    This takes in a processing table and regenerates the working memory calibration
    dictionary for dependency tracking. Used by the daily processing to define 
    most of its state-ful variables into working memory.
    If the processing table is empty, these are simply declared and returned for use.
    If the code had previously run and exited (or crashed), however, this allows the code to
    re-establish itself by redefining these values.

    Args:
        ptable, Table, Processing table of all exposures that have been processed.
        files_to_link, set, Set of filenames that the linkcal job will link.

    Returns:
        calibjobs, dict. Dictionary containing 'nightlybias', 'badcol', 'ccdcalib',
            'psfnight', 'nightlyflat', 'linkcal', and 'accounted_for'. Each job key
            corresponds to a Table.Row or None. The table.Row() values are for the
            corresponding calibration job. 'accounted_for' maps calibration
            filenames to booleans recording whether each file is covered.

    Raises:
        ValueError: if a linkcal job exists in ptable but files_to_link is
            empty or None, since we then cannot tell what it linked.
    """
    log = get_logger()
    job_to_file_map = get_jobdesc_to_file_map()
    ## Tracks which nightly calibration files are covered by an existing job or link.
    accounted_for = {'biasnight': False, 'badcolumns': False,
                     'ctecorrnight': False, 'psfnight': False,
                     'fiberflatnight': False}
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None}

    ptable_jobtypes = ptable['JOBDESC']

    for jobtype in calibjobs.keys():
        if jobtype in ptable_jobtypes:
            ## Keep the first matching row for each calibration job type.
            calibjobs[jobtype] = table_row_to_dict(ptable[ptable_jobtypes==jobtype][0])
            log.info(f"Located {jobtype} job in exposure table: {calibjobs[jobtype]}")
            if jobtype == 'linkcal':
                if files_to_link is not None and len(files_to_link) > 0:
                    log.info(f"Assuming existing linkcal job processed "
                             + f"{files_to_link} since given in override file.")
                    accounted_for = update_accounted_for_with_linking(accounted_for,
                                                                  files_to_link)
                else:
                    err = f"linkcal job exists but no files given: {files_to_link=}"
                    log.error(err)
                    raise ValueError(err)
            elif jobtype == 'ccdcalib':
                ## ccdcalib covers the CCD-level files except those being linked.
                possible_ccd_files = set(['biasnight', 'badcolumns', 'ctecorrnight'])
                if files_to_link is None:
                    files_accounted_for = possible_ccd_files
                else:
                    files_accounted_for = possible_ccd_files.difference(files_to_link)
                    ccd_files_linked = possible_ccd_files.intersection(files_to_link)
                    log.info(f"Assuming existing ccdcalib job processed "
                             + f"{files_accounted_for} since {ccd_files_linked} "
                             + f"are linked.")
                for fil in files_accounted_for:
                    accounted_for[fil] = True
            else:
                ## Simple one-job-to-one-file cases (e.g. psfnight, nightlyflat).
                accounted_for[job_to_file_map[jobtype]] = True

    calibjobs['accounted_for'] = accounted_for
    return calibjobs
1083

1084
def update_accounted_for_with_linking(accounted_for, files_to_link):
    """
    Mark calibration files covered by linking as accounted for.

    Files in files_to_link are assumed to already exist on disk via symlinks,
    so they do not need to be generated this night.

    Parameters
    ----------
        accounted_for: dict
            Dictionary containing 'biasnight', 'badcolumns', 'ctecorrnight',
            'psfnight', and 'fiberflatnight'. Each value is True if the file is
            accounted for and False if it is not. Modified in place.
        files_to_link: set
            Set of filenames that the linkcal job will link.

    Returns
    -------
        accounted_for: dict
            The same dictionary with every entry named in files_to_link set True.

    Raises
    ------
        ValueError
            If a name in files_to_link is not a recognized calibration filetype.
    """
    log = get_logger()

    for filetype in files_to_link:
        if filetype not in accounted_for:
            err = f"{filetype} doesn't match an expected filetype: {accounted_for.keys()}"
            log.error(err)
            raise ValueError(err)
        accounted_for[filetype] = True

    return accounted_for
1119

1120
def all_calibs_submitted(accounted_for, do_cte_flats):
    """
    Report whether every required calibration job has been submitted or handled.

    Args:
        accounted_for, dict, Dictionary with keys corresponding to the calibration
            filenames and values of True or False.
        do_cte_flats, bool, whether ctecorrnight files are expected or not.

    Returns:
        bool, True if all necessary calibrations have been submitted or handled, False otherwise.
    """
    ## Work on a copy so the caller's dictionary is untouched.
    required = accounted_for.copy()
    ## When CTE flats are not expected, the ctecorrnight entry is irrelevant.
    if not do_cte_flats:
        del required['ctecorrnight']

    return np.all(list(required.values()))
1138

1139
def update_and_recursively_submit(proc_table, submits=0, resubmission_states=None,
                                  ptab_name=None, dry_run=0, reservation=None):
    """
    Given an processing table, this loops over job rows and resubmits failed jobs (as defined by resubmission_states).
    Before submitting a job, it checks the dependencies for failures. If a dependency needs to be resubmitted, it recursively
    follows dependencies until it finds the first job without a failed dependency and resubmits that. Then resubmits the
    other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.

    Args:
        proc_table, Table, the processing table with a row per job.
        submits, int, the number of submissions made to the queue. Used for saving files and in not overloading the scheduler.
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
            possible Slurm scheduler state, where you wish for jobs with that
            outcome to be resubmitted
        ptab_name, str, the full pathname where the processing table should be saved.
        dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.

    Returns:
        tuple: A tuple containing:

        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
          been updated for those jobs that needed to be resubmitted.
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
          the number of submissions made from this function call plus the input submits value.

    Note:
        This modifies the inputs of both proc_table and submits and returns them.
    """
    log = get_logger()
    if resubmission_states is None:
        resubmission_states = get_resubmission_states()
    log.info(f"Resubmitting jobs with current states in the following: {resubmission_states}")
    ## Refresh job states from the Slurm queue before deciding what to resubmit.
    proc_table = update_from_queue(proc_table, dry_run=dry_run)
    log.info("Updated processing table queue information:")
    cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID',
            'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS']
    log.info(np.array(cols))
    for row in proc_table:
        log.info(np.array(row[cols]))
    log.info("\n")
    ## Map internal job id -> row index so dependency chains can be followed.
    id_to_row_map = {row['INTID']: rown for rown, row in enumerate(proc_table)}
    for rown in range(len(proc_table)):
        if proc_table['STATUS'][rown] in resubmission_states:
            proc_table, submits = recursive_submit_failed(rown, proc_table, submits,
                                                          id_to_row_map, ptab_name,
                                                          resubmission_states,
                                                          reservation, dry_run)
    ## Final refresh so the returned table reflects the new submissions.
    proc_table = update_from_queue(proc_table, dry_run=dry_run)
    return proc_table, submits
1191

1192
def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=None,
                            resubmission_states=None, reservation=None, dry_run=0):
    """
    Given a row of a processing table and the full processing table, this resubmits the given job.
    Before submitting a job, it checks the dependencies for failures in the processing table. If a dependency needs to
    be resubmitted, it recursively follows dependencies until it finds the first job without a failed dependency and
    resubmits that. Then resubmits the other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.

    Args:
        rown (Table.Row): the row of the processing table that you want to resubmit.
        proc_table (Table): the processing table with a row per job.
        submits (int): the number of submissions made to the queue. Used for saving files and in not overloading the scheduler.
        id_to_row_map (dict): lookup dictionary where the keys are internal ids (INTID's) and the values are the row position
            in the processing table.
        ptab_name (str): the full pathname where the processing table should be saved.
        resubmission_states (list or array of str): each element should be a capitalized string corresponding to a
            possible Slurm scheduler state, where you wish for jobs with that
            outcome to be resubmitted.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).

    Returns:
        tuple: A tuple containing:

        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
          been updated for those jobs that needed to be resubmitted.
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
          the number of submissions made from this function call plus the input submits value.

    Note:
        This modifies the inputs of both proc_table and submits and returns them.
    """
    log = get_logger()
    row = proc_table[rown]
    log.info(f"Identified row {row['INTID']} as needing resubmission.")
    log.info(f"{row['INTID']}: Expid(s): {row['EXPID']}  Job: {row['JOBDESC']}")
    if resubmission_states is None:
        resubmission_states = get_resubmission_states()
    ideps = proc_table['INT_DEP_IDS'][rown]
    if ideps is None or len(ideps)==0:
        # No dependencies: record an empty integer QID array for this job
        proc_table['LATEST_DEP_QID'][rown] = np.ndarray(shape=0).astype(int)
    else:
        # A dependency is acceptable if it is either resubmittable or already in a healthy queue state
        all_valid_states = list(resubmission_states.copy())
        good_states = ['RUNNING','PENDING','SUBMITTED','COMPLETED']
        all_valid_states.extend(good_states)
        # Maps cross-night internal IDs to their latest Slurm QIDs (populated below)
        othernight_idep_qid_lookup = {}
        ## First pass: validate every dependency before touching the queue
        for idep in np.sort(np.atleast_1d(ideps)):
            if idep not in id_to_row_map:
                # INTID encodes the night in its upper digits (INTID // 1000),
                # so a differing prefix means the dependency is from another night
                if idep // 1000 != row['INTID'] // 1000:
                    log.info(f"Internal ID: {idep} not in id_to_row_map. "
                             + "This is expected since it's from another day. ")
                    # Recover the YYYYMMDD night from the INTID prefix
                    reference_night = 20000000 + (idep // 1000)
                    reftab = read_minimal_full_proctab_cols(nights=[reference_night])
                    if reftab is None:
                        msg = f"The dependency is from night={reference_night}" \
                              + f" but read_minimal_full_proctab_cols couldn't" \
                              + f" locate that processing table, this is a " \
                              +  f"fatal error."
                        log.critical(msg)
                        raise ValueError(msg)
                    reftab = update_from_queue(reftab, dry_run=dry_run)
                    entry = reftab[reftab['INTID'] == idep][0]
                    if entry['STATUS'] not in good_states:
                        # Cross-night jobs can't be resubmitted from here, so a bad state is terminal
                        msg = f"Internal ID: {idep} not in id_to_row_map. " \
                              + f"Since the dependency is from night={reference_night} " \
                              + f"and that job isn't in a good state this is an " \
                              + f"error we can't overcome."
                        log.error(msg)
                        proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
                        return proc_table, submits
                    else:
                        ## otherwise all is good, just update the cache to use this
                        ## in the next stage
                        othernight_idep_qid_lookup[idep] = entry['LATEST_QID']
                        update_full_ptab_cache(reftab)
                else:
                    msg = f"Internal ID: {idep} not in id_to_row_map. " \
                         + f"Since the dependency is from the same night" \
                         + f" and we can't find it, this is a fatal error."
                    log.critical(msg)
                    raise ValueError(msg)
            elif proc_table['STATUS'][id_to_row_map[idep]] not in all_valid_states:
                # Same-night dependency exists but is in a state we can neither use nor resubmit
                log.error(f"Proc INTID: {proc_table['INTID'][rown]} depended on" +
                            f" INTID {proc_table['INTID'][id_to_row_map[idep]]}" +
                            f" but that exposure has state" +
                            f" {proc_table['STATUS'][id_to_row_map[idep]]} that" +
                            f" isn't in the list of resubmission states." +
                            f" Exiting this job's resubmission attempt.")
                proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
                return proc_table, submits
        ## Second pass: resubmit any failed same-night dependencies (recursively)
        ## and collect the Slurm QIDs this job must wait on
        qdeps = []
        for idep in np.sort(np.atleast_1d(ideps)):
            if idep in id_to_row_map:
                if proc_table['STATUS'][id_to_row_map[idep]] in resubmission_states:
                    proc_table, submits = recursive_submit_failed(id_to_row_map[idep],
                                                                  proc_table, submits,
                                                                  id_to_row_map,
                                                                  reservation=reservation,
                                                                  dry_run=dry_run)
                ## Now that we've resubmitted the dependency if necessary,
                ## add the most recent QID to the list
                qdeps.append(proc_table['LATEST_QID'][id_to_row_map[idep]])
            else:
                ## Since we verified above that the cross night QID is still
                ## either pending or successful, add that to the list of QID's
                qdeps.append(othernight_idep_qid_lookup[idep])

        qdeps = np.atleast_1d(qdeps)
        if len(qdeps) > 0:
            proc_table['LATEST_DEP_QID'][rown] = qdeps
        else:
            log.error(f"number of qdeps should be 1 or more: Rown {rown}, ideps {ideps}")

    # All dependencies are healthy; submit this job with strict dependency handling
    proc_table[rown] = submit_batch_script(proc_table[rown], reservation=reservation,
                                           strictly_successful=True, dry_run=dry_run)
    submits += 1

    if not dry_run:
        # Persist the updated table after each real submission so state survives a crash
        if ptab_name is None:
            write_table(proc_table, tabletype='processing', overwrite=True)
        else:
            write_table(proc_table, tablename=ptab_name, overwrite=True)
        # Brief pause (slightly longer every 10th submit) to avoid overloading the scheduler
        sleep_and_report(0.1 + 0.1*(submits % 10 == 0),
                         message_suffix=f"after submitting job to queue and writing proctable")
    return proc_table, submits
1319

1320

1321
#########################################
1322
########     Joint fit     ##############
1323
#########################################
1324
def joint_fit(ptable, prows, internal_id, queue, reservation, descriptor, z_submit_types=None,
              dry_run=0, strictly_successful=False, check_for_outputs=True, resubmit_partial_complete=True,
              system_name=None):
    """
    DEPRECATED
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    joint fitting job given by descriptor. If the joint fitting job is standard star fitting, the post standard star fits
    for all the individual exposures also created and submitted. The returned ptable has all of these rows added to the
    table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        descriptor (str): Description of the joint fitting job. Can either be 'science' or 'stdstarfit', 'arc' or 'psfnight',
            or 'flat' or 'nightlyflat'.
        z_submit_types (list of str, optional): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with added rows for the joint fit job and, in the case
          of a stdstarfit, the poststdstar science exposure jobs.
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    if descriptor is None:
        # Bug fix: previously returned a 2-tuple here, breaking callers that
        # unpack the documented 3-tuple (ptable, joint_prow, internal_id).
        return ptable, None, internal_id
    elif descriptor == 'arc':
        descriptor = 'psfnight'
    elif descriptor == 'flat':
        descriptor = 'nightlyflat'
    elif descriptor == 'science':
        # Without redshift types requested, a 'science' joint job reduces to stdstar fitting
        if z_submit_types is None or len(z_submit_types) == 0:
            descriptor = 'stdstarfit'

    if descriptor not in ['psfnight', 'nightlyflat', 'science','stdstarfit']:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")

    if descriptor == 'science':
        joint_prow, internal_id = make_joint_prow(prows, descriptor='stdstarfit', internal_id=internal_id)
    else:
        joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
    ptable.add_row(joint_prow)

    if descriptor in ['science','stdstarfit']:
        if descriptor == 'science':
            zprows = []
        log.info(" ")
        log.info(f"Submitting individual science exposures now that joint fitting of standard stars is submitted.\n")
        for row in prows:
            if row['LASTSTEP'] == 'stdstarfit':
                continue
            row['JOBDESC'] = 'poststdstar'

            # poststdstar job can't process cameras not included in its stdstar joint fit
            stdcamword = joint_prow['PROCCAMWORD']
            thiscamword = row['PROCCAMWORD']
            proccamword = camword_intersection([stdcamword, thiscamword])
            if proccamword != thiscamword:
                dropcams = difference_camwords(thiscamword, proccamword)
                assert dropcams != ''  #- i.e. if they differ, we should be dropping something
                log.warning(f"Dropping exp {row['EXPID']} poststdstar cameras {dropcams} since they weren't included in stdstar fit {stdcamword}")
                row['PROCCAMWORD'] = proccamword

            row['INTID'] = internal_id
            internal_id += 1
            row['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
            row = assign_dependency(row, joint_prow)
            row = create_and_submit(row, queue=queue, reservation=reservation, dry_run=dry_run,
                                    strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
            ptable.add_row(row)
            if descriptor == 'science' and row['LASTSTEP'] == 'all':
                zprows.append(row)

    ## Now run redshifts
    if descriptor == 'science' and len(zprows) > 0 and z_submit_types is not None:
        # Select all poststdstar jobs for this tile already in the table; if the
        # current exposures cover them all, reuse zprows directly.
        prow_selection = (  (ptable['OBSTYPE'] == 'science')
                          & (ptable['LASTSTEP'] == 'all')
                          & (ptable['JOBDESC'] == 'poststdstar')
                          & (ptable['TILEID'] == int(zprows[0]['TILEID'])) )
        nightly_zprows = []
        if np.sum(prow_selection) == len(zprows):
            nightly_zprows = zprows.copy()
        else:
            for prow in ptable[prow_selection]:
                nightly_zprows.append(table_row_to_dict(prow))

        for zsubtype in z_submit_types:
            if zsubtype == 'perexp':
                # One redshift job per exposure
                for zprow in zprows:
                    log.info(" ")
                    log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
                    joint_prow, internal_id = make_joint_prow([zprow], descriptor=zsubtype, internal_id=internal_id)
                    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                    ptable.add_row(joint_prow)
            else:
                # One joint redshift job over all of the night's exposures for this tile
                log.info(" ")
                log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {nightly_zprows[0]['TILEID']}.")
                expids = [prow['EXPID'][0] for prow in nightly_zprows]
                log.info(f"Expids: {expids}.\n")
                joint_prow, internal_id = make_joint_prow(nightly_zprows, descriptor=zsubtype, internal_id=internal_id)
                joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(joint_prow)

    if descriptor in ['psfnight', 'nightlyflat']:
        log.info(f"Setting the calibration exposures as calibrators in the processing table.\n")
        ptable = set_calibrator_flag(prows, ptable)

    return ptable, joint_prow, internal_id
1467

1468
def joint_cal_fit(descriptor, ptable, prows, internal_id, queue, reservation,
                  dry_run=0, strictly_successful=False, check_for_outputs=True,
                  resubmit_partial_complete=True, system_name=None):
    """
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    joint fitting job given by descriptor ('arc'/'psfnight' or 'flat'/'nightlyflat').
    The returned ptable has the joint fit row added to the table given as input.

    Args:
        descriptor (str): Description of the joint fitting job. Can either be 'arc' or 'psfnight',
            or 'flat' or 'nightlyflat'.
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with an added row for the joint fit job.
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    if descriptor is None:
        # Bug fix: previously returned a 2-tuple here, breaking callers that
        # unpack the documented 3-tuple (ptable, joint_prow, internal_id).
        return ptable, None, internal_id
    elif descriptor == 'arc':
        descriptor = 'psfnight'
    elif descriptor == 'flat':
        descriptor = 'nightlyflat'

    if descriptor not in ['psfnight', 'nightlyflat']:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")

    joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
    ptable.add_row(joint_prow)

    if descriptor in ['psfnight', 'nightlyflat']:
        # Mark the input exposures so later steps know they fed a nightly calibration
        log.info(f"Setting the calibration exposures as calibrators in the processing table.\n")
        ptable = set_calibrator_flag(prows, ptable)

    return ptable, joint_prow, internal_id
1537

1538

1539
#########################################
1540
########     Redshifts     ##############
1541
#########################################
1542
def submit_redshifts(ptable, prows, tnight, internal_id, queue, reservation,
              dry_run=0, strictly_successful=False,
              check_for_outputs=True, resubmit_partial_complete=True,
              z_submit_types=None, system_name=None):
    """
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    redshift jobs depending on the given tilenight job. The returned ptable has all of these rows added to the
    table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
        tnight (Table.Row): The processing table row of the tilenight job on which the redshifts depend.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with added rows for the redshift jobs.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    ## PEP 8 fix: compare to None with `is`, not `==` (E711)
    if len(prows) < 1 or z_submit_types is None:
        return ptable, internal_id

    log.info(" ")
    log.info(f"Running redshifts.\n")

    ## Now run redshifts
    ## Only exposures that completed all processing steps feed the redshift fits
    zprows = []
    for row in prows:
        if row['LASTSTEP'] == 'all':
            zprows.append(row)

    if len(zprows) > 0:
        for zsubtype in z_submit_types:
            log.info(" ")
            log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {zprows[0]['TILEID']}.")
            if zsubtype == 'perexp':
                ## One redshift job per exposure, each depending only on tnight
                for zprow in zprows:
                    log.info(f"EXPID: {zprow['EXPID']}.\n")
                    redshift_prow = make_redshift_prow([zprow], tnight, descriptor=zsubtype, internal_id=internal_id)
                    internal_id += 1
                    redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                    ptable.add_row(redshift_prow)
            elif zsubtype == 'cumulative':
                ## Cumulative fits span all nights of a tile, so exactly one tile
                ## and one night may be represented in the inputs
                tileids = np.unique([prow['TILEID'] for prow in zprows])
                if len(tileids) > 1:
                    msg = f"Error, more than one tileid provided for cumulative redshift job: {tileids}"
                    log.critical(msg)
                    raise ValueError(msg)
                nights = np.unique([prow['NIGHT'] for prow in zprows])
                if len(nights) > 1:
                    msg = f"Error, more than one night provided for cumulative redshift job: {nights}"
                    log.critical(msg)
                    raise ValueError(msg)
                tileid, night = tileids[0], nights[0]
                ## For cumulative redshifts, get any existing processing rows for tile
                matched_prows = read_minimal_tilenight_proctab_cols(tileids=tileids)
                ## Identify the processing rows that should be assigned as dependecies
                ## tnight should be first such that the new job inherits the other metadata from it
                tnights = [tnight]
                if matched_prows is not None:
                    matched_prows = matched_prows[matched_prows['NIGHT'] <= night]
                    for prow in matched_prows:
                        if prow['INTID'] != tnight['INTID']:
                            tnights.append(prow)
                log.info(f"Internal Processing IDs: {[prow['INTID'] for prow in tnights]}.\n")
                ## Identify all exposures that should go into the fit
                expids = [prow['EXPID'][0] for prow in zprows]
                ## note we can actually get the full list of exposures, but for now
                ## we'll stay consistent with old processing where we only list exposures
                ## from the current night
                ## For cumulative redshifts, get valid expids from exptables
                #matched_erows = read_minimal_science_exptab_cols(tileids=tileids)
                #matched_erows = matched_erows[matched_erows['NIGHT']<=night]
                #expids = list(set([prow['EXPID'][0] for prow in zprows])+set(matched_erows['EXPID']))
                log.info(f"Expids: {expids}.\n")
                redshift_prow, internal_id = make_joint_prow(tnights, descriptor=zsubtype, internal_id=internal_id)
                redshift_prow['EXPID'] = expids
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(redshift_prow)
            else: # pernight
                ## One joint redshift job over the night's exposures for this tile
                expids = [prow['EXPID'][0] for prow in zprows]
                log.info(f"Expids: {expids}.\n")
                redshift_prow = make_redshift_prow(zprows, tnight, descriptor=zsubtype, internal_id=internal_id)
                internal_id += 1
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(redshift_prow)

    return ptable, internal_id
1657

1658
#########################################
1659
########     Tilenight     ##############
1660
#########################################
1661
def submit_tilenight(ptable, prows, calibjobs, internal_id, queue, reservation,
              dry_run=0, strictly_successful=False, resubmit_partial_complete=True,
              system_name=None, use_specter=False, extra_job_args=None):
    """
    Build, script, and submit a single tilenight job from a set of unsubmitted
    prestdstar rows. The new processing-table row is appended to the input table.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The table.Row() values are for the corresponding
            calibration job.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. dry_run=1 writes/submits
            scripts; dry_run=2 does neither. Logging proceeds as if jobs were submitted.
            Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded.
            Default is False.
        resubmit_partial_complete (bool, optional): Default is True. If True, jobs with some prior
            data are pruned using PROCCAMWORD to only process the remaining cameras
            not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job, e.g. laststeps for tilenight.

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with an added row for the tilenight job.
        * tnight_prow, dict. Row of a processing table corresponding to the tilenight job
          (None if no prows were given).
        * internal_id, int, the next internal id to be used for assignment.
    """
    log = get_logger()

    ## Nothing to do without at least one input row
    if len(prows) == 0:
        return ptable, None, internal_id

    log.info(" ")
    log.info("Running tilenight.\n")

    ## Build the row, consume one internal id, then script+submit it
    tnight = make_tnight_prow(prows, calibjobs, internal_id=internal_id)
    internal_id += 1
    tnight = create_and_submit(tnight, queue=queue, reservation=reservation,
                               dry_run=dry_run,
                               strictly_successful=strictly_successful,
                               check_for_outputs=False,
                               resubmit_partial_complete=resubmit_partial_complete,
                               system_name=system_name,
                               use_specter=use_specter,
                               extra_job_args=extra_job_args)
    ptable.add_row(tnight)

    return ptable, tnight, internal_id
1717

1718
## wrapper functions for joint fitting
1719
def science_joint_fit(ptable, sciences, internal_id, queue='realtime', reservation=None,
                      z_submit_types=None, dry_run=0, strictly_successful=False,
                      check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None):
    """
    Thin wrapper around desiproc.workflow.processing.joint_fit for the stdstarfit
    joint fit and redshift fitting.

    All arguments are forwarded unchanged except::

        'sciences' is passed as the prows argument of joint_fit.
        The joint_fit descriptor is fixed to 'science'.
    """
    shared_kwargs = dict(ptable=ptable, internal_id=internal_id, queue=queue,
                         reservation=reservation, dry_run=dry_run,
                         strictly_successful=strictly_successful,
                         check_for_outputs=check_for_outputs,
                         resubmit_partial_complete=resubmit_partial_complete,
                         system_name=system_name)
    return joint_fit(prows=sciences, descriptor='science',
                     z_submit_types=z_submit_types, **shared_kwargs)
1735

1736

1737
def flat_joint_fit(ptable, flats, internal_id, queue='realtime',
                   reservation=None, dry_run=0, strictly_successful=False,
                   check_for_outputs=True, resubmit_partial_complete=True,
                   system_name=None):
    """
    Thin wrapper around desiproc.workflow.processing.joint_fit for the nightlyflat
    joint fit.

    All arguments are forwarded unchanged except::

        'flats' is passed as the prows argument of joint_fit.
        The joint_fit descriptor is fixed to 'nightlyflat'.
    """
    shared_kwargs = dict(ptable=ptable, internal_id=internal_id, queue=queue,
                         reservation=reservation, dry_run=dry_run,
                         strictly_successful=strictly_successful,
                         check_for_outputs=check_for_outputs,
                         resubmit_partial_complete=resubmit_partial_complete,
                         system_name=system_name)
    return joint_fit(prows=flats, descriptor='nightlyflat', **shared_kwargs)
1753

1754

1755
def arc_joint_fit(ptable, arcs, internal_id, queue='realtime',
                  reservation=None, dry_run=0, strictly_successful=False,
                  check_for_outputs=True, resubmit_partial_complete=True,
                  system_name=None):
    """
    Thin wrapper around desiproc.workflow.processing.joint_fit for the psfnight
    joint fit.

    All arguments are forwarded unchanged except::

        'arcs' is passed as the prows argument of joint_fit.
        The joint_fit descriptor is fixed to 'psfnight'.
    """
    shared_kwargs = dict(ptable=ptable, internal_id=internal_id, queue=queue,
                         reservation=reservation, dry_run=dry_run,
                         strictly_successful=strictly_successful,
                         check_for_outputs=check_for_outputs,
                         resubmit_partial_complete=resubmit_partial_complete,
                         system_name=system_name)
    return joint_fit(prows=arcs, descriptor='psfnight', **shared_kwargs)
1771

1772
def make_joint_prow(prows, descriptor, internal_id):
    """
    Given an input list or array of processing table rows and a descriptor, this creates a joint fit processing job row.
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
    input prows).

    Args:
        prows, list or array of dicts. The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        descriptor, str. Description of the joint fitting job. One of 'stdstarfit', 'psfnight',
            'nightlyflat', 'pernight', or 'cumulative'.
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).

    Returns:
        dict: Row of a processing table corresponding to the joint fit job.
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    first_row = table_row_to_dict(prows[0])
    joint_prow = first_row.copy()

    ## The joint job gets a fresh id and fresh queue/submission state
    joint_prow['INTID'] = internal_id
    internal_id += 1
    joint_prow['JOBDESC'] = descriptor
    joint_prow['LATEST_QID'] = -99
    joint_prow['ALL_QIDS'] = np.zeros(0, dtype=int)
    joint_prow['SUBMIT_DATE'] = -99
    joint_prow['STATUS'] = 'UNSUBMITTED'
    joint_prow['SCRIPTNAME'] = ''
    ## Union of all input exposure ids, deduplicated
    joint_prow['EXPID'] = np.unique(np.concatenate([currow['EXPID'] for currow in prows])).astype(int)

    ## Assign the PROCCAMWORD based on the descriptor and the input exposures
    ## UPDATE 2024-04-24: badamps are now included in arc/flat joint fits,
    ## so grab all PROCCAMWORDs instead of filtering out BADAMP cameras
    ## For flats we want any camera that exists in all 12 exposures
    ## For arcs we want any camera that exists in at least 3 exposures
    pcamwords = [prow['PROCCAMWORD'] for prow in prows]
    if descriptor == 'stdstarfit':
        ## BUGFIX: was `descriptor in 'stdstarfit'`, a substring test that also
        ## matched e.g. 'std', 'star', or '' -- use strict equality instead
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
                                                  full_spectros_only=True)
    elif descriptor in ['pernight', 'cumulative']:
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
                                                  full_spectros_only=False)
    elif descriptor == 'nightlyflat':
        joint_prow['PROCCAMWORD'] = camword_intersection(pcamwords,
                                                         full_spectros_only=False)
    elif descriptor == 'psfnight':
        ## Count number of exposures each camera is present for
        camcheck = {}
        for camword in pcamwords:
            for cam in decode_camword(camword):
                camcheck[cam] = camcheck.get(cam, 0) + 1
        ## if exists in 3 or more exposures, then include it
        goodcams = [cam for cam, camcount in camcheck.items() if camcount >= 3]
        joint_prow['PROCCAMWORD'] = create_camword(goodcams)
    else:
        log.warning("Warning asked to produce joint proc table row for unknown"
                    + f" job description {descriptor}")

    ## The joint job depends on every individual input job
    joint_prow = assign_dependency(joint_prow, dependency=prows)
    return joint_prow, internal_id
1838

1839
def make_exposure_prow(erow, int_id, calibjobs, jobdesc=None):
    """
    Convert a single exposure-table row into a processing-table row.

    Args:
        erow, Table.Row or dict. The exposure-table row to convert.
        int_id, int. The internal id to assign to the new processing row.
        calibjobs, dict. Calibration jobs used to set the new row's dependencies.
        jobdesc, str, optional. Job description override; when None the row's
            OBSTYPE is used as the job description.

    Returns:
        tuple: the new processing row (dict) and the next unused internal id.
    """
    prow = erow_to_prow(erow)
    prow['INTID'] = int_id
    int_id += 1
    ## Default the job description to the exposure's observation type
    prow['JOBDESC'] = prow['OBSTYPE'] if jobdesc is None else jobdesc
    prow = define_and_assign_dependency(prow, calibjobs)
    return prow, int_id
1849

1850
def make_tnight_prow(prows, calibjobs, internal_id):
    """
    Build a tilenight processing-table row from a set of unsubmitted prestdstar rows.

    The first input row is copied, the job-specific columns are overwritten, and the
    dependencies are derived from the calibration jobs.

    Args:
        prows, list or array of dicts. Unsubmitted rows corresponding to the individual
            prestdstar jobs that are the first steps of tilenight.
        calibjobs, dict. Dictionary whose values are each a Table.Row or None, each
            corresponding to a calibration job on which the tilenight job depends.
        internal_id, int. The internal id to assign to this job.

    Returns:
        dict: Row of a processing table corresponding to the tilenight job.
    """
    tnight_prow = table_row_to_dict(prows[0]).copy()

    ## Reset queue/submission bookkeeping for the new joint job
    tnight_prow.update({
        'INTID': internal_id,
        'JOBDESC': 'tilenight',
        'LATEST_QID': -99,
        'ALL_QIDS': np.zeros(0, dtype=int),
        'SUBMIT_DATE': -99,
        'STATUS': 'UNSUBMITTED',
        'SCRIPTNAME': '',
        ## First exposure id of each input row
        'EXPID': np.array([currow['EXPID'][0] for currow in prows], dtype=int),
    })

    return define_and_assign_dependency(tnight_prow, calibjobs, use_tilenight=True)
1882

1883
def make_redshift_prow(prows, tnights, descriptor, internal_id):
    """
    Build a redshift-fit processing-table row from a set of science processing rows.

    The first input row is copied, the job-specific columns are overwritten, and the
    dependencies are set to the given tilenight job(s).

    Args:
        prows, list or array of dicts. Rows corresponding to the individual science jobs
            whose exposures feed the redshift fit.
        tnights, list or array of Table.Row objects. Rows corresponding to the tilenight
            jobs on which the redshift job depends.
        descriptor, str. The redshift grouping type, e.g. 'pernight' or 'cumulative'.
        internal_id, int. The internal id to assign to this job.

    Returns:
        dict: Row of a processing table corresponding to the redshift job.
    """
    zfit_prow = table_row_to_dict(prows[0]).copy()

    ## Reset queue/submission bookkeeping for the new redshift job
    zfit_prow.update({
        'INTID': internal_id,
        'JOBDESC': descriptor,
        'LATEST_QID': -99,
        'ALL_QIDS': np.zeros(0, dtype=int),
        'SUBMIT_DATE': -99,
        'STATUS': 'UNSUBMITTED',
        'SCRIPTNAME': '',
        ## First exposure id of each input row
        'EXPID': np.array([currow['EXPID'][0] for currow in prows], dtype=int),
    })

    return assign_dependency(zfit_prow, dependency=tnights)
1913

1914
def checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs,
                                  lasttype, internal_id, z_submit_types=None, dry_run=0,
                                  queue='realtime', reservation=None, strictly_successful=False,
                                  check_for_outputs=True, resubmit_partial_complete=True,
                                  system_name=None):
    """
    Takes all the state-ful data from daily processing and determines whether a joint fit needs to be submitted. Places
    the decision criteria into a single function for easier maintainability over time. These are separate from the
    new standard manifest*.json method of indicating a calibration sequence is complete. That is checked independently
    elsewhere and doesn't interact with this.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        arcs (list of dict): list of the individual arc jobs to be used for the psfnight (NOT all
            the arcs, if multiple sets existed). May be empty if none identified yet.
        flats (list of dict): list of the individual flat jobs to be used for the nightlyflat (NOT
            all the flats, if multiple sets existed). May be empty if none identified yet.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The table.Row() values are for the corresponding
            calibration job.
        lasttype (str or None): the obstype of the last individual exposure row to be processed.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be writter or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, permutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table, Processing table of all exposures that have been processed.
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
          and 'nightlyflat'. Each key corresponds to a Table.Row or
          None. The table.Row() values are for the corresponding
          calibration job.
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented upward from
          from the input such that it represents the smallest unused ID.
    """
    ## Science branch: run the per-tile joint fit once the accumulated science
    ## exposures are sanity-checked (non-skysub, single tile)
    if lasttype == 'science' and len(sciences) > 0:
        log = get_logger()
        ## Boolean mask over sciences: True where the exposure was sky-subtraction only
        skysubonly = np.array([sci['LASTSTEP'] == 'skysub' for sci in sciences])
        if np.all(skysubonly):
            ## Nothing left to jointly fit; clear the pending list and bail out
            log.error("Identified all exposures in joint fitting request as skysub-only. Not submitting")
            sciences = []
            return ptable, calibjobs, sciences, internal_id

        if np.any(skysubonly):
            ## Drop only the skysub-only exposures and continue with the rest
            log.error("Identified skysub-only exposures in joint fitting request")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))
            sciences = (np.array(sciences,dtype=object)[~skysubonly]).tolist()
            log.info("Removed skysub only exposures in joint fitting:")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))

        from collections import Counter
        ## A joint fit must cover exactly one tile; refuse mixed-tile requests
        tiles = np.array([sci['TILEID'] for sci in sciences])
        counts = Counter(tiles)
        if len(counts.most_common()) > 1:
            log.error("Identified more than one tile in a joint fitting request")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("Tileid's: {}".format(tiles))
            log.info("Returning without joint fitting any of these exposures.")
            # most_common, nmost_common = counts.most_common()[0]
            # if most_common == -99:
            #     most_common, nmost_common = counts.most_common()[1]
            # log.warning(f"Given multiple tiles to jointly fit: {counts}. "+
            #             "Only processing the most common non-default " +
            #             f"tile: {most_common} with {nmost_common} exposures")
            # sciences = (np.array(sciences,dtype=object)[tiles == most_common]).tolist()
            # log.info("Tiles and exposure id's being submitted for joint fitting:")
            # log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            # log.info("Tileid's: {}".format([row['TILEID'] for row in sciences]))
            sciences = []
            return ptable, calibjobs, sciences, internal_id

        ptable, tilejob, internal_id = science_joint_fit(ptable, sciences, internal_id, z_submit_types=z_submit_types,
                                                         dry_run=dry_run, queue=queue, reservation=reservation,
                                                         strictly_successful=strictly_successful,
                                                         check_for_outputs=check_for_outputs,
                                                         resubmit_partial_complete=resubmit_partial_complete,
                                                         system_name=system_name)
        ## Joint fit submitted: the pending science list has been consumed
        if tilejob is not None:
            sciences = []

    ## Flat branch: only fire once per night, and only when the expected 12 flats exist
    elif lasttype == 'flat' and calibjobs['nightlyflat'] is None and len(flats) == 12:
        ## Note here we have an assumption about the number of expected flats being greater than 11
        ptable, calibjobs['nightlyflat'], internal_id \
            = flat_joint_fit(ptable, flats, internal_id, dry_run=dry_run, queue=queue,
                             reservation=reservation, strictly_successful=strictly_successful,
                             check_for_outputs=check_for_outputs,
                             resubmit_partial_complete=resubmit_partial_complete,
                             system_name=system_name
                            )

    ## Arc branch: only fire once per night, and only when the expected 5 arcs exist
    elif lasttype == 'arc' and calibjobs['psfnight'] is None and len(arcs) == 5:
        ## Note here we have an assumption about the number of expected arcs being greater than 4
        ptable, calibjobs['psfnight'], internal_id \
            = arc_joint_fit(ptable, arcs, internal_id, dry_run=dry_run, queue=queue,
                            reservation=reservation, strictly_successful=strictly_successful,
                            check_for_outputs=check_for_outputs,
                            resubmit_partial_complete=resubmit_partial_complete,
                            system_name=system_name
                            )
    return ptable, calibjobs, sciences, internal_id
2040

2041
def submit_tilenight_and_redshifts(ptable, sciences, calibjobs, internal_id, dry_run=0,
                                  queue='realtime', reservation=None, strictly_successful=False,
                                  check_for_outputs=True, resubmit_partial_complete=True,
                                  system_name=None,use_specter=False, extra_job_args=None):
    """
    Takes all the state-ful data from daily processing and determines whether a tilenight job needs to be submitted,
    then submits the associated redshift jobs.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary whose values are each a Table.Row or None, each
            corresponding to a calibration job the tilenight job depends on.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be writter or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, permutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict, optional): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include
            laststeps for tilenight, z_submit_types for redshifts, etc.

    Returns:
        tuple: A tuple containing:

        * ptable, Table, Processing table of all exposures that have been processed.
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented upward from
          from the input such that it represents the smallest unused ID.
    """
    ptable, tnight, internal_id = submit_tilenight(ptable, sciences, calibjobs, internal_id,
                                             queue=queue, reservation=reservation,
                                             dry_run=dry_run, strictly_successful=strictly_successful,
                                             resubmit_partial_complete=resubmit_partial_complete,
                                             system_name=system_name,use_specter=use_specter,
                                             extra_job_args=extra_job_args)

    ## BUGFIX: extra_job_args defaults to None, so guard before the membership
    ## test (previously `'z_submit_types' in extra_job_args` raised TypeError
    ## when no extra_job_args dict was supplied)
    z_submit_types = None
    if extra_job_args is not None and 'z_submit_types' in extra_job_args:
        z_submit_types = extra_job_args['z_submit_types']

    ptable, internal_id = submit_redshifts(ptable, sciences, tnight, internal_id,
                                    queue=queue, reservation=reservation,
                                    dry_run=dry_run, strictly_successful=strictly_successful,
                                    check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete,
                                    z_submit_types=z_submit_types,
                                    system_name=system_name)

    ## Tilenight was submitted, so the pending science list has been consumed
    if tnight is not None:
        sciences = []

    return ptable, sciences, internal_id
2108

2109
def set_calibrator_flag(prows, ptable):
    """
    Set the "CALIBRATOR" column of a processing table to 1 (integer representation
    of True) for every row whose INTID matches one of the given input rows. Used by
    the joint-fitting code to flag the exposures that fed the psfnight or
    nightlyflat for later reference.

    Args:
        prows, list or array of Table.Rows or dicts. The rows corresponding to the
            individual exposure jobs that were inputs to the joint fit.
        ptable, Table. The processing table where each row is a processed job.

    Returns:
        Table: The same processing table, modified in place, with CALIBRATOR set
        to 1 for the matching rows.
    """
    table_intids = ptable['INTID']
    calib_column = ptable['CALIBRATOR']
    for calib_row in prows:
        ## Boolean-mask assignment flags every table row with this internal id
        calib_column[table_intids == calib_row['INTID']] = 1
    return ptable
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc