• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

desihub / desispec / 8425040410

25 Mar 2024 06:20PM UTC coverage: 28.153% (+3.1%) from 25.01%
8425040410

Pull #2187

github

web-flow
Merge branch 'main' into pipelinerefactor
Pull Request #2187: Introduce desi_proc_night to unify and simplify processing scripts

769 of 1167 new or added lines in 20 files covered. (65.9%)

10 existing lines in 6 files now uncovered.

13250 of 47065 relevant lines covered (28.15%)

0.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.58
/py/desispec/workflow/processing.py
1
"""
2
desispec.workflow.processing
3
============================
4

5
"""
6
import sys, os, glob
1✔
7
import json
1✔
8
from astropy.io import fits
1✔
9
from astropy.table import Table, join
1✔
10
import numpy as np
1✔
11

12
import time, datetime
1✔
13
from collections import OrderedDict
1✔
14
import subprocess
1✔
15

16
from desispec.scripts.link_calibnight import derive_include_exclude
1✔
17
from desispec.scripts.tile_redshifts import generate_tile_redshift_scripts
1✔
18
from desispec.workflow.redshifts import get_ztile_script_pathname, \
1✔
19
                                        get_ztile_relpath, \
20
                                        get_ztile_script_suffix
21
from desispec.workflow.queue import get_resubmission_states, update_from_queue, queue_info_from_qids
1✔
22
from desispec.workflow.timing import what_night_is_it
1✔
23
from desispec.workflow.desi_proc_funcs import get_desi_proc_batch_file_pathname, \
1✔
24
    create_desi_proc_batch_script, \
25
    get_desi_proc_batch_file_path, \
26
    get_desi_proc_tilenight_batch_file_pathname, \
27
    create_desi_proc_tilenight_batch_script, create_linkcal_batch_script
28
from desispec.workflow.utils import pathjoin, sleep_and_report, \
1✔
29
    load_override_file
30
from desispec.workflow.tableio import write_table, load_table
1✔
31
from desispec.workflow.proctable import table_row_to_dict, erow_to_prow
1✔
32
from desiutil.log import get_logger
1✔
33

34
from desispec.io import findfile, specprod_root
1✔
35
from desispec.io.util import decode_camword, create_camword, \
1✔
36
    difference_camwords, \
37
    camword_to_spectros, camword_union, camword_intersection, parse_badamps
38

39

40
#################################################
41
############## Misc Functions ###################
42
#################################################
43
def night_to_starting_iid(night=None):
    """
    Creates an internal ID for a given night. The resulting integer is a 9 digit number.
    The digits are YYMMDDxxx where YY is the years since 2000, MM and DD are the month and day. xxx are 000,
    and are incremented for up to 1000 unique job ID's for a given night.

    Args:
        night (str or int): YYYYMMDD of the night to get the starting internal ID for.
            If None, the current night from what_night_is_it() is used.

    Returns:
        int: 9 digit number consisting of YYMMDD000. YY is years after 2000, MMDD is month and day.
        000 being the starting job number (0).
    """
    if night is None:
        night = what_night_is_it()
    night = int(night)
    ## Subtracting 20000000 turns YYYYMMDD into YYMMDD; multiplying by 1000
    ## leaves three trailing digits free for per-night job counters.
    internal_id = (night - 20000000) * 1000
    return internal_id
61

62
class ProcessingParams():
    """
    Bundle of the common submission options used by the processing workflow.

    Each attribute mirrors the constructor argument of the same name.
    """
    def __init__(self, dry_run_level=0, queue='realtime',
                 reservation=None, strictly_successful=True,
                 check_for_outputs=True,
                 resubmit_partial_complete=True,
                 system_name='perlmutter', use_specter=True):
        # simulation level; 0 means a real run
        self.dry_run_level = dry_run_level
        # Slurm queue and optional reservation to submit jobs to
        self.queue = queue
        self.reservation = reservation
        # whether jobs require all inputs to have succeeded
        self.strictly_successful = strictly_successful
        # whether to look on disk for pre-existing outputs before submitting
        self.check_for_outputs = check_for_outputs
        # whether jobs with partial outputs are pruned to the missing cameras
        self.resubmit_partial_complete = resubmit_partial_complete
        # batch system name, e.g. 'perlmutter'
        self.system_name = system_name
        # if True use specter, otherwise gpu_specter
        self.use_specter = use_specter
77

78
#################################################
79
############ Script Functions ###################
80
#################################################
81
def batch_script_name(prow):
    """
    Determine the batch script pathname for a processing table row.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions
            for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD' ('TILEID' is
            also used for tilenight jobs).

    Returns:
        str: The complete pathname to the .slurm script file, as it is
            defined within the desi_proc ecosystem.
    """
    ## An empty exposure list is passed through as None to the helpers
    exposures = prow['EXPID'] if len(prow['EXPID']) > 0 else None
    if prow['JOBDESC'] == 'tilenight':
        base = get_desi_proc_tilenight_batch_file_pathname(night=prow['NIGHT'],
                                                           tileid=prow['TILEID'])
    else:
        base = get_desi_proc_batch_file_pathname(night=prow['NIGHT'], exp=exposures,
                                                 jobdesc=prow['JOBDESC'],
                                                 cameras=prow['PROCCAMWORD'])
    return base + '.slurm'
102

103
def get_jobdesc_to_file_map():
    """
    Map lowercase job descriptions to the filetype names of their outputs.

    Returns:
        dict: keys are lowercase job descriptions, values are the filenames
            of their expected outputs.
    """
    ## Note: 'ccdcalib' is intentionally absent since its outputs vary
    ## ('badcolumns' among others)
    jobtypes = ('prestdstar', 'stdstarfit', 'poststdstar', 'nightlybias',
                'badcol', 'arc', 'flat', 'psfnight', 'nightlyflat',
                'spectra', 'coadds', 'redshift')
    filetypes = ('sframe', 'stdstars', 'cframe', 'biasnight',
                 'badcolumns', 'fitpsf', 'fiberflat', 'psfnight',
                 'fiberflatnight', 'spectra_tile', 'coadds_tile',
                 'redrock_tile')
    return dict(zip(jobtypes, filetypes))
128

129
def get_file_to_jobdesc_map():
    """
    Map output filenames back to the lowercase job description producing them.

    Returns:
        dict: keys are the filenames of expected job outputs, values are the
            lowercase job descriptions.
    """
    ## 'badcol' and 'nightlybias' outputs can also be produced by a ccdcalib
    ## job, so their filetypes are ambiguous and excluded from the inverse map
    ambiguous = ('badcol', 'nightlybias')
    return {ftype: jdesc
            for jdesc, ftype in get_jobdesc_to_file_map().items()
            if jdesc not in ambiguous}
146

147
def check_for_outputs_on_disk(prow, resubmit_partial_complete=True):
    """
    Check whether the expected output files for a processing table row already
    exist on disk, marking the row COMPLETED if everything is present, or
    optionally pruning PROCCAMWORD down to just the missing cameras/spectrographs.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
        the change in job status after creating and submitting the job for processing.
    """
    prow['STATUS'] = 'UNKNOWN'
    log = get_logger()

    ## linkcal/ccdcalib output sets vary per job, so presence on disk can't
    ## be checked generically
    if prow['JOBDESC'] in ['linkcal', 'ccdcalib']:
        log.info(f"jobdesc={prow['JOBDESC']} has indeterminated outputs, so "
                + "not checking for files on disk.")
        return prow

    job_to_file_map = get_jobdesc_to_file_map()

    night = prow['NIGHT']
    ## all redshift-group jobs produce redrock tile files
    if prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
        filetype = 'redrock_tile'
    else:
        filetype = job_to_file_map[prow['JOBDESC']]
    ## remember the original camword so we can tell below whether it was pruned
    orig_camword = prow['PROCCAMWORD']

    ## if spectro based, look for spectros, else look for cameras
    if prow['JOBDESC'] in ['stdstarfit','spectra','coadds','redshift']:
        ## Spectrograph based
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        ## Suppress outputs about using tile based files in findfile if only looking for stdstarfits
        if prow['JOBDESC'] == 'stdstarfit':
            tileid = None
        else:
            tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(findfile(filetype=filetype, night=night, expid=expid, spectrograph=spectro, tile=tileid)):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            ## build a camword of what already exists and remove it from PROCCAMWORD
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'],existing_camword)
    elif prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
        ## Spectrograph based; redrock output paths are derived from the
        ## ztile relpath/suffix helpers rather than findfile
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        redux_dir = specprod_root()
        outdir = os.path.join(redux_dir,get_ztile_relpath(tileid,group=prow['JOBDESC'],night=night,expid=expid))
        suffix = get_ztile_script_suffix(tileid, group=prow['JOBDESC'], night=night, expid=expid)
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(os.path.join(outdir, f"redrock-{spectro}-{suffix}.fits")):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'],existing_camword)
    else:
        ## Otherwise camera based
        cameras = decode_camword(prow['PROCCAMWORD'])
        n_desired = len(cameras)
        if len(prow['EXPID']) > 0:
            expid = prow['EXPID'][0]
        else:
            expid = None
        ## single-exposure job types only check outputs for the first expid
        if len(prow['EXPID']) > 1 and prow['JOBDESC'] not in ['psfnight','nightlyflat']:
            log.warning(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']}. This job type only makes " +
                     f"sense with a single exposure. Proceeding with {expid}.")
        missing_cameras = []
        for cam in cameras:
            if not os.path.exists(findfile(filetype=filetype, night=night, expid=expid, camera=cam)):
                missing_cameras.append(cam)
        completed = (len(missing_cameras) == 0)
        if not completed and resubmit_partial_complete and len(missing_cameras) < n_desired:
            prow['PROCCAMWORD'] = create_camword(missing_cameras)

    if completed:
        prow['STATUS'] = 'COMPLETED'
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"the desired {n_desired} {filetype}'s. Not submitting this job.")
    elif resubmit_partial_complete and orig_camword != prow['PROCCAMWORD']:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"some {filetype}'s. Submitting smaller camword={prow['PROCCAMWORD']}.")
    elif not resubmit_partial_complete:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} doesn't have all " +
                 f"{filetype}'s and resubmit_partial_complete=False. "+
                 f"Submitting full camword={prow['PROCCAMWORD']}.")
    else:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} has no " +
                 f"existing {filetype}'s. Submitting full camword={prow['PROCCAMWORD']}.")
    return prow
248

249
def create_and_submit(prow, queue='realtime', reservation=None, dry_run=0,
                      joint=False, strictly_successful=False,
                      check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None, use_specter=False,
                      extra_job_args=None):
    """
    Wrapper script that takes a processing table row and three modifier keywords, creates a submission script for the
    compute nodes, and then submits that script to the Slurm scheduler with appropriate dependencies.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not
            submitted. If dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        joint (bool, optional): Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
            run with desi_proc_joint_fit. Default is False.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include refnight
            and include/exclude lists for linkcals, laststeps for tilenight, etc.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
        the change in job status after creating and submitting the job for processing.

    Note:
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    ## keep an unmodified copy so pruned values can be restored after submission
    orig_prow = prow.copy()
    if check_for_outputs:
        prow = check_for_outputs_on_disk(prow, resubmit_partial_complete)
        ## all outputs already on disk: nothing to create or submit
        if prow['STATUS'].upper() == 'COMPLETED':
            return prow

    prow = create_batch_script(prow, queue=queue, dry_run=dry_run, joint=joint,
                               system_name=system_name, use_specter=use_specter,
                               extra_job_args=extra_job_args)
    prow = submit_batch_script(prow, reservation=reservation, dry_run=dry_run,
                               strictly_successful=strictly_successful)

    ## If resubmitted partial, the PROCCAMWORD and SCRIPTNAME will correspond
    ## to the pruned values. But we want to
    ## retain the full job's value, so get those from the old job.
    if resubmit_partial_complete:
        prow['PROCCAMWORD'] = orig_prow['PROCCAMWORD']
        prow['SCRIPTNAME'] = orig_prow['SCRIPTNAME']
    return prow
312

313
def desi_link_calibnight_command(prow, refnight, include=None):
    """
    Build the desi_link_calibnight command line for a processing table row.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions
            for 'NIGHT' and 'PROCCAMWORD'.
        refnight (str or int): The reference night with a valid set of
            calibrations from which the links should be created.
        include (list, optional): The filetypes to include in the linking.

    Returns:
        str: The proper command to be submitted to desi_link_calibnight
            to process the job defined by the prow values.
    """
    pieces = ['desi_link_calibnight',
              f'--refnight={refnight}',
              f'--newnight={prow["NIGHT"]}',
              f'--cameras={prow["PROCCAMWORD"]}']
    if include is not None:
        pieces.append('--include=' + ','.join(list(include)))
    return ' '.join(pieces)
336

337
def desi_proc_command(prow, system_name, use_specter=False, queue=None):
    """
    Build the desi_proc command line for a processing table row.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions
            for 'NIGHT', 'EXPID', 'OBSTYPE', 'JOBDESC', 'PROCCAMWORD', and
            'BADAMPS'.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl,
            perlmutter-gpu. Accepted for interface consistency; not used in
            building the command string.
        use_specter (bool, optional): Default is False. If True, use specter,
            otherwise use gpu_specter by default.
        queue (str, optional): The name of the NERSC Slurm queue to submit to.
            Default is None (which leaves it to the desi_proc default).

    Returns:
        str: The proper command to be submitted to desi_proc to process the
            job defined by the prow values.
    """
    obstype = prow['OBSTYPE'].lower()
    jobdesc = prow['JOBDESC']
    pieces = ['desi_proc', '--batch', '--nosubmit']
    if queue is not None:
        pieces.append(f'-q {queue}')
    if obstype == 'science':
        if jobdesc == 'prestdstar':
            pieces.append('--nostdstarfit --nofluxcalib')
        elif jobdesc == 'poststdstar':
            pieces.append('--noprestdstarfit --nostdstarfit')
        if use_specter:
            pieces.append('--use-specter')
    elif use_specter and jobdesc in ['flat', 'prestdstar']:
        pieces.append('--use-specter')
    pieces.append(f"--cameras={str(prow['PROCCAMWORD'])} -n {prow['NIGHT']}")
    ## If ccdcalib job without a dark exposure, don't assign the flat expid
    ## since it would incorrectly process the flat using desi_proc
    if len(prow['EXPID']) > 0 and (obstype != 'flat' or jobdesc != 'ccdcalib'):
        pieces.append(f"-e {prow['EXPID'][0]}")
    if prow['BADAMPS'] != '':
        pieces.append(f"--badamps={prow['BADAMPS']}")
    return ' '.join(pieces)
376

377
def desi_proc_joint_fit_command(prow, queue=None):
    """
    Build the desi_proc_joint_fit command line for a processing table row.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions
            for 'NIGHT', 'EXPID', 'OBSTYPE', and 'PROCCAMWORD'.
        queue (str): The name of the NERSC Slurm queue to submit to. Default
            is None (which leaves it to the desi_proc default).

    Returns:
        str: The proper command to be submitted to desi_proc_joint_fit
            to process the job defined by the prow values.
    """
    tokens = ['desi_proc_joint_fit', '--batch', '--nosubmit']
    if queue is not None:
        tokens.append(f'-q {queue}')
    tokens.append(f"--obstype {prow['OBSTYPE'].lower()}")
    camword = str(prow['PROCCAMWORD'])
    tokens.append(f"--cameras={camword} -n {prow['NIGHT']}")
    exposures = ','.join([str(eid) for eid in prow['EXPID']])
    if len(exposures) > 0:
        tokens.append(f'-e {exposures}')
    return ' '.join(tokens)
407

408

409
def create_batch_script(prow, queue='realtime', dry_run=0, joint=False,
                        system_name=None, use_specter=False, extra_job_args=None):
    """
    Wrapper script that takes a processing table row and three modifier keywords and creates a submission script for the
    compute nodes.

    Args:
        prow, Table.Row or dict. Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        queue, str. The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        dry_run, int. If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted.
            If dry_run=2, the scripts will not be written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        joint, bool. Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
            run with desi_proc_joint_fit when not using tilenight. Default is False.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter, bool, optional. Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include refnight
            and include/exclude lists for linkcal, laststeps for tilenight, etc.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated values for
        scriptname.

    Raises:
        ValueError: if redshift script generation returns no scripts, if a
            linkcal job has no valid refnight, or if a tilenight job lacks
            'laststeps' in extra_job_args.

    Note:
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()

    if extra_job_args is None:
        extra_job_args = {}

    ## Branch 1: tile redshift jobs are generated by the tile_redshifts helper
    if prow['JOBDESC'] in ['perexp','pernight','pernight-v0','cumulative']:
        if dry_run > 1:
            scriptpathname = get_ztile_script_pathname(tileid=prow['TILEID'],group=prow['JOBDESC'],
                                                               night=prow['NIGHT'], expid=prow['EXPID'][0])

            log.info("Output file would have been: {}".format(scriptpathname))
        else:
            #- run zmtl for cumulative redshifts but not others
            run_zmtl = (prow['JOBDESC'] == 'cumulative')
            no_afterburners = False
            print(f"entering tileredshiftscript: {prow}")
            scripts, failed_scripts = generate_tile_redshift_scripts(tileid=prow['TILEID'], group=prow['JOBDESC'],
                                                                     nights=[prow['NIGHT']], expids=prow['EXPID'],
                                                                     batch_queue=queue, system_name=system_name,
                                                                     run_zmtl=run_zmtl,
                                                                     no_afterburners=no_afterburners,
                                                                     nosubmit=True)
            ## NOTE(review): in the failed_scripts / multiple-scripts branches
            ## below, scriptpathname is never assigned, so the final log.info
            ## at the end of this function would raise NameError -- confirm
            ## whether these paths are expected to be fatal upstream
            if len(failed_scripts) > 0:
                log.error(f"Redshifts failed for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned failed scriptname is {failed_scripts}")
            elif len(scripts) > 1:
                log.error(f"More than one redshifts returned for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned scriptnames were {scripts}")
            elif len(scripts) == 0:
                msg = f'No scripts were generated for {prow=}'
                ## NOTE(review): logs the row rather than msg -- possibly
                ## intended to be log.critical(msg); confirm
                log.critical(prow)
                raise ValueError(msg)
            else:
                scriptpathname = scripts[0]

    ## Branch 2: calibration-linking jobs wrap desi_link_calibnight
    elif prow['JOBDESC'] == 'linkcal':
        ## -99 is the sentinel for "no refnight provided"
        refnight, include, exclude = -99, None, None
        if 'refnight' in extra_job_args:
            refnight = extra_job_args['refnight']
        if 'include' in extra_job_args:
            include = extra_job_args['include']
        if 'exclude' in extra_job_args:
            exclude = extra_job_args['exclude']
        include, exclude = derive_include_exclude(include, exclude)
        ## Fiberflatnights shouldn't be generated with psfs from same time, so
        ## shouldn't link psfs without also linking fiberflatnight
        ## However, this should be checked at a higher level. If set here,
        ## go ahead and do it
        # if 'psfnight' in include and not 'fiberflatnight' in include:
        #     err = "Must link fiberflatnight if linking psfnight"
        #     log.error(err)
        #     raise ValueError(err)
        if dry_run > 1:
            scriptpathname = batch_script_name(prow)
            log.info("Output file would have been: {}".format(scriptpathname))
            cmd = desi_link_calibnight_command(prow, refnight, include)
            log.info("Command to be run: {}".format(cmd.split()))
        else:
            if refnight == -99:
                err = f'For {prow=} asked to link calibration but not given' \
                      + ' a valid refnight'
                log.error(err)
                raise ValueError(err)

            cmd = desi_link_calibnight_command(prow, refnight, include)
            log.info(f"Running: {cmd.split()}")
            scriptpathname = create_linkcal_batch_script(newnight=prow['NIGHT'],
                                                        cameras=prow['PROCCAMWORD'],
                                                        queue=queue,
                                                        cmd=cmd,
                                                        system_name=system_name)
    ## Branch 3: everything else (desi_proc, desi_proc_joint_fit, tilenight)
    else:
        ## tilenight scripts embed their own commands, so only build a cmd
        ## string for non-tilenight jobs
        if prow['JOBDESC'] != 'tilenight':
            nightlybias, nightlycte, cte_expids = False, False, None
            if 'nightlybias' in extra_job_args:
                nightlybias = extra_job_args['nightlybias']
            elif prow['JOBDESC'].lower() == 'nightlybias':
                nightlybias = True
            if 'nightlycte' in extra_job_args:
                nightlycte = extra_job_args['nightlycte']
            if 'cte_expids' in extra_job_args:
                cte_expids = extra_job_args['cte_expids']
            ## run known joint jobs as joint even if unspecified
            ## in the future we can eliminate the need for "joint"
            if joint or prow['JOBDESC'].lower() in ['psfnight', 'nightlyflat']:
                cmd = desi_proc_joint_fit_command(prow, queue=queue)
            else:
                cmd = desi_proc_command(prow, system_name, use_specter, queue=queue)
                if nightlybias:
                    cmd += ' --nightlybias'
                if nightlycte:
                    cmd += ' --nightlycte'
                    if cte_expids is not None:
                        cmd += ' --cte-expids '
                        cmd += ','.join(np.atleast_1d(cte_expids).astype(str))
        if dry_run > 1:
            scriptpathname = batch_script_name(prow)
            log.info("Output file would have been: {}".format(scriptpathname))
            if prow['JOBDESC'] != 'tilenight':
                log.info("Command to be run: {}".format(cmd.split()))
        else:
            expids = prow['EXPID']
            if len(expids) == 0:
                expids = None

            if prow['JOBDESC'] == 'tilenight':
                log.info("Creating tilenight script for tile {}".format(prow['TILEID']))
                if 'laststeps' in extra_job_args:
                    laststeps = extra_job_args['laststeps']
                else:
                    err = f'{prow=} job did not specify last steps to tilenight'
                    log.error(err)
                    raise ValueError(err)
                ncameras = len(decode_camword(prow['PROCCAMWORD']))
                scriptpathname = create_desi_proc_tilenight_batch_script(
                                                               night=prow['NIGHT'], exp=expids,
                                                               tileid=prow['TILEID'],
                                                               ncameras=ncameras,
                                                               queue=queue,
                                                               mpistdstars=True,
                                                               use_specter=use_specter,
                                                               system_name=system_name,
                                                               laststeps=laststeps)
            else:
                ## non-joint desi_proc scripts take at most one exposure
                if expids is not None and len(expids) > 1:
                    expids = expids[:1]
                log.info("Running: {}".format(cmd.split()))
                scriptpathname = create_desi_proc_batch_script(night=prow['NIGHT'], exp=expids,
                                                               cameras=prow['PROCCAMWORD'],
                                                               jobdesc=prow['JOBDESC'],
                                                               queue=queue, cmdline=cmd,
                                                               use_specter=use_specter,
                                                               system_name=system_name,
                                                               nightlybias=nightlybias,
                                                               nightlycte=nightlycte,
                                                               cte_expids=cte_expids)
    log.info("Outfile is: {}".format(scriptpathname))
    prow['SCRIPTNAME'] = os.path.basename(scriptpathname)
    return prow
580

581
_fake_qid = int(time.time() - 1.7e9)

def _get_fake_qid():
    """
    Return a fake Slurm queue jobid to use for dry-run testing.

    Successive calls return strictly increasing ints, mimicking the
    monotonically increasing jobids handed out by the real scheduler.
    """
    # Note: deliberately a plain module-level counter rather than a yield
    # generator so that callers receive a genuine int, not a generator object.
    global _fake_qid
    _fake_qid = _fake_qid + 1
    return _fake_qid
591

592
def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=False):
    """
    Wrapper script that takes a processing table row and three modifier keywords and submits the scripts to the Slurm
    scheduler.

    Args:
        prow, Table.Row or dict. Must include keyword accessible definitions for processing_table columns found in
            desispect.workflow.proctable.get_processing_table_column_defs()
        dry_run, int. If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful, bool. Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated values for
        scriptname.

    Note:
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()
    ## Array of Slurm qids this job depends on; pruned and joined into the
    ## sbatch --dependency argument below.
    dep_qids = prow['LATEST_DEP_QID']
    dep_list, dep_str = '', ''

    # workaround for sbatch --dependency bug not tracking completed jobs correctly
    # see NERSC TICKET INC0203024
    if len(dep_qids) > 0 and not dry_run:
        dep_table = queue_info_from_qids(np.asarray(dep_qids), columns='jobid,state')
        for row in dep_table:
            if row['STATE'] == 'COMPLETED':
                log.info(f"removing completed jobid {row['JOBID']}")
                dep_qids = np.delete(dep_qids, np.argwhere(dep_qids==row['JOBID']))

    if len(dep_qids) > 0:
        jobtype = prow['JOBDESC']
        ## Choose the Slurm dependency condition: 'afterok' requires every
        ## dependency to succeed; 'afterany' lets this job run regardless of
        ## how the dependencies finished.
        if strictly_successful:
            depcond = 'afterok'
        elif jobtype in ['arc', 'psfnight', 'prestdstar', 'stdstarfit']:
            ## (though psfnight and stdstarfit will require some inputs otherwise they'll go up in flames)
            depcond = 'afterany'
        else:
            ## if 'flat','nightlyflat','poststdstar', or any type of redshift, require strict success of inputs
            depcond = 'afterok'

        dep_str = f'--dependency={depcond}:'

        # NOTE(review): this scalar branch looks unreachable -- a scalar
        # dep_qids would already have raised TypeError at the len() calls
        # above. Confirm whether scalar inputs are ever passed in.
        if np.isscalar(dep_qids):
            dep_list = str(dep_qids).strip(' \t')
            if dep_list == '':
                dep_str = ''
            else:
                dep_str += dep_list
        else:
            if len(dep_qids)>1:
                ## Multiple dependencies are colon-separated per sbatch syntax
                dep_list = ':'.join(np.array(dep_qids).astype(str))
                dep_str += dep_list
            elif len(dep_qids) == 1 and dep_qids[0] not in [None, 0]:
                dep_str += str(dep_qids[0])
            else:
                ## Single dependency that is None/0 is not a real qid; drop it
                dep_str = ''

    # script = f'{jobname}.slurm'
    # script_path = pathjoin(batchdir, script)
    ## Redshift (ztile) jobs live in a different directory layout than the
    ## per-night desi_proc batch scripts.
    if prow['JOBDESC'] in ['pernight-v0','pernight','perexp','cumulative']:
        script_path = get_ztile_script_pathname(tileid=prow['TILEID'],group=prow['JOBDESC'],
                                                        night=prow['NIGHT'], expid=np.min(prow['EXPID']))
        jobname = os.path.basename(script_path)
    else:
        batchdir = get_desi_proc_batch_file_path(night=prow['NIGHT'])
        jobname = batch_script_name(prow)
        script_path = pathjoin(batchdir, jobname)

    ## --parsable makes sbatch print just the jobid, parsed as int below
    batch_params = ['sbatch', '--parsable']
    if dep_str != '':
        batch_params.append(f'{dep_str}')
    if reservation is not None:
        batch_params.append(f'--reservation={reservation}')
    batch_params.append(f'{script_path}')

    if dry_run:
        current_qid = _get_fake_qid()
    else:
        #- sbatch sometimes fails; try several times before giving up
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                current_qid = subprocess.check_output(batch_params, stderr=subprocess.STDOUT, text=True)
                current_qid = int(current_qid.strip(' \t\n'))
                break
            except subprocess.CalledProcessError as err:
                log.error(f'{jobname} submission failure at {datetime.datetime.now()}')
                log.error(f'{jobname}   {batch_params}')
                log.error(f'{jobname}   {err.output=}')
                if attempt < max_attempts - 1:
                    log.info('Sleeping 60 seconds then retrying')
                    time.sleep(60)
        else:  #- for/else happens if loop doesn't succeed
            msg = f'{jobname} submission failed {max_attempts} times; exiting'
            log.critical(msg)
            raise RuntimeError(msg)

    log.info(batch_params)
    log.info(f'Submitted {jobname} with dependencies {dep_str} and reservation={reservation}. Returned qid: {current_qid}')

    ## Record the submission in the processing row for downstream bookkeeping
    prow['LATEST_QID'] = current_qid
    prow['ALL_QIDS'] = np.append(prow['ALL_QIDS'],current_qid)
    prow['STATUS'] = 'SUBMITTED'
    prow['SUBMIT_DATE'] = int(time.time())

    return prow
707

708

709
#############################################
710
##########   Row Manipulations   ############
711
#############################################
712
def _first_available_calibjob(calibjobs, jobnames):
    """
    Return the first non-None calibjobs entry among jobnames, in priority order.

    Falls back to calibjobs['linkcal'] (which may itself be None) when none
    of the named jobs exist, matching the original elif-chain behavior.
    """
    for jobname in jobnames:
        if calibjobs[jobname] is not None:
            return calibjobs[jobname]
    return calibjobs['linkcal']


def define_and_assign_dependency(prow, calibjobs, use_tilenight=False,
                                 refnight=None):
    """
    Given input processing row and possible calibjobs, this defines the
    JOBDESC keyword and assigns the dependency appropriate for the job type of
    prow.

    Args:
        prow, Table.Row or dict. Must include keyword accessible definitions for
            'OBSTYPE'. A row must have column names for
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.
        calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The table.Row() values are for the corresponding
            calibration job. Each value that isn't None must contain
            'INTID', and 'LATEST_QID'. If None, it assumes the
            dependency doesn't exist and no dependency is assigned.
        use_tilenight, bool. Default is False. If True, use desi_proc_tilenight
            for prestdstar, stdstar, and poststdstar steps for
            science exposures.
        refnight, int. The reference night for linking jobs.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except
        with modified values updated values for 'JOBDESC', 'INT_DEP_IDS'. and 'LATEST_DEP_ID'.

    Note:
        This modifies the input. Though Table.Row objects are generally copied
        on modification, so the change to the input object in memory may or may
        not be changed. As of writing, a row from a table given to this function
        will not change during the execution of this function (but can be
        overwritten explicitly with the returned row if desired).
    """
    ## Each obstype depends on the most-processed available calibration job,
    ## searched in priority order, falling back to linkcal.
    if prow['OBSTYPE'] in ['science', 'twiflat']:
        dependency = _first_available_calibjob(
            calibjobs,
            ['nightlyflat', 'psfnight', 'ccdcalib', 'nightlybias', 'badcol'])
        if not use_tilenight:
            prow['JOBDESC'] = 'prestdstar'
    elif prow['OBSTYPE'] == 'flat':
        dependency = _first_available_calibjob(
            calibjobs, ['psfnight', 'ccdcalib', 'nightlybias', 'badcol'])
    elif prow['OBSTYPE'] == 'arc':
        dependency = _first_available_calibjob(
            calibjobs, ['ccdcalib', 'nightlybias', 'badcol'])
    elif prow['JOBDESC'] in ['badcol', 'nightlybias', 'ccdcalib']:
        ## ccd-level calibration jobs themselves only depend on linkcal
        dependency = calibjobs['linkcal']
    elif prow['OBSTYPE'] == 'dark':
        dependency = _first_available_calibjob(
            calibjobs, ['ccdcalib', 'nightlybias', 'badcol'])
    elif prow['JOBDESC'] == 'linkcal' and refnight is not None:
        dependency = None
        ## For link cals only, enable cross-night dependencies if available
        refproctable = findfile('proctable', night=refnight)
        if os.path.exists(refproctable):
            ptab = load_table(tablename=refproctable, tabletype='proctable')
            ## This isn't perfect because we may depend on jobs that aren't
            ## actually being linked
            ## Also allows us to proceed even if jobs don't exist yet
            deps = []
            for job in ['nightlybias', 'ccdcalib', 'psfnight', 'nightlyflat']:
                if job in ptab['JOBDESC']:
                    ## add the reference night's job row to dependencies
                    deps.append(ptab[ptab['JOBDESC']==job][0])
            if len(deps) > 0:
                dependency = deps
    else:
        dependency = None

    prow = assign_dependency(prow, dependency)

    return prow
813

814

815
def assign_dependency(prow, dependency):
    """
    Given an input processing row and the processing row(s) it depends on,
    record those dependencies' internal IDs and Slurm queue IDs in prow.

    Args:
        prow, Table.Row or dict. Must include keyword accessible definitions for 'OBSTYPE'. A row must have column names for
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.
        dependency, NoneType or scalar/list/array of Table.Row, dict. Processing row(s) corresponding to the required input
            for the job in prow. This must contain keyword
            accessible values for 'INTID', and 'LATEST_QID'.
            If None, it assumes the dependency doesn't exist
            and no dependency is assigned.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated values for
        'JOBDESC', 'INT_DEP_IDS'. and 'LATEST_DEP_ID'.

    Note:
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    ## Default to no dependencies (empty int arrays)
    prow['INT_DEP_IDS'] = np.array([], dtype=int)
    prow['LATEST_DEP_QID'] = np.array([], dtype=int)
    if dependency is not None:
        ## Bug fix: was `type(dependency) in [list, np.array]` -- np.array is
        ## the factory *function*, not the ndarray type, so numpy arrays of
        ## dependencies were silently ignored. isinstance against np.ndarray
        ## handles both lists and arrays.
        if isinstance(dependency, (list, np.ndarray)):
            ids, qids = [], []
            for curdep in dependency:
                ## Only record dependencies that are still pending in Slurm
                if still_a_dependency(curdep):
                    ids.append(curdep['INTID'])
                    qids.append(curdep['LATEST_QID'])
            prow['INT_DEP_IDS'] = np.array(ids, dtype=int)
            prow['LATEST_DEP_QID'] = np.array(qids, dtype=int)
        elif isinstance(dependency, (dict, OrderedDict, Table.Row)) and still_a_dependency(dependency):
            prow['INT_DEP_IDS'] = np.array([dependency['INTID']], dtype=int)
            prow['LATEST_DEP_QID'] = np.array([dependency['LATEST_QID']], dtype=int)
    return prow

def still_a_dependency(dependency):
    """
    Defines the criteria for which a dependency is deemed complete (and therefore no longer a dependency).

     Args:
        dependency, Table.Row or dict. Processing row corresponding to the required input for the job in prow.
            This must contain keyword accessible values for 'STATUS', and 'LATEST_QID'.

    Returns:
        bool: False if the criteria indicate that the dependency is completed and no longer a blocking factor (ie no longer
        a genuine dependency). Returns True if the dependency is still a blocking factor such that the slurm
        scheduler needs to be aware of the pending job.

    """
    ## LATEST_QID <= 0 means it was never (or fake-)submitted; COMPLETED means
    ## Slurm no longer needs to track it.
    return dependency['LATEST_QID'] > 0 and dependency['STATUS'] != 'COMPLETED'
869

870
def get_type_and_tile(erow):
    """
    Return the normalized OBSTYPE and the TILEID of an exposure table row.

    Args:
        erow, Table.Row or dict. Must contain 'OBSTYPE' and 'TILEID' as keywords.

    Returns:
        tuple (str, str), corresponding to the OBSTYPE (lower-cased) and TILEID values of the input erow.
    """
    obstype = str(erow['OBSTYPE']).lower()
    tileid = erow['TILEID']
    return obstype, tileid
881

882

883
#############################################
884
#########   Table manipulators   ############
885
#############################################
886
def parse_previous_tables(etable, ptable, night):
    """
    This takes in the exposure and processing tables and regenerates all the working memory variables needed for the
    daily processing script.

    Used by the daily processing to define most of its state-ful variables into working memory.
    If the processing table is empty, these are simply declared and returned for use.
    If the code had previously run and exited (or crashed), however, this allows the code to
    re-establish itself by redefining these values.

    Args:
        etable, Table, Exposure table of all exposures that have been dealt with thus far.
        ptable, Table, Processing table of all exposures that have been processed.
        night, str or int, the night the data was taken.

    Returns:
        tuple: A tuple containing:

        * arcs, list of dicts, list of the individual arc jobs used for the psfnight (NOT all
          the arcs, if multiple sets existed)
        * flats, list of dicts, list of the individual flat jobs used for the nightlyflat (NOT
          all the flats, if multiple sets existed)
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile)
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'badcol', 'psfnight'
          and 'nightlyflat'. Each key corresponds to a Table.Row or
          None. The table.Row() values are for the corresponding
          calibration job.
        * curtype, None, the obstype of the current job being run. Always None as first new job will define this.
        * lasttype, str or None, the obstype of the last individual exposure row to be processed.
        * curtile, None, the tileid of the current job (if science). Otherwise None. Always None as first
          new job will define this.
        * lasttile, str or None, the tileid of the last job (if science). Otherwise None.
        * internal_id, int, an internal identifier unique to each job. Increments with each new job. This
          is the latest unassigned value.
    """
    log = get_logger()
    arcs, flats, sciences = [], [], []
    ## All calibration jobs start unknown; filled in below from ptable rows
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None,
                 'completed': dict()}
    curtype,lasttype = None,None
    curtile,lasttile = None,None

    if len(ptable) > 0:
        ## Resume numbering after the last processed row's internal id
        prow = ptable[-1]
        internal_id = int(prow['INTID'])+1
        lasttype,lasttile = get_type_and_tile(ptable[-1])
        jobtypes = ptable['JOBDESC']

        if 'nightlybias' in jobtypes:
            calibjobs['nightlybias'] = table_row_to_dict(ptable[jobtypes=='nightlybias'][0])
            log.info("Located nightlybias job in exposure table: {}".format(calibjobs['nightlybias']))

        if 'ccdcalib' in jobtypes:
            calibjobs['ccdcalib'] = table_row_to_dict(ptable[jobtypes=='ccdcalib'][0])
            log.info("Located ccdcalib job in exposure table: {}".format(calibjobs['ccdcalib']))

        if 'psfnight' in jobtypes:
            calibjobs['psfnight'] = table_row_to_dict(ptable[jobtypes=='psfnight'][0])
            log.info("Located joint fit psfnight job in exposure table: {}".format(calibjobs['psfnight']))
        elif lasttype == 'arc':
            ## No joint psfnight yet: collect the trailing run of arcs (walking
            ## backward while SEQNUM decreases) as candidates for the joint fit
            seqnum = 10
            for row in ptable[::-1]:
                erow = etable[etable['EXPID']==row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'arc' and int(erow['SEQNUM'])<seqnum:
                    arcs.append(table_row_to_dict(row))
                    seqnum = int(erow['SEQNUM'])
                else:
                    break
            ## Because we work backward to fill in, we need to reverse them to get chronological order back
            arcs = arcs[::-1]

        if 'nightlyflat' in jobtypes:
            calibjobs['nightlyflat'] = table_row_to_dict(ptable[jobtypes=='nightlyflat'][0])
            log.info("Located joint fit nightlyflat job in exposure table: {}".format(calibjobs['nightlyflat']))
        elif lasttype == 'flat':
            ## No joint nightlyflat yet: collect the trailing run of flats
            ## (SEQTOT < 5 and EXPTIME > 100s) as candidates for the joint fit
            for row in ptable[::-1]:
                erow = etable[etable['EXPID']==row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'flat' and int(erow['SEQTOT']) < 5:
                    if float(erow['EXPTIME']) > 100.:
                        flats.append(table_row_to_dict(row))
                else:
                    break
            flats = flats[::-1]

        if lasttype.lower() == 'science':
            ## Collect the trailing prestdstar jobs of the current tile so the
            ## stdstar/poststdstar steps can pick up where we left off
            for row in ptable[::-1]:
                if row['OBSTYPE'].lower() == 'science' and row['TILEID'] == lasttile and \
                   row['JOBDESC'] == 'prestdstar' and row['LASTSTEP'] != 'skysub':
                    sciences.append(table_row_to_dict(row))
                else:
                    break
            sciences = sciences[::-1]
    else:
        ## Fresh night: derive the starting internal id from the night itself
        internal_id = night_to_starting_iid(night)

    return arcs,flats,sciences, \
           calibjobs, \
           curtype, lasttype, \
           curtile, lasttile,\
           internal_id

989
def generate_calibration_dict(ptable, files_to_link=None):
    """
    This takes in a processing table and regenerates the working memory calibration
    dictionary for dependency tracking. Used by the daily processing to define
    most of its state-ful variables into working memory.
    If the processing table is empty, these are simply declared and returned for use.
    If the code had previously run and exited (or crashed), however, this allows
    the code to re-establish itself by redefining these values.

    Args:
        ptable, Table, Processing table of all exposures that have been processed.
        files_to_link, set, Set of filenames that the linkcal job will link.

    Returns:
        calibjobs, dict. Dictionary containing 'nightlybias', 'badcol', 'ccdcalib',
            'psfnight', 'nightlyflat', 'linkcal', and 'completed'. Each key corresponds to a
            Table.Row or None. The table.Row() values are for the corresponding
            calibration job. 'completed' maps each job description to a bool.
    """
    log = get_logger()
    ## Cleanup: removed unused local `job_to_file_map = get_jobdesc_to_file_map()`
    ## -- it was assigned but never referenced in this function.
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None,
                 'completed': dict()}
    ptable_jobtypes = ptable['JOBDESC']

    for jobtype in calibjobs.keys():
        calibjobs["completed"][jobtype] = False
        if jobtype in ptable_jobtypes:
            ## First matching row wins; there should be at most one per jobtype
            calibjobs[jobtype] = table_row_to_dict(ptable[ptable_jobtypes==jobtype][0])
            log.info(f"Located {jobtype} job in exposure table: {calibjobs[jobtype]}")
            calibjobs["completed"][jobtype] = True

    ## If a linkcal job already ran, the linked files may satisfy other
    ## calibration jobs without re-running them
    if calibjobs["completed"]['linkcal'] and files_to_link is not None:
        calibjobs = update_calibjobs_with_linking(calibjobs, files_to_link)

    return calibjobs
1026

1027

1028
def update_calibjobs_with_linking(calibjobs, files_to_link):
    """
    Mark calibration jobs as completed based on files the linkcal job links.

    The files in files_to_link are assumed to have already been linked, so
    they exist on disk and the corresponding calibration jobs don't need to
    be generated.

    Parameters
    ----------
        calibjobs: dict
            Dictionary containing "nightlybias", "badcol", "ccdcalib",
            "psfnight", "nightlyflat", "linkcal", and "completed". Each key corresponds to a
            Table.Row or None. The table.Row() values are for the corresponding
            calibration job.
        files_to_link: set
            Set of filenames that the linkcal job will link.

    Returns
    -------
        calibjobs, dict
            Dictionary containing 'nightlybias', 'badcol', 'ccdcalib',
            'psfnight', 'nightlyflat', 'linkcal', and 'completed'. Each key corresponds to a
            Table.Row or None. The table.Row() values are for the corresponding
            calibration job.

    Raises
    ------
        ValueError
            If a filename maps to no known job description.
    """
    log = get_logger()

    file_job_map = get_file_to_jobdesc_map()
    ## ccd-level component files are handled jointly below rather than
    ## mapping one-to-one onto a job description
    ccd_components = ['biasnight', 'badcolumns', 'ctecorrnight']
    for filename in files_to_link:
        if filename in file_job_map:
            calibjobs['completed'][file_job_map[filename]] = True
        elif filename in ccd_components:
            continue
        else:
            err = f"Filetype {filename} doesn't map to a known job description: "
            err += f"{file_job_map=}"
            log.error(err)
            raise ValueError(err)

    ## Only when all three ccd-level products are linked is the entire
    ## ccdcalib job satisfied
    if all(component in files_to_link for component in ccd_components):
        calibjobs['completed']['ccdcalib'] = True

    return calibjobs
1071

1072
def all_calibs_submitted(completed):
    """
    Function that returns the boolean logic to determine if the necessary
    calibration jobs have been submitted for calibration.

    Args:
        completed, dict, Dictionary with keys corresponding to the calibration job descriptions and values of True or False.

    Returns:
        bool, True if all necessary calibrations have been submitted or handled, False otherwise.
    """
    ## Any one of the ccd-level jobs is sufficient to cover ccd calibration
    ccdlevel_completed = any([completed['nightlybias'],
                              completed['badcol'],
                              completed['ccdcalib']])
    return ccdlevel_completed and completed['psfnight'] and completed['nightlyflat']
1086

1087
def update_and_recurvsively_submit(proc_table, submits=0, resubmission_states=None,
                                   ptab_name=None, dry_run=0,reservation=None):
    """
    Given a processing table, this loops over job rows and resubmits failed jobs (as defined by resubmission_states).
    Before submitting a job, it checks the dependencies for failures. If a dependency needs to be resubmitted, it recursively
    follows dependencies until it finds the first job without a failed dependency and resubmits that. Then resubmits the
    other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.

    (Note: the function name's "recurvsively" typo is long-standing and kept
    for backward compatibility with existing callers.)

    Args:
        proc_table, Table, the processing table with a row per job.
        submits, int, the number of submissions made to the queue. Used for saving files and in not overloading the scheduler.
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
            possible Slurm scheduler state, where you wish for jobs with that
            outcome to be resubmitted
        ptab_name, str, the full pathname where the processing table should be saved.
        dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.

    Returns:
        tuple: A tuple containing:

        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
          been updated for those jobs that needed to be resubmitted.
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
          the number of submissions made from this function call plus the input submits value.

    Note:
        This modifies the inputs of both proc_table and submits and returns them.
    """
    log = get_logger()
    if resubmission_states is None:
        resubmission_states = get_resubmission_states()
    log.info(f"Resubmitting jobs with current states in the following: {resubmission_states}")
    ## Refresh STATUS et al. from the live Slurm queue before deciding what
    ## needs resubmission
    proc_table = update_from_queue(proc_table, dry_run=False)
    log.info("Updated processing table queue information:")
    cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID',
            'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS']
    ## Dump a compact view of the table for the operator's log
    print(np.array(cols))
    for row in proc_table:
        print(np.array(row[cols]))
    print("\n")
    ## Map internal job ids to row positions for dependency lookups during
    ## recursive resubmission
    id_to_row_map = {row['INTID']: rown for rown, row in enumerate(proc_table)}
    for rown in range(len(proc_table)):
        ## Index the live table on each iteration (rather than a snapshot):
        ## recursive_submit_failed may update STATUS of later rows as it
        ## resubmits dependency chains
        if proc_table['STATUS'][rown] in resubmission_states:
            proc_table, submits = recursive_submit_failed(rown, proc_table, submits,
                                                          id_to_row_map, ptab_name,
                                                          resubmission_states,
                                                          reservation, dry_run)
    return proc_table, submits
1138

1139
def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=None,
                            resubmission_states=None, reservation=None, dry_run=0):
    """
    Resubmit a failed job, first recursively resubmitting any of its failed dependencies.

    Given a row index of a processing table and the full processing table, this resubmits the given job.
    Before submitting a job, it checks the dependencies for failures in the processing table. If a dependency needs to
    be resubmitted, it recursively follows dependencies until it finds the first job without a failed dependency and
    resubmits that. Then resubmits the other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.

    Args:
        rown, int, the row index (within proc_table) of the job that you want to resubmit.
        proc_table, Table, the processing table with a row per job.
        submits, int, the number of submissions made to the queue. Used for saving files and in not overloading the scheduler.
        id_to_row_map, dict, lookup dictionary where the keys are internal ids (INTID's) and the values are the row position
            in the processing table.
        ptab_name, str, the full pathname where the processing table should be saved.
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
            possible Slurm scheduler state, where you wish for jobs with that
            outcome to be resubmitted
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).

    Returns:
        tuple: A tuple containing:

        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
          been updated for those jobs that needed to be resubmitted.
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
          the number of submissions made from this function call plus the input submits value.

    Note:
        This modifies the inputs of both proc_table and submits and returns them.
    """
    log = get_logger()
    row = proc_table[rown]
    log.info(f"Identified row {row['INTID']} as needing resubmission.")
    log.info(f"{row['INTID']}: Expid(s): {row['EXPID']}  Job: {row['JOBDESC']}")
    if resubmission_states is None:
        resubmission_states = get_resubmission_states()
    ideps = proc_table['INT_DEP_IDS'][rown]
    if ideps is None:
        ## No dependencies: record an empty integer array of dependency QIDs.
        proc_table['LATEST_DEP_QID'][rown] = np.ndarray(shape=0).astype(int)
    else:
        ## States in which a dependency is acceptable: either it will be
        ## resubmitted here, or it is already queued/running/done.
        all_valid_states = list(resubmission_states.copy())
        all_valid_states.extend(['RUNNING','PENDING','SUBMITTED','COMPLETED'])
        ## First pass: verify every dependency is in a workable state before
        ## submitting anything, so we don't submit a job whose deps can't run.
        for idep in np.sort(np.atleast_1d(ideps)):
            ## INTID // 1000 appears to encode the night/day the job belongs to,
            ## so a dependency from another day is expected to be absent from
            ## id_to_row_map and is skipped.
            ## NOTE(review): if idep is missing from id_to_row_map but shares
            ## the same INTID // 1000 block, this condition falls through and
            ## the lookup below raises KeyError — confirm that can't happen.
            if idep not in id_to_row_map and idep // 1000 != row['INTID'] // 1000:
                log.warning(f"Internal ID: {idep} not in id_to_row_map. "
                            + "This is expected since it's from another day. "
                            + f" This dependency will not be checked or "
                            + "resubmitted")
                continue
            if proc_table['STATUS'][id_to_row_map[idep]] not in all_valid_states:
                log.warning(f"Proc INTID: {proc_table['INTID'][rown]} depended on" +
                            f" INTID {proc_table['INTID'][id_to_row_map[idep]]}" +
                            f" but that exposure has state" +
                            f" {proc_table['STATUS'][id_to_row_map[idep]]} that" +
                            f" isn't in the list of resubmission states." +
                            f" Exiting this job's resubmission attempt.")
                ## Mark this job as blocked by an unsubmittable dependency.
                proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
                return proc_table, submits
        ## Second pass: resubmit failed dependencies (recursively) and collect
        ## their latest Slurm queue IDs for this job's --dependency list.
        qdeps = []
        for idep in np.sort(np.atleast_1d(ideps)):
            if idep not in id_to_row_map and idep // 1000 != row['INTID'] // 1000:
                log.warning(f"Internal ID: {idep} not in id_to_row_map. "
                            + "This is expected since it's from another day. "
                            + f" This dependency will not be checked or "
                            + "resubmitted")
                continue
            if proc_table['STATUS'][id_to_row_map[idep]] in resubmission_states:
                proc_table, submits = recursive_submit_failed(id_to_row_map[idep],
                                                              proc_table, submits,
                                                              id_to_row_map,
                                                              reservation=reservation,
                                                              dry_run=dry_run)
            qdeps.append(proc_table['LATEST_QID'][id_to_row_map[idep]])

        qdeps = np.atleast_1d(qdeps)
        if len(qdeps) > 0:
            proc_table['LATEST_DEP_QID'][rown] = qdeps
        else:
            ## ideps was non-None, so we expected at least one queue dependency.
            log.error(f"number of qdeps should be 1 or more: Rown {rown}, ideps {ideps}")

    ## All dependencies are handled; submit this job itself.
    proc_table[rown] = submit_batch_script(proc_table[rown], reservation=reservation,
                                           strictly_successful=True, dry_run=dry_run)
    submits += 1

    if not dry_run:
        ## Brief pauses and periodic checkpoints keep the scheduler from being
        ## overloaded and persist progress in case of interruption.
        sleep_and_report(1, message_suffix=f"after submitting job to queue")
        if submits % 10 == 0:
            if ptab_name is None:
                write_table(proc_table, tabletype='processing', overwrite=True)
            else:
                write_table(proc_table, tablename=ptab_name, overwrite=True)
            sleep_and_report(2, message_suffix=f"after writing to disk")
        if submits % 100 == 0:
            ## Refresh queue state from Slurm before the larger checkpoint.
            proc_table = update_from_queue(proc_table)
            if ptab_name is None:
                write_table(proc_table, tabletype='processing', overwrite=True)
            else:
                write_table(proc_table, tablename=ptab_name, overwrite=True)
            sleep_and_report(10, message_suffix=f"after updating queue and writing to disk")
    return proc_table, submits
1243

1244

1245
#########################################
1246
########     Joint fit     ##############
1247
#########################################
1248
def joint_fit(ptable, prows, internal_id, queue, reservation, descriptor, z_submit_types=None,
              dry_run=0, strictly_successful=False, check_for_outputs=True, resubmit_partial_complete=True,
              system_name=None):
    """
    DEPRECATED
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    joint fitting job given by descriptor. If the joint fitting job is standard star fitting, the post standard star fits
    for all the individual exposures also created and submitted. The returned ptable has all of these rows added to the
    table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        descriptor (str): Description of the joint fitting job. Can either be 'science' or 'stdstarfit', 'arc' or 'psfnight',
            or 'flat' or 'nightlyflat'.
        z_submit_types (list of str, optional): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with added rows for the joint fit job and, in the case
          of a stdstarfit, the poststdstar science exposure jobs.
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    if descriptor is None:
        ## Bug fix: previously returned a 2-tuple here, which broke callers
        ## unpacking the documented (ptable, joint_prow, internal_id) triple.
        return ptable, None, internal_id
    elif descriptor == 'arc':
        descriptor = 'psfnight'
    elif descriptor == 'flat':
        descriptor = 'nightlyflat'
    elif descriptor == 'science':
        ## With no redshift types requested, a 'science' joint job reduces to
        ## just the standard star fit.
        if z_submit_types is None or len(z_submit_types) == 0:
            descriptor = 'stdstarfit'

    if descriptor not in ['psfnight', 'nightlyflat', 'science','stdstarfit']:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")

    ## Create and submit the joint-fit job itself ('science' runs a stdstarfit
    ## first, followed below by the per-exposure poststdstar jobs).
    if descriptor == 'science':
        joint_prow, internal_id = make_joint_prow(prows, descriptor='stdstarfit', internal_id=internal_id)
    else:
        joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
    ptable.add_row(joint_prow)

    if descriptor in ['science','stdstarfit']:
        if descriptor == 'science':
            zprows = []
        log.info(" ")
        log.info(f"Submitting individual science exposures now that joint fitting of standard stars is submitted.\n")
        for row in prows:
            if row['LASTSTEP'] == 'stdstarfit':
                continue
            row['JOBDESC'] = 'poststdstar'

            # poststdstar job can't process cameras not included in its stdstar joint fit
            stdcamword = joint_prow['PROCCAMWORD']
            thiscamword = row['PROCCAMWORD']
            proccamword = camword_intersection([stdcamword, thiscamword])
            if proccamword != thiscamword:
                dropcams = difference_camwords(thiscamword, proccamword)
                assert dropcams != ''  #- i.e. if they differ, we should be dropping something
                log.warning(f"Dropping exp {row['EXPID']} poststdstar cameras {dropcams} since they weren't included in stdstar fit {stdcamword}")
                row['PROCCAMWORD'] = proccamword

            ## Each poststdstar job gets its own internal id and a fresh (empty)
            ## queue-id history, and depends on the stdstar joint fit.
            row['INTID'] = internal_id
            internal_id += 1
            row['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
            row = assign_dependency(row, joint_prow)
            row = create_and_submit(row, queue=queue, reservation=reservation, dry_run=dry_run,
                                    strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
            ptable.add_row(row)
            if descriptor == 'science' and row['LASTSTEP'] == 'all':
                zprows.append(row)

    ## Now run redshifts
    if descriptor == 'science' and len(zprows) > 0 and z_submit_types is not None:
        ## Gather all poststdstar jobs for this tile on this night; if the
        ## current batch already covers them all, reuse it directly.
        prow_selection = (  (ptable['OBSTYPE'] == 'science')
                          & (ptable['LASTSTEP'] == 'all')
                          & (ptable['JOBDESC'] == 'poststdstar')
                          & (ptable['TILEID'] == int(zprows[0]['TILEID'])) )
        nightly_zprows = []
        if np.sum(prow_selection) == len(zprows):
            nightly_zprows = zprows.copy()
        else:
            for prow in ptable[prow_selection]:
                nightly_zprows.append(table_row_to_dict(prow))

        for zsubtype in z_submit_types:
            if zsubtype == 'perexp':
                ## One redshift job per exposure.
                for zprow in zprows:
                    log.info(" ")
                    log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
                    joint_prow, internal_id = make_joint_prow([zprow], descriptor=zsubtype, internal_id=internal_id)
                    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                    ptable.add_row(joint_prow)
            else:
                ## One joint redshift job over all of the night's exposures.
                log.info(" ")
                log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {nightly_zprows[0]['TILEID']}.")
                expids = [prow['EXPID'][0] for prow in nightly_zprows]
                log.info(f"Expids: {expids}.\n")
                joint_prow, internal_id = make_joint_prow(nightly_zprows, descriptor=zsubtype, internal_id=internal_id)
                joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(joint_prow)

    if descriptor in ['psfnight', 'nightlyflat']:
        log.info(f"Setting the calibration exposures as calibrators in the processing table.\n")
        ptable = set_calibrator_flag(prows, ptable)

    return ptable, joint_prow, internal_id
1391

1392
def joint_cal_fit(descriptor, ptable, prows, internal_id, queue, reservation,
                  dry_run=0, strictly_successful=False, check_for_outputs=True,
                  resubmit_partial_complete=True, system_name=None):
    """
    Generate, script, and submit a joint calibration fit (psfnight or nightlyflat).

    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    joint fitting job given by descriptor. The returned ptable has the new row added to the table given as input.

    Args:
        descriptor (str): Description of the joint fitting job. Can either be 'arc' or 'psfnight',
            or 'flat' or 'nightlyflat'.
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with an added row for the joint fit job.
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    if descriptor is None:
        ## Bug fix: previously returned a 2-tuple here, which broke callers
        ## unpacking the documented (ptable, joint_prow, internal_id) triple.
        return ptable, None, internal_id
    elif descriptor == 'arc':
        descriptor = 'psfnight'
    elif descriptor == 'flat':
        descriptor = 'nightlyflat'

    if descriptor not in ['psfnight', 'nightlyflat']:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")

    joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
    ptable.add_row(joint_prow)

    if descriptor in ['psfnight', 'nightlyflat']:
        ## Flag the input exposures as calibrators now that their joint fit is submitted.
        log.info(f"Setting the calibration exposures as calibrators in the processing table.\n")
        ptable = set_calibrator_flag(prows, ptable)

    return ptable, joint_prow, internal_id
1461

1462

1463
#########################################
1464
########     Redshifts     ##############
1465
#########################################
1466
def submit_redshifts(ptable, prows, tnight, internal_id, queue, reservation,
              dry_run=0, strictly_successful=False,
              check_for_outputs=True, resubmit_partial_complete=True,
              z_submit_types=None, system_name=None):
    """
    Generate, script, and submit the redshift jobs that depend on a tilenight job.

    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    redshift job(s) for each requested z_submit_type. The returned ptable has all of these rows added to the
    table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
        tnight (Table.Row): The processing table row of the tilenight job on which the redshifts depend.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with added rows for the redshift jobs.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    ## Idiom fix: compare to None with 'is', not '=='.
    if len(prows) < 1 or z_submit_types is None:
        return ptable, internal_id

    log.info(" ")
    log.info(f"Running redshifts.\n")

    ## Only exposures processed through the full pipeline get redshifts.
    zprows = []
    for row in prows:
        if row['LASTSTEP'] == 'all':
            zprows.append(row)

    if len(zprows) > 0:
        for zsubtype in z_submit_types:
            if zsubtype == 'perexp':
                ## One redshift job per exposure.
                for zprow in zprows:
                    log.info(" ")
                    log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
                    redshift_prow = make_redshift_prow([zprow], tnight, descriptor=zsubtype, internal_id=internal_id)
                    internal_id += 1
                    redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                    ptable.add_row(redshift_prow)
            else:
                ## One joint redshift job over all qualifying exposures.
                log.info(" ")
                log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {zprows[0]['TILEID']}.")
                expids = [prow['EXPID'][0] for prow in zprows]
                log.info(f"Expids: {expids}.\n")
                redshift_prow = make_redshift_prow(zprows, tnight, descriptor=zsubtype, internal_id=internal_id)
                internal_id += 1
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(redshift_prow)

    return ptable, internal_id
1543

1544
#########################################
1545
########     Tilenight     ##############
1546
#########################################
1547
def submit_tilenight(ptable, prows, calibjobs, internal_id, queue, reservation,
              dry_run=0, strictly_successful=False, resubmit_partial_complete=True,
              system_name=None, use_specter=False, extra_job_args=None):
    """
    Create, script, and submit a single tilenight job for a set of exposures.

    A tilenight processing-table row is built from the given per-exposure rows
    and the night's calibration jobs, a batch script is generated for it, and
    the job is submitted to the queue. The new row is appended to ptable.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
        calibjobs (dict): Maps 'nightlybias', 'ccdcalib', 'psfnight', and
            'nightlyflat' to a Table.Row for that calibration job, or None.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. Default is False.
        resubmit_partial_complete (bool, optional): Default is True. If True, jobs with some prior
            data are pruned using PROCCAMWORD to only process the remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Extra key-value pairs for this job type, e.g. laststeps for tilenight.

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with an added row for the tilenight job.
        * tnight_prow, dict. Row of a processing table corresponding to the tilenight job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Running tilenight.\n")

    ## Build the tilenight row, claim its internal id, then script and submit it.
    tilenight_prow = make_tnight_prow(prows, calibjobs, internal_id=internal_id)
    internal_id += 1
    tilenight_prow = create_and_submit(tilenight_prow, queue=queue,
                                       reservation=reservation,
                                       dry_run=dry_run,
                                       strictly_successful=strictly_successful,
                                       check_for_outputs=False,
                                       resubmit_partial_complete=resubmit_partial_complete,
                                       system_name=system_name,
                                       use_specter=use_specter,
                                       extra_job_args=extra_job_args)
    ptable.add_row(tilenight_prow)

    return ptable, tilenight_prow, internal_id
1603

1604
## wrapper functions for joint fitting
1605
def science_joint_fit(ptable, sciences, internal_id, queue='realtime', reservation=None,
                      z_submit_types=None, dry_run=0, strictly_successful=False,
                      check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None):
    """
    Wrapper around desispec.workflow.processing.joint_fit for the stdstarfit
    joint fit and redshift fitting.

    All arguments are identical to those of joint_fit except::

        Arg 'sciences' is mapped to the prows argument of joint_fit.
        The joint_fit argument descriptor is pre-defined as 'science'.
    """
    return joint_fit(ptable=ptable, prows=sciences, internal_id=internal_id,
                     queue=queue, reservation=reservation, descriptor='science',
                     z_submit_types=z_submit_types, dry_run=dry_run,
                     strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs,
                     resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)

def flat_joint_fit(ptable, flats, internal_id, queue='realtime',
                   reservation=None, dry_run=0, strictly_successful=False,
                   check_for_outputs=True, resubmit_partial_complete=True,
                   system_name=None):
    """
    Wrapper around desispec.workflow.processing.joint_fit for the nightlyflat
    joint fit.

    All arguments are identical to those of joint_fit except::

        Arg 'flats' is mapped to the prows argument of joint_fit.
        The joint_fit argument descriptor is pre-defined as 'nightlyflat'.
    """
    return joint_fit(ptable=ptable, prows=flats, internal_id=internal_id,
                     queue=queue, reservation=reservation,
                     descriptor='nightlyflat', dry_run=dry_run,
                     strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs,
                     resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)

def arc_joint_fit(ptable, arcs, internal_id, queue='realtime',
                  reservation=None, dry_run=0, strictly_successful=False,
                  check_for_outputs=True, resubmit_partial_complete=True,
                  system_name=None):
    """
    Wrapper around desispec.workflow.processing.joint_fit for the psfnight
    joint fit.

    All arguments are identical to those of joint_fit except::

        Arg 'arcs' is mapped to the prows argument of joint_fit.
        The joint_fit argument descriptor is pre-defined as 'psfnight'.
    """
    return joint_fit(ptable=ptable, prows=arcs, internal_id=internal_id,
                     queue=queue, reservation=reservation,
                     descriptor='psfnight', dry_run=dry_run,
                     strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs,
                     resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)

def make_joint_prow(prows, descriptor, internal_id):
    """
    Build a joint-fit processing-table row from a set of individual exposure rows.

    The first input row is copied, the job-specific columns are overwritten, and
    the new row's dependencies are set to point at all of the input rows.

    Args:
        prows, list or array of dicts. The rows corresponding to the individual
            exposure jobs that are inputs to the joint fit.
        descriptor, str. Description of the joint fitting job. Can either be
            'stdstarfit', 'psfnight', or 'nightlyflat'.
        internal_id, int, the next internal id to be used for assignment.

    Returns:
        dict: Row of a processing table corresponding to the joint fit job.
        internal_id, int, the next internal id to be used for assignment
            (already incremented up from the last used id number).
    """
    joint_prow = table_row_to_dict(prows[0]).copy()

    joint_prow.update({'INTID': internal_id,
                       'JOBDESC': descriptor,
                       'LATEST_QID': -99,
                       'ALL_QIDS': np.array([], dtype=int),
                       'SUBMIT_DATE': -99,
                       'STATUS': 'U',
                       'SCRIPTNAME': '',
                       'EXPID': np.array([row['EXPID'][0] for row in prows],
                                         dtype=int)})
    internal_id += 1

    ## Assign the PROCCAMWORD based on the descriptor and the input exposures
    if descriptor == 'stdstarfit':
        pcamwords = [prow['PROCCAMWORD'] for prow in prows]
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
                                                  full_spectros_only=True)
    else:
        ## For arcs and flats, a BADAMP takes out the camera, so remove those
        ## cameras from the proccamword
        pcamwords = []
        for prow in prows:
            if len(prow['BADAMPS']) == 0:
                pcamwords.append(prow['PROCCAMWORD'])
            else:
                badcams = {f'{camera}{petal}' for (camera, petal, amplifier)
                           in parse_badamps(prow['BADAMPS'])}
                badampcamword = create_camword(list(badcams))
                pcamwords.append(difference_camwords(prow['PROCCAMWORD'],
                                                     badampcamword))

        ## For flats we want any camera that exists in all 12 exposures
        ## For arcs we want any camera that exists in at least 3 exposures
        if descriptor == 'nightlyflat':
            joint_prow['PROCCAMWORD'] = camword_intersection(pcamwords,
                                                             full_spectros_only=False)
        elif descriptor == 'psfnight':
            ## Tally the number of exposures each camera appears in
            exposure_counts = {}
            for camword in pcamwords:
                for cam in decode_camword(camword):
                    exposure_counts[cam] = exposure_counts.get(cam, 0) + 1
            ## Keep a camera only if it appears in 3 or more exposures
            goodcams = [cam for cam, count in exposure_counts.items()
                        if count >= 3]
            joint_prow['PROCCAMWORD'] = create_camword(goodcams)

    joint_prow = assign_dependency(joint_prow, dependency=prows)
    return joint_prow, internal_id

def make_exposure_prow(erow, int_id, calibjobs, jobdesc=None):
    """
    Convert an exposure-table row into a processing-table row for a single job.

    Args:
        erow, Table.Row or dict. The exposure table row to convert.
        int_id, int, the internal id to assign to this job.
        calibjobs, dict. Dictionary of calibration job rows (or None values)
            used to define this job's dependencies.
        jobdesc, str, optional. Job description to assign. If None, the
            exposure's OBSTYPE is used.

    Returns:
        tuple: (prow, int_id) where prow is the new processing-table row dict
        and int_id is the next unused internal id (input value incremented).
    """
    prow = erow_to_prow(erow)
    prow['INTID'] = int_id
    int_id += 1
    prow['JOBDESC'] = prow['OBSTYPE'] if jobdesc is None else jobdesc
    prow = define_and_assign_dependency(prow, calibjobs)
    return prow, int_id

def make_tnight_prow(prows, calibjobs, internal_id):
    """
    Build the processing-table row for a tilenight job from its input rows.

    The first input row is copied, the job-specific columns are overwritten,
    and the dependencies are defined from the calibration jobs.

    Args:
        prows, list or array of dicts. Unsubmitted rows corresponding to the
            individual prestdstar jobs that are the first steps of tilenight.
        calibjobs, dict. Dictionary containing keys that each corresponds to a
            Table.Row or None, with each Table.Row value corresponding to a
            calibration job on which the tilenight job depends.
        internal_id, int, the internal id to be assigned to this job.

    Returns:
        dict: Row of a processing table corresponding to the tilenight job.
    """
    tnight_prow = table_row_to_dict(prows[0]).copy()

    tnight_prow.update({'INTID': internal_id,
                        'JOBDESC': 'tilenight',
                        'LATEST_QID': -99,
                        'ALL_QIDS': np.array([], dtype=int),
                        'SUBMIT_DATE': -99,
                        'STATUS': 'U',
                        'SCRIPTNAME': '',
                        'EXPID': np.array([row['EXPID'][0] for row in prows],
                                          dtype=int)})

    return define_and_assign_dependency(tnight_prow, calibjobs,
                                        use_tilenight=True)

def make_redshift_prow(prows, tnight, descriptor, internal_id):
    """
    Given an input list or array of processing table rows, the tilenight row they
    feed, and a redshift descriptor, this creates a redshift processing job row.
    It starts by copying the first input row, overwrites relevant columns, and
    defines the new dependency on the tilenight job.

    Args:
        prows, list or array of dicts. Unsubmitted rows corresponding to the individual prestdstar jobs that are
            the first steps of tilenight.
        tnight, Table.Row object. Row corresponding to the tilenight job on which the redshift job depends.
        descriptor, str. The redshift group type for the job (one of the
            z_submit_types, e.g. 'pernight' or 'cumulative' -- verify against callers).
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).

    Returns:
        dict: Row of a processing table corresponding to the redshift job.
    """
    ## Start from a copy of the first input exposure's row
    first_row = table_row_to_dict(prows[0])
    redshift_prow = first_row.copy()

    ## Overwrite the job-identity and submission-state columns for the new job
    redshift_prow['INTID'] = internal_id
    redshift_prow['JOBDESC'] = descriptor
    redshift_prow['LATEST_QID'] = -99
    redshift_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
    redshift_prow['SUBMIT_DATE'] = -99
    redshift_prow['STATUS'] = 'U'
    redshift_prow['SCRIPTNAME'] = ''
    ## Collect the first EXPID of every input row as this job's exposure list
    redshift_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

    ## The redshift job depends only on the tilenight job
    redshift_prow = assign_dependency(redshift_prow, dependency=tnight)

    return redshift_prow

def checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs,
                                  lasttype, internal_id, z_submit_types=None, dry_run=0,
                                  queue='realtime', reservation=None, strictly_successful=False,
                                  check_for_outputs=True, resubmit_partial_complete=True,
                                  system_name=None):
    """
    Takes all the state-ful data from daily processing and determines whether a joint fit needs to be submitted. Places
    the decision criteria into a single function for easier maintainability over time. These are separate from the
    new standard manifest*.json method of indicating a calibration sequence is complete. That is checked independently
    elsewhere and doesn't interact with this.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        arcs (list of dict): list of the individual arc jobs to be used for the psfnight (NOT all
            the arcs, if multiple sets existed). May be empty if none identified yet.
        flats (list of dict): list of the individual flat jobs to be used for the nightlyflat (NOT
            all the flats, if multiple sets existed). May be empty if none identified yet.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The table.Row() values are for the corresponding
            calibration job.
        lasttype (str or None): the obstype of the last individual exposure row to be processed.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be writter or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, permutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table, Processing table of all exposures that have been processed.
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
          and 'nightlyflat'. Each key corresponds to a Table.Row or
          None. The table.Row() values are for the corresponding
          calibration job.
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented upward from
          from the input such that it represents the smallest unused ID.
    """
    ## Science case: attempt the per-tile joint fit on the accumulated exposures
    if lasttype == 'science' and len(sciences) > 0:
        log = get_logger()
        ## Flag exposures that stop at skysub; these cannot be used for stdstar fitting
        skysubonly = np.array([sci['LASTSTEP'] == 'skysub' for sci in sciences])
        if np.all(skysubonly):
            log.error("Identified all exposures in joint fitting request as skysub-only. Not submitting")
            sciences = []
            return ptable, calibjobs, sciences, internal_id

        ## Drop any skysub-only exposures and proceed with the remainder
        if np.any(skysubonly):
            log.error("Identified skysub-only exposures in joint fitting request")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))
            sciences = (np.array(sciences,dtype=object)[~skysubonly]).tolist()
            log.info("Removed skysub only exposures in joint fitting:")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))

        ## A joint fit must be for a single tile; bail out (without submitting)
        ## if more than one tile is present in the request
        from collections import Counter
        tiles = np.array([sci['TILEID'] for sci in sciences])
        counts = Counter(tiles)
        if len(counts.most_common()) > 1:
            log.error("Identified more than one tile in a joint fitting request")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("Tileid's: {}".format(tiles))
            log.info("Returning without joint fitting any of these exposures.")
            # most_common, nmost_common = counts.most_common()[0]
            # if most_common == -99:
            #     most_common, nmost_common = counts.most_common()[1]
            # log.warning(f"Given multiple tiles to jointly fit: {counts}. "+
            #             "Only processing the most common non-default " +
            #             f"tile: {most_common} with {nmost_common} exposures")
            # sciences = (np.array(sciences,dtype=object)[tiles == most_common]).tolist()
            # log.info("Tiles and exposure id's being submitted for joint fitting:")
            # log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            # log.info("Tileid's: {}".format([row['TILEID'] for row in sciences]))
            sciences = []
            return ptable, calibjobs, sciences, internal_id

        ptable, tilejob, internal_id = science_joint_fit(ptable, sciences, internal_id, z_submit_types=z_submit_types,
                                                         dry_run=dry_run, queue=queue, reservation=reservation,
                                                         strictly_successful=strictly_successful,
                                                         check_for_outputs=check_for_outputs,
                                                         resubmit_partial_complete=resubmit_partial_complete,
                                                         system_name=system_name)
        ## Once submitted, clear the science accumulator so the exposures are
        ## not submitted again for the next tile
        if tilejob is not None:
            sciences = []

    elif lasttype == 'flat' and calibjobs['nightlyflat'] is None and len(flats) == 12:
        ## Note here we assume the standard calibration sequence of exactly 12 flats
        ptable, calibjobs['nightlyflat'], internal_id \
            = flat_joint_fit(ptable, flats, internal_id, dry_run=dry_run, queue=queue,
                             reservation=reservation, strictly_successful=strictly_successful,
                             check_for_outputs=check_for_outputs,
                             resubmit_partial_complete=resubmit_partial_complete,
                             system_name=system_name
                            )

    elif lasttype == 'arc' and calibjobs['psfnight'] is None and len(arcs) == 5:
        ## Note here we assume the standard calibration sequence of exactly 5 arcs
        ptable, calibjobs['psfnight'], internal_id \
            = arc_joint_fit(ptable, arcs, internal_id, dry_run=dry_run, queue=queue,
                            reservation=reservation, strictly_successful=strictly_successful,
                            check_for_outputs=check_for_outputs,
                            resubmit_partial_complete=resubmit_partial_complete,
                            system_name=system_name
                            )
    return ptable, calibjobs, sciences, internal_id

def submit_tilenight_and_redshifts(ptable, sciences, calibjobs, internal_id, dry_run=0,
                                  queue='realtime', reservation=None, strictly_successful=False,
                                  check_for_outputs=True, resubmit_partial_complete=True,
                                  system_name=None, use_specter=False, extra_job_args=None):
    """
    Takes all the state-ful data from daily processing and determines whether a tilenight job needs to be submitted.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary of calibration job rows (or None values) on
            which the tilenight job depends.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be writter or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, permutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict, optional): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include
            laststeps for tilenight, z_submit_types for redshifts, etc.

    Returns:
        tuple: A tuple containing:

        * ptable, Table, Processing table of all exposures that have been processed.
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented upward from
          from the input such that it represents the smallest unused ID.
    """
    ptable, tnight, internal_id = submit_tilenight(ptable, sciences, calibjobs, internal_id,
                                             queue=queue, reservation=reservation,
                                             dry_run=dry_run, strictly_successful=strictly_successful,
                                             resubmit_partial_complete=resubmit_partial_complete,
                                             system_name=system_name, use_specter=use_specter,
                                             extra_job_args=extra_job_args)

    ## extra_job_args defaults to None, so guard against a TypeError from the
    ## membership test before looking up z_submit_types
    z_submit_types = None
    if extra_job_args is not None and 'z_submit_types' in extra_job_args:
        z_submit_types = extra_job_args['z_submit_types']

    ptable, internal_id = submit_redshifts(ptable, sciences, tnight, internal_id,
                                    queue=queue, reservation=reservation,
                                    dry_run=dry_run, strictly_successful=strictly_successful,
                                    check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete,
                                    z_submit_types=z_submit_types,
                                    system_name=system_name)

    ## Once the tilenight job is submitted, clear the science accumulator so
    ## the exposures are not submitted again
    if tnight is not None:
        sciences = []

    return ptable, sciences, internal_id

def set_calibrator_flag(prows, ptable):
    """
    Mark the given processing-table rows as calibrators.

    Sets the "CALIBRATOR" column to 1 (integer representation of True) for every
    row of ptable whose INTID matches one of the input rows. Used within the
    joint fitting code to flag the exposures that were input to the psfnight or
    nightlyflat for later reference.

    Args:
        prows, list or array of Table.Rows or dicts. The rows corresponding to the
            individual exposure jobs that are inputs to the joint fit.
        ptable, Table. The processing table where each row is a processed job.

    Returns:
        Table: The input processing table with CALIBRATOR set to 1 for the
        rows matching the input jobs.
    """
    for calib_row in prows:
        is_match = (ptable['INTID'] == calib_row['INTID'])
        ptable['CALIBRATOR'][is_match] = 1
    return ptable

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc