• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

desihub / desispec / 8335038359

18 Mar 2024 11:28PM UTC coverage: 28.093% (+3.0%) from 25.113%
8335038359

Pull #2187

github

akremin
fix typo in tilenight laststeps
Pull Request #2187: Introduce desi_proc_night to unify and simplify processing scripts

752 of 1122 new or added lines in 21 files covered. (67.02%)

1066 existing lines in 11 files now uncovered.

13160 of 46844 relevant lines covered (28.09%)

0.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.67
/py/desispec/workflow/processing.py
1
"""
2
desispec.workflow.processing
3
============================
4

5
"""
6
import sys, os, glob
1✔
7
import json
1✔
8
from astropy.io import fits
1✔
9
from astropy.table import Table, join
1✔
10
import numpy as np
1✔
11

12
import time, datetime
1✔
13
from collections import OrderedDict
1✔
14
import subprocess
1✔
15

16
from desispec.scripts.link_calibnight import derive_include_exclude
1✔
17
from desispec.scripts.tile_redshifts import generate_tile_redshift_scripts
1✔
18
from desispec.workflow.redshifts import get_ztile_script_pathname, \
1✔
19
                                        get_ztile_relpath, \
20
                                        get_ztile_script_suffix
21
from desispec.workflow.queue import get_resubmission_states, update_from_queue, queue_info_from_qids
1✔
22
from desispec.workflow.timing import what_night_is_it
1✔
23
from desispec.workflow.desi_proc_funcs import get_desi_proc_batch_file_pathname, \
1✔
24
    create_desi_proc_batch_script, \
25
    get_desi_proc_batch_file_path, \
26
    get_desi_proc_tilenight_batch_file_pathname, \
27
    create_desi_proc_tilenight_batch_script, create_linkcal_batch_script
28
from desispec.workflow.utils import pathjoin, sleep_and_report, \
1✔
29
    load_override_file
30
from desispec.workflow.tableio import write_table, load_table
1✔
31
from desispec.workflow.proctable import table_row_to_dict, erow_to_prow
1✔
32
from desiutil.log import get_logger
1✔
33

34
from desispec.io import findfile, specprod_root
1✔
35
from desispec.io.util import decode_camword, create_camword, \
1✔
36
    difference_camwords, \
37
    camword_to_spectros, camword_union, camword_intersection, parse_badamps
38

39

40
#################################################
41
############## Misc Functions ###################
42
#################################################
43
def night_to_starting_iid(night=None):
    """
    Creates an internal ID for a given night. The resulting integer is a 9 digit number.
    The digits are YYMMDDxxx where YY is the years since 2000, MM and DD are the month and day. xxx are 000,
    and are incremented for up to 1000 unique job ID's for a given night.

    Args:
        night (str or int): YYYYMMDD of the night to get the starting internal ID for.
            If None, the current observing night is used.

    Returns:
        int: 9 digit number consisting of YYMMDD000. YY is years after 2000, MMDD is month and day.
        000 being the starting job number (0).
    """
    if night is None:
        night = what_night_is_it()
    # Subtracting 20000000 turns YYYYMMDD into YYMMDD; *1000 leaves three
    # trailing digits (000-999) for per-night job counters.
    night = int(night)
    internal_id = (night - 20000000) * 1000
    return internal_id
61

62
class ProcessingParams():
    """
    Plain value holder bundling the options that control how processing
    jobs are created and submitted (queue, dry-run level, resubmission
    policy, batch system, etc.).
    """
    def __init__(self, dry_run_level=0, queue='realtime',
                 reservation=None, strictly_successful=True,
                 check_for_outputs=True,
                 resubmit_partial_complete=True,
                 system_name='perlmutter', use_specter=True):
        # Record every option verbatim; no validation or derivation is done.
        self.queue = queue
        self.reservation = reservation
        self.dry_run_level = dry_run_level
        self.system_name = system_name
        self.use_specter = use_specter
        self.strictly_successful = strictly_successful
        self.check_for_outputs = check_for_outputs
        self.resubmit_partial_complete = resubmit_partial_complete
77

78
#################################################
79
############ Script Functions ###################
80
#################################################
81
def batch_script_name(prow):
    """
    Determine the slurm batch script pathname for a processing table row
    (or dictionary with NIGHT, EXPID, JOBDESC, PROCCAMWORD defined), as
    defined by desi_proc's helper functions.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions
            for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.

    Returns:
        str: The complete pathname to the script file, as it is defined
        within the desi_proc ecosystem.
    """
    # An empty exposure list is passed downstream as None.
    exposures = prow['EXPID'] if len(prow['EXPID']) > 0 else None
    if prow['JOBDESC'] == 'tilenight':
        base = get_desi_proc_tilenight_batch_file_pathname(night=prow['NIGHT'],
                                                           tileid=prow['TILEID'])
    else:
        base = get_desi_proc_batch_file_pathname(night=prow['NIGHT'],
                                                 exp=exposures,
                                                 jobdesc=prow['JOBDESC'],
                                                 cameras=prow['PROCCAMWORD'])
    return base + '.slurm'
102

103
def get_jobdesc_to_file_map():
    """
    Return the mapping of job descriptions to the filetypes of the
    output files each job is expected to produce.

    Returns:
        dict: keys are lowercase job descriptions, values are the
        filetype names of their expected outputs.
    """
    mapping = {
        'prestdstar': 'sframe',
        'stdstarfit': 'stdstars',
        'poststdstar': 'cframe',
        'nightlybias': 'biasnight',
        # 'ccdcalib': 'badcolumns',
        'badcol': 'badcolumns',
        'arc': 'fitpsf',
        'flat': 'fiberflat',
        'psfnight': 'psfnight',
        'nightlyflat': 'fiberflatnight',
        'spectra': 'spectra_tile',
        'coadds': 'coadds_tile',
        'redshift': 'redrock_tile',
    }
    return mapping

129
def get_file_to_jobdesc_map():
    """
    Return the inverse of get_jobdesc_to_file_map(): a mapping of
    expected output filetype to lowercase job description.

    Returns:
        dict: keys are output filetypes, values are job descriptions.
    """
    forward = get_jobdesc_to_file_map()
    # badcolumns and biasnight files can also come out of a ccdcalib job,
    # so their reverse mapping would be ambiguous; drop those entries.
    for ambiguous_job in ('badcol', 'nightlybias'):
        forward.pop(ambiguous_job)
    return {ftype: jdesc for jdesc, ftype in forward.items()}
146

147
def check_for_outputs_on_disk(prow, resubmit_partial_complete=True):
    """
    Check whether the expected output files of a processing table row already
    exist on disk, marking the row COMPLETED if everything is present and
    optionally pruning PROCCAMWORD to only the missing cameras/spectrographs.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
        the change in job status after creating and submitting the job for processing.
    """
    # Reset status; it is set to COMPLETED below only if all outputs exist.
    prow['STATUS'] = 'UNKNOWN'
    log = get_logger()

    # linkcal/ccdcalib outputs vary by configuration, so no disk check is done.
    if prow['JOBDESC'] in ['linkcal', 'ccdcalib']:
        log.info(f"jobdesc={prow['JOBDESC']} has indeterminated outputs, so "
                + "not checking for files on disk.")
        return prow

    job_to_file_map = get_jobdesc_to_file_map()

    night = prow['NIGHT']
    # All redshift-group jobs produce redrock tile files regardless of group.
    if prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
        filetype = 'redrock_tile'
    else:
        filetype = job_to_file_map[prow['JOBDESC']]
    # Remember the original camword so we can tell later whether it was pruned.
    orig_camword = prow['PROCCAMWORD']

    ## if spectro based, look for spectros, else look for cameras
    if prow['JOBDESC'] in ['stdstarfit','spectra','coadds','redshift']:
        ## Spectrograph based
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        ## Suppress outputs about using tile based files in findfile if only looking for stdstarfits
        if prow['JOBDESC'] == 'stdstarfit':
            tileid = None
        else:
            tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(findfile(filetype=filetype, night=night, expid=expid, spectrograph=spectro, tile=tileid)):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            # Build a camword of the already-done spectrographs and remove it
            # from the requested camword so only the remainder is processed.
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'],existing_camword)
    elif prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
        ## Spectrograph based
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        ## Suppress outputs about using tile based files in findfile if only looking for stdstarfits
        tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        # Redshift outputs live under the ztile relative path, so build the
        # expected redrock filename directly instead of using findfile.
        redux_dir = specprod_root()
        outdir = os.path.join(redux_dir,get_ztile_relpath(tileid,group=prow['JOBDESC'],night=night,expid=expid))
        suffix = get_ztile_script_suffix(tileid, group=prow['JOBDESC'], night=night, expid=expid)
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(os.path.join(outdir, f"redrock-{spectro}-{suffix}.fits")):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'],existing_camword)
    else:
        ## Otherwise camera based
        cameras = decode_camword(prow['PROCCAMWORD'])
        n_desired = len(cameras)
        if len(prow['EXPID']) > 0:
            expid = prow['EXPID'][0]
        else:
            expid = None
        if len(prow['EXPID']) > 1 and prow['JOBDESC'] not in ['psfnight','nightlyflat']:
            log.warning(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']}. This job type only makes " +
                     f"sense with a single exposure. Proceeding with {expid}.")
        missing_cameras = []
        for cam in cameras:
            if not os.path.exists(findfile(filetype=filetype, night=night, expid=expid, camera=cam)):
                missing_cameras.append(cam)
        completed = (len(missing_cameras) == 0)
        if not completed and resubmit_partial_complete and len(missing_cameras) < n_desired:
            prow['PROCCAMWORD'] = create_camword(missing_cameras)

    if completed:
        prow['STATUS'] = 'COMPLETED'
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"the desired {n_desired} {filetype}'s. Not submitting this job.")
    elif resubmit_partial_complete and orig_camword != prow['PROCCAMWORD']:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"some {filetype}'s. Submitting smaller camword={prow['PROCCAMWORD']}.")
    elif not resubmit_partial_complete:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} doesn't have all " +
                 f"{filetype}'s and resubmit_partial_complete=False. "+
                 f"Submitting full camword={prow['PROCCAMWORD']}.")
    else:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} has no " +
                 f"existing {filetype}'s. Submitting full camword={prow['PROCCAMWORD']}.")
    return prow
248

249
def create_and_submit(prow, queue='realtime', reservation=None, dry_run=0,
                      joint=False, strictly_successful=False,
                      check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None, use_specter=False,
                      extra_job_args=None):
    """
    Create a batch script for a processing table row and submit it to Slurm.

    Optionally first checks the disk for already-existing outputs; if every
    expected file exists the row is returned as COMPLETED without submission,
    and if only some exist the job may be pruned to the missing cameras.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        joint (bool, optional): Whether this is a joint fitting job (multiple exposures) and therefore needs to be
            run with desi_proc_joint_fit. Default is False.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, check for the expected final data products
            before submitting; fully complete jobs are skipped and partially complete
            jobs may be pruned.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include refnight
            and include/exclude lists for linkcals, laststeps for tilenight, etc.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
        the change in job status after creating and submitting the job for processing.

    Note:
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    # Snapshot the row before any pruning so the full job's values can be restored.
    saved_prow = prow.copy()
    if check_for_outputs:
        prow = check_for_outputs_on_disk(prow, resubmit_partial_complete)
        if prow['STATUS'].upper() == 'COMPLETED':
            # Everything already exists on disk; nothing to submit.
            return prow

    prow = create_batch_script(prow, queue=queue, dry_run=dry_run, joint=joint,
                               system_name=system_name, use_specter=use_specter,
                               extra_job_args=extra_job_args)
    prow = submit_batch_script(prow, reservation=reservation, dry_run=dry_run,
                               strictly_successful=strictly_successful)

    ## A partial resubmission leaves PROCCAMWORD and SCRIPTNAME reflecting the
    ## pruned job; restore the full job's original values from the snapshot.
    if resubmit_partial_complete:
        for column in ('PROCCAMWORD', 'SCRIPTNAME'):
            prow[column] = saved_prow[column]
    return prow
312

313
def desi_link_calibnight_command(prow, refnight, include=None):
    """
    Build the desi_link_calibnight command line for a processing table row
    (or dictionary with NIGHT and PROCCAMWORD defined).

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions
            for 'NIGHT' and 'PROCCAMWORD'.
        refnight (str or int): The night with a valid set of calibrations
            to link from.
        include (list): The filetypes to include in the linking; omitted
            from the command when None.

    Returns:
        str: The proper command to be submitted to desi_link_calibnight
            to process the job defined by the prow values.
    """
    pieces = [
        'desi_link_calibnight',
        f'--refnight={refnight}',
        f'--newnight={prow["NIGHT"]}',
        f'--cameras={prow["PROCCAMWORD"]}',
    ]
    if include is not None:
        pieces.append('--include=' + ','.join(list(include)))
    return ' '.join(pieces)
336

337
def desi_proc_command(prow, system_name, use_specter=False, queue=None):
    """
    Build the desi_proc command line for a processing table row (or
    dictionary with NIGHT, EXPID, OBSTYPE, JOBDESC, PROCCAMWORD defined).

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu.
            Accepted for interface compatibility; not used when building the command.
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).

    Returns:
        str: The proper command to be submitted to desi_proc to process the job defined by the prow values.
    """
    pieces = ['desi_proc', '--batch', '--nosubmit']
    if queue is not None:
        pieces.append(f'-q {queue}')

    obstype = prow['OBSTYPE'].lower()
    jobdesc = prow['JOBDESC']
    if obstype == 'science':
        if jobdesc == 'prestdstar':
            pieces.append('--nostdstarfit --nofluxcalib')
        elif jobdesc == 'poststdstar':
            pieces.append('--noprestdstarfit --nostdstarfit')
        if use_specter:
            pieces.append('--use-specter')
    elif jobdesc in ['flat', 'prestdstar'] and use_specter:
        pieces.append('--use-specter')

    pcamw = str(prow['PROCCAMWORD'])
    pieces.append(f"--cameras={pcamw} -n {prow['NIGHT']}")
    if len(prow['EXPID']) > 0:
        ## If ccdcalib job without a dark exposure, don't assign the flat expid
        ## since it would incorrectly process the flat using desi_proc
        if obstype != 'flat' or jobdesc != 'ccdcalib':
            pieces.append(f"-e {prow['EXPID'][0]}")
    if prow['BADAMPS'] != '':
        pieces.append('--badamps={}'.format(prow['BADAMPS']))
    return ' '.join(pieces)
376

377
def desi_proc_joint_fit_command(prow, queue=None):
    """
    Build the desi_proc_joint_fit command line for a processing table row
    (or dictionary with NIGHT, EXPID, OBSTYPE, PROCCAMWORD defined).

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'OBSTYPE', and 'PROCCAMWORD'.
        queue (str): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).

    Returns:
        str: The proper command to be submitted to desi_proc_joint_fit
            to process the job defined by the prow values.
    """
    pieces = ['desi_proc_joint_fit', '--batch', '--nosubmit']
    if queue is not None:
        pieces.append(f'-q {queue}')

    # The joint-fit obstype (e.g. arc, flat) is taken from the row itself.
    descriptor = prow['OBSTYPE'].lower()
    pieces.append(f'--obstype {descriptor}')
    pieces.append(f"--cameras={str(prow['PROCCAMWORD'])} -n {prow['NIGHT']}")

    expid_str = ','.join([str(eid) for eid in prow['EXPID']])
    if len(expid_str) > 0:
        pieces.append(f'-e {expid_str}')
    return ' '.join(pieces)
407

408

409
def create_batch_script(prow, queue='realtime', dry_run=0, joint=False,
                        system_name=None, use_specter=False, extra_job_args=None):
    """
    Wrapper script that takes a processing table row and three modifier keywords and creates a submission script for the
    compute nodes.

    Args:
        prow, Table.Row or dict. Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        queue, str. The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        dry_run, int. If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted.
            If dry_run=2, the scripts will not be written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        joint, bool. Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
            run with desi_proc_joint_fit when not using tilenight. Default is False.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter, bool, optional. Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include refnight
            and include/exclude lists for linkcal, laststeps for tilenight, etc.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with SCRIPTNAME
        updated to the basename of the created script.

    Note:
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()

    if extra_job_args is None:
        extra_job_args = {}

    ## Branch 1: redshift-group jobs are delegated to the tile-redshift script generator.
    if prow['JOBDESC'] in ['perexp','pernight','pernight-v0','cumulative']:
        if dry_run > 1:
            # Only report the pathname that would have been used; write nothing.
            scriptpathname = get_ztile_script_pathname(tileid=prow['TILEID'],group=prow['JOBDESC'],
                                                               night=prow['NIGHT'], expid=prow['EXPID'][0])

            log.info("Output file would have been: {}".format(scriptpathname))
        else:
            #- run zmtl for cumulative redshifts but not others
            run_zmtl = (prow['JOBDESC'] == 'cumulative')
            no_afterburners = False
            scripts, failed_scripts = generate_tile_redshift_scripts(tileid=prow['TILEID'], group=prow['JOBDESC'],
                                                                     nights=[prow['NIGHT']], expids=prow['EXPID'],
                                                                     batch_queue=queue, system_name=system_name,
                                                                     run_zmtl=run_zmtl,
                                                                     no_afterburners=no_afterburners,
                                                                     nosubmit=True)
            # NOTE(review): on the two error paths below, scriptpathname is never
            # assigned, so the log.info at the end of this function would raise
            # NameError (or reuse a stale value) — confirm intended behavior.
            if len(failed_scripts) > 0:
                log.error(f"Redshifts failed for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned failed scriptname is {failed_scripts}")
            elif len(scripts) > 1:
                log.error(f"More than one redshifts returned for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned scriptnames were {scripts}")
            else:
                scriptpathname = scripts[0]
    ## Branch 2: calibration-linking jobs build a desi_link_calibnight command.
    elif prow['JOBDESC'] == 'linkcal':
        # -99 is the sentinel for "no refnight provided".
        refnight, include, exclude = -99, None, None
        if 'refnight' in extra_job_args:
            refnight = extra_job_args['refnight']
        if 'include' in extra_job_args:
            include = extra_job_args['include']
        if 'exclude' in extra_job_args:
            exclude = extra_job_args['exclude']
        include, exclude = derive_include_exclude(include, exclude)
        ## Fiberflatnights shouldn't be generated with psfs from same time, so
        ## shouldn't link psfs without also linking fiberflatnight
        ## However, this should be checked at a higher level. If set here,
        ## go ahead and do it
        # if 'psfnight' in include and not 'fiberflatnight' in include:
        #     err = "Must link fiberflatnight if linking psfnight"
        #     log.error(err)
        #     raise ValueError(err)
        if dry_run > 1:
            scriptpathname = batch_script_name(prow)
            log.info("Output file would have been: {}".format(scriptpathname))
            cmd = desi_link_calibnight_command(prow, refnight, include)
            log.info("Command to be run: {}".format(cmd.split()))
        else:
            if refnight == -99:
                err = f'For {prow=} asked to link calibration but not given' \
                      + ' a valid refnight'
                log.error(err)
                raise ValueError(err)

            cmd = desi_link_calibnight_command(prow, refnight, include)
            log.info(f"Running: {cmd.split()}")
            scriptpathname = create_linkcal_batch_script(newnight=prow['NIGHT'],
                                                        cameras=prow['PROCCAMWORD'],
                                                        queue=queue,
                                                        cmd=cmd,
                                                        system_name=system_name)
    ## Branch 3: everything else — per-exposure desi_proc jobs, joint fits, and tilenight.
    else:
        # tilenight builds its own script; all others need a desi_proc(-joint_fit) command.
        if prow['JOBDESC'] != 'tilenight':
            nightlybias, nightlycte, cte_expids = False, False, None
            if 'nightlybias' in extra_job_args:
                nightlybias = extra_job_args['nightlybias']
            elif prow['JOBDESC'].lower() == 'nightlybias':
                nightlybias = True
            if 'nightlycte' in extra_job_args:
                nightlycte = extra_job_args['nightlycte']
            if 'cte_expids' in extra_job_args:
                cte_expids = extra_job_args['cte_expids']
            ## run known joint jobs as joint even if unspecified
            ## in the future we can eliminate the need for "joint"
            if joint or prow['JOBDESC'].lower() in ['psfnight', 'nightlyflat']:
                cmd = desi_proc_joint_fit_command(prow, queue=queue)
            else:
                cmd = desi_proc_command(prow, system_name, use_specter, queue=queue)
                if nightlybias:
                    cmd += ' --nightlybias'
                if nightlycte:
                    cmd += ' --nightlycte'
                    if cte_expids is not None:
                        cmd += ' --cte-expids '
                        cmd += ','.join(np.atleast_1d(cte_expids).astype(str))
        if dry_run > 1:
            scriptpathname = batch_script_name(prow)
            log.info("Output file would have been: {}".format(scriptpathname))
            if prow['JOBDESC'] != 'tilenight':
                log.info("Command to be run: {}".format(cmd.split()))
        else:
            expids = prow['EXPID']
            if len(expids) == 0:
                expids = None

            if prow['JOBDESC'] == 'tilenight':
                log.info("Creating tilenight script for tile {}".format(prow['TILEID']))
                # tilenight requires laststeps to be supplied explicitly.
                if 'laststeps' in extra_job_args:
                    laststeps = extra_job_args['laststeps']
                else:
                    err = f'{prow=} job did not specify last steps to tilenight'
                    log.error(err)
                    raise ValueError(err)
                ncameras = len(decode_camword(prow['PROCCAMWORD']))
                scriptpathname = create_desi_proc_tilenight_batch_script(
                                                               night=prow['NIGHT'], exp=expids,
                                                               tileid=prow['TILEID'],
                                                               ncameras=ncameras,
                                                               queue=queue,
                                                               mpistdstars=True,
                                                               use_specter=use_specter,
                                                               system_name=system_name,
                                                               laststeps=laststeps)
            else:
                # Non-joint desi_proc scripts only take a single exposure.
                if expids is not None and len(expids) > 1:
                    expids = expids[:1]
                log.info("Running: {}".format(cmd.split()))
                scriptpathname = create_desi_proc_batch_script(night=prow['NIGHT'], exp=expids,
                                                               cameras=prow['PROCCAMWORD'],
                                                               jobdesc=prow['JOBDESC'],
                                                               queue=queue, cmdline=cmd,
                                                               use_specter=use_specter,
                                                               system_name=system_name,
                                                               nightlybias=nightlybias,
                                                               nightlycte=nightlycte,
                                                               cte_expids=cte_expids)
    log.info("Outfile is: {}".format(scriptpathname))
    # Only the basename is stored; the directory is derivable from the row.
    prow['SCRIPTNAME'] = os.path.basename(scriptpathname)
    return prow
574

575
_fake_qid = int(time.time() - 1.7e9)
1✔
576
def _get_fake_qid():
1✔
577
    """
578
    Return fake slurm queue jobid to use for dry-run testing
579
    """
580
    # Note: not implemented as a yield generator so that this returns a
581
    # genuine int, not a generator object
582
    global _fake_qid
583
    _fake_qid += 1
1✔
584
    return _fake_qid
1✔
585

586
def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=False):
    """
    Wrapper script that takes a processing table row and three modifier keywords and submits the scripts to the Slurm
    scheduler.

    Args:
        prow, Table.Row or dict. Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        dry_run, int. If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful, bool. Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated values for
        scriptname.

    Note:
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()
    dep_qids = prow['LATEST_DEP_QID']
    dep_list, dep_str = '', ''

    # workaround for sbatch --dependency bug not tracking completed jobs correctly
    # see NERSC TICKET INC0203024
    # Only query the real queue when we will actually submit (not a dry run)
    if len(dep_qids) > 0 and not dry_run:
        dep_table = queue_info_from_qids(np.asarray(dep_qids), columns='jobid,state')
        for row in dep_table:
            if row['STATE'] == 'COMPLETED':
                # Drop already-completed jobs from the dependency list so sbatch
                # doesn't stall on them
                log.info(f"removing completed jobid {row['JOBID']}")
                dep_qids = np.delete(dep_qids, np.argwhere(dep_qids==row['JOBID']))

    if len(dep_qids) > 0:
        jobtype = prow['JOBDESC']
        if strictly_successful:
            depcond = 'afterok'
        elif jobtype in ['arc', 'psfnight', 'prestdstar', 'stdstarfit']:
            ## (though psfnight and stdstarfit will require some inputs otherwise they'll go up in flames)
            depcond = 'afterany'
        else:
            ## if 'flat','nightlyflat','poststdstar', or any type of redshift, require strict success of inputs
            depcond = 'afterok'

        dep_str = f'--dependency={depcond}:'

        # NOTE(review): dep_qids passed len() above, so this scalar branch looks
        # unreachable for true scalars -- confirm whether 0-d arrays can land here.
        if np.isscalar(dep_qids):
            dep_list = str(dep_qids).strip(' \t')
            if dep_list == '':
                dep_str = ''
            else:
                dep_str += dep_list
        else:
            if len(dep_qids)>1:
                # Multiple dependencies: colon-separated qid list per sbatch syntax
                dep_list = ':'.join(np.array(dep_qids).astype(str))
                dep_str += dep_list
            elif len(dep_qids) == 1 and dep_qids[0] not in [None, 0]:
                dep_str += str(dep_qids[0])
            else:
                # Single invalid (None/0) qid: submit with no dependency at all
                dep_str = ''

    # script = f'{jobname}.slurm'
    # script_path = pathjoin(batchdir, script)
    # Redshift (ztile) jobs use a different script-naming scheme than desi_proc jobs
    if prow['JOBDESC'] in ['pernight-v0','pernight','perexp','cumulative']:
        script_path = get_ztile_script_pathname(tileid=prow['TILEID'],group=prow['JOBDESC'],
                                                        night=prow['NIGHT'], expid=np.min(prow['EXPID']))
        jobname = os.path.basename(script_path)
    else:
        batchdir = get_desi_proc_batch_file_path(night=prow['NIGHT'])
        jobname = batch_script_name(prow)
        script_path = pathjoin(batchdir, jobname)

    # --parsable makes sbatch print only the jobid, which we parse below
    batch_params = ['sbatch', '--parsable']
    if dep_str != '':
        batch_params.append(f'{dep_str}')
    if reservation is not None:
        batch_params.append(f'--reservation={reservation}')
    batch_params.append(f'{script_path}')

    if dry_run:
        # Simulated submission: fabricate a unique qid instead of calling sbatch
        current_qid = _get_fake_qid()
    else:
        #- sbatch sometimes fails; try several times before giving up
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                current_qid = subprocess.check_output(batch_params, stderr=subprocess.STDOUT, text=True)
                current_qid = int(current_qid.strip(' \t\n'))
                break
            except subprocess.CalledProcessError as err:
                log.error(f'{jobname} submission failure at {datetime.datetime.now()}')
                log.error(f'{jobname}   {batch_params}')
                log.error(f'{jobname}   {err.output=}')
                if attempt < max_attempts - 1:
                    log.info('Sleeping 60 seconds then retrying')
                    time.sleep(60)
        else:  #- for/else happens if loop doesn't succeed
            msg = f'{jobname} submission failed {max_attempts} times; exiting'
            log.critical(msg)
            raise RuntimeError(msg)

    log.info(batch_params)
    log.info(f'Submitted {jobname} with dependencies {dep_str} and reservation={reservation}. Returned qid: {current_qid}')

    # Record the submission on the processing row (modifies the input in place)
    prow['LATEST_QID'] = current_qid
    prow['ALL_QIDS'] = np.append(prow['ALL_QIDS'],current_qid)
    prow['STATUS'] = 'SUBMITTED'
    prow['SUBMIT_DATE'] = int(time.time())

    return prow
701

702

703
#############################################
704
##########   Row Manipulations   ############
705
#############################################
706
def define_and_assign_dependency(prow, calibjobs, use_tilenight=False,
                                 refnight=None):
    """
    Given input processing row and possible calibjobs, this defines the
    JOBDESC keyword and assigns the dependency appropriate for the job type of
    prow.

    Args:
        prow, Table.Row or dict. Must include keyword accessible definitions for
            'OBSTYPE'. A row must have column names for
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.
        calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'badcol',
            'psfnight', 'nightlyflat', and 'linkcal'. Each key corresponds to a
            Table.Row or None. The table.Row() values are for the corresponding
            calibration job. Each value that isn't None must contain
            'INTID', and 'LATEST_QID'. If None, it assumes the
            dependency doesn't exist and no dependency is assigned.
        use_tilenight, bool. Default is False. If True, use desi_proc_tilenight
            for prestdstar, stdstar,and poststdstar steps for
            science exposures.
        refnight, int. The reference night for linking jobs

    Returns:
        Table.Row or dict: The same prow type and keywords as input except
        with modified values updated values for 'JOBDESC', 'INT_DEP_IDS'. and 'LATEST_DEP_ID'.

    Note:
        This modifies the input. Though Table.Row objects are generally copied
        on modification, so the change to the input object in memory may or may
        not be changed. As of writing, a row from a table given to this function
        will not change during the execution of this function (but can be
        overwritten explicitly with the returned row if desired).
    """
    def _first_available(jobnames):
        ## Return the first defined calibration job in the given priority
        ## order, falling back to the linkcal job (which may itself be None).
        for jobname in jobnames:
            if calibjobs[jobname] is not None:
                return calibjobs[jobname]
        return calibjobs['linkcal']

    ## NOTE: branch order matters -- e.g. a dark exposure whose JOBDESC is a
    ## ccd-level calibration must hit the JOBDESC branch before the dark branch.
    if prow['OBSTYPE'] in ['science', 'twiflat']:
        ## Sciences/twiflats depend on the most advanced calibration available
        dependency = _first_available(['nightlyflat', 'psfnight', 'ccdcalib',
                                       'nightlybias', 'badcol'])
        if not use_tilenight:
            prow['JOBDESC'] = 'prestdstar'
    elif prow['OBSTYPE'] == 'flat':
        dependency = _first_available(['psfnight', 'ccdcalib', 'nightlybias',
                                       'badcol'])
    elif prow['OBSTYPE'] == 'arc':
        dependency = _first_available(['ccdcalib', 'nightlybias', 'badcol'])
    elif prow['JOBDESC'] in ['badcol', 'nightlybias', 'ccdcalib']:
        ## ccd-level calibrations depend only on a possible linking job
        dependency = calibjobs['linkcal']
    elif prow['OBSTYPE'] == 'dark':
        dependency = _first_available(['ccdcalib', 'nightlybias', 'badcol'])
    elif prow['JOBDESC'] == 'linkcal' and refnight is not None:
        dependency = None
        ## For link cals only, enable cross-night dependencies if available
        refproctable = findfile('proctable', night=refnight)
        if os.path.exists(refproctable):
            ptab = load_table(tablename=refproctable, tabletype='proctable')
            ## This isn't perfect because we may depend on jobs that aren't
            ## actually being linked
            ## Also allows us to proceed even if jobs don't exist yet
            deps = []
            for job in ['nightlybias', 'ccdcalib', 'psfnight', 'nightlyflat']:
                if job in ptab['JOBDESC']:
                    ## add the reference night's row for this job as a dependency
                    deps.append(ptab[ptab['JOBDESC']==job][0])
            if len(deps) > 0:
                dependency = deps
    else:
        dependency = None

    prow = assign_dependency(prow, dependency)

    return prow
807

808

809
def assign_dependency(prow, dependency):
    """
    Given an input processing row and a dependency (or list of dependencies),
    fill in the internal-ID and Slurm-QID dependency columns of prow.

    Args:
        prow, Table.Row or dict. Must include keyword accessible definitions for 'OBSTYPE'. A row must have column names for
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_QID'.
        dependency, NoneType or scalar/list/array of Table.Row, dict. Processing row(s) corresponding to the required input
            for the job in prow. Each must contain keyword
            accessible values for 'INTID', and 'LATEST_QID'.
            If None, it assumes the dependency doesn't exist
            and no dependency is assigned.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated values for
        'INT_DEP_IDS' and 'LATEST_DEP_QID'.

    Note:
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    ## Default to empty integer arrays, meaning "no dependencies"
    prow['INT_DEP_IDS'] = np.zeros(0, dtype=int)
    prow['LATEST_DEP_QID'] = np.zeros(0, dtype=int)
    if dependency is not None:
        ## Bug fix: the previous check `type(dependency) in [list, np.array]`
        ## could never match an ndarray because np.array is a factory function,
        ## not a type, so array-valued dependency lists were silently dropped.
        if isinstance(dependency, (list, tuple, np.ndarray)):
            ids, qids = [], []
            for curdep in dependency:
                ## Only keep dependencies that are still pending in the queue
                if still_a_dependency(curdep):
                    ids.append(curdep['INTID'])
                    qids.append(curdep['LATEST_QID'])
            prow['INT_DEP_IDS'] = np.array(ids, dtype=int)
            prow['LATEST_DEP_QID'] = np.array(qids, dtype=int)
        elif isinstance(dependency, (dict, Table.Row)) and still_a_dependency(dependency):
            ## isinstance covers OrderedDict as well, since it subclasses dict
            prow['INT_DEP_IDS'] = np.array([dependency['INTID']], dtype=int)
            prow['LATEST_DEP_QID'] = np.array([dependency['LATEST_QID']], dtype=int)
    return prow
847

848
def still_a_dependency(dependency):
    """
    Decide whether a dependency job is still a blocking factor.

     Args:
        dependency, Table.Row or dict. Processing row corresponding to the required input for the job in prow.
            This must contain keyword accessible values for 'STATUS', and 'LATEST_QID'.

    Returns:
        bool: True if the dependency still blocks submission, i.e. it has a
        valid queue id and has not yet completed, so the Slurm scheduler must
        be told about the pending job. False once the dependency has finished
        and is no longer a genuine dependency.
    """
    if dependency['LATEST_QID'] <= 0:
        return False
    return dependency['STATUS'] != 'COMPLETED'
863

864
def get_type_and_tile(erow):
    """
    Trivial helper returning the lowercased OBSTYPE and the TILEID of an
    exposure table row.

    Args:
        erow, Table.Row or dict. Must contain 'OBSTYPE' and 'TILEID' as keywords.

    Returns:
        tuple (str, TILEID value): lowercased OBSTYPE and the TILEID of erow.
    """
    obstype = str(erow['OBSTYPE']).lower()
    tileid = erow['TILEID']
    return obstype, tileid
875

876

877
#############################################
878
#########   Table manipulators   ############
879
#############################################
880
def parse_previous_tables(etable, ptable, night):
    """
    This takes in the exposure and processing tables and regenerates all the working memory variables needed for the
    daily processing script.

    Used by the daily processing to define most of its state-ful variables into working memory.
    If the processing table is empty, these are simply declared and returned for use.
    If the code had previously run and exited (or crashed), however, this allows the code to
    re-establish itself by redefining these values.

    Args:
        etable, Table, Exposure table of all exposures that have been dealt with thus far.
        ptable, Table, Processing table of all exposures that have been processed.
        night, str or int, the night the data was taken.

    Returns:
        tuple: A tuple containing:

        * arcs, list of dicts, list of the individual arc jobs used for the psfnight (NOT all
          the arcs, if multiple sets existed)
        * flats, list of dicts, list of the individual flat jobs used for the nightlyflat (NOT
          all the flats, if multiple sets existed)
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile)
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'badcol', 'psfnight'
          and 'nightlyflat'. Each key corresponds to a Table.Row or
          None. The table.Row() values are for the corresponding
          calibration job.
        * curtype, None, the obstype of the current job being run. Always None as first new job will define this.
        * lasttype, str or None, the obstype of the last individual exposure row to be processed.
        * curtile, None, the tileid of the current job (if science). Otherwise None. Always None as first
          new job will define this.
        * lasttile, str or None, the tileid of the last job (if science). Otherwise None.
        * internal_id, int, an internal identifier unique to each job. Increments with each new job. This
          is the latest unassigned value.
    """
    log = get_logger()
    arcs, flats, sciences = [], [], []
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None, 'psfnight': None,
                 'nightlyflat': None}
    curtype,lasttype = None,None
    curtile,lasttile = None,None

    if len(ptable) > 0:
        # Resume from the last processed row: next internal id follows the last one
        prow = ptable[-1]
        internal_id = int(prow['INTID'])+1
        lasttype,lasttile = get_type_and_tile(ptable[-1])
        jobtypes = ptable['JOBDESC']

        if 'nightlybias' in jobtypes:
            calibjobs['nightlybias'] = table_row_to_dict(ptable[jobtypes=='nightlybias'][0])
            log.info("Located nightlybias job in exposure table: {}".format(calibjobs['nightlybias']))

        if 'ccdcalib' in jobtypes:
            calibjobs['ccdcalib'] = table_row_to_dict(ptable[jobtypes=='ccdcalib'][0])
            log.info("Located ccdcalib job in exposure table: {}".format(calibjobs['ccdcalib']))

        if 'psfnight' in jobtypes:
            calibjobs['psfnight'] = table_row_to_dict(ptable[jobtypes=='psfnight'][0])
            log.info("Located joint fit psfnight job in exposure table: {}".format(calibjobs['psfnight']))
        elif lasttype == 'arc':
            # No joint psfnight yet: collect the trailing run of arc rows.
            # Walk backward while SEQNUM strictly decreases, which keeps only
            # the most recent contiguous arc sequence (SEQNUM < 10 assumed).
            seqnum = 10
            for row in ptable[::-1]:
                erow = etable[etable['EXPID']==row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'arc' and int(erow['SEQNUM'])<seqnum:
                    arcs.append(table_row_to_dict(row))
                    seqnum = int(erow['SEQNUM'])
                else:
                    break
            ## Because we work backword to fill in, we need to reverse them to get chronological order back
            arcs = arcs[::-1]

        if 'nightlyflat' in jobtypes:
            calibjobs['nightlyflat'] = table_row_to_dict(ptable[jobtypes=='nightlyflat'][0])
            log.info("Located joint fit nightlyflat job in exposure table: {}".format(calibjobs['nightlyflat']))
        elif lasttype == 'flat':
            # No joint nightlyflat yet: collect the trailing run of calibration
            # flats (SEQTOT < 5 and long exposures only)
            for row in ptable[::-1]:
                erow = etable[etable['EXPID']==row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'flat' and int(erow['SEQTOT']) < 5:
                    if float(erow['EXPTIME']) > 100.:
                        flats.append(table_row_to_dict(row))
                else:
                    break
            flats = flats[::-1]

        if lasttype.lower() == 'science':
            # Collect the trailing prestdstar jobs for the tile currently
            # being processed (skip rows whose LASTSTEP stopped at skysub)
            for row in ptable[::-1]:
                if row['OBSTYPE'].lower() == 'science' and row['TILEID'] == lasttile and \
                   row['JOBDESC'] == 'prestdstar' and row['LASTSTEP'] != 'skysub':
                    sciences.append(table_row_to_dict(row))
                else:
                    break
            sciences = sciences[::-1]
    else:
        # Fresh night: derive the starting internal id from the night itself
        internal_id = night_to_starting_iid(night)

    return arcs,flats,sciences, \
           calibjobs, \
           curtype, lasttype, \
           curtile, lasttile,\
           internal_id

982
def generate_calibration_dict(ptable, files_to_link=None):
    """
    This takes in a processing table and regenerates the working memory calibration
    dictionary for dependency tracking. Used by the daily processing to define
    most of its state-ful variables into working memory.
    If the processing table is empty, these are simply declared and returned for use.
    If the code had previously run and exited (or crashed), however, this allows the code to
    re-establish itself by redefining these values.

    Args:
        ptable, Table, Processing table of all exposures that have been processed.
        files_to_link, set, Set of filenames that the linkcal job will link.

    Returns:
        calibjobs, dict. Dictionary containing 'nightlybias', 'badcol', 'ccdcalib',
            'psfnight', 'nightlyflat', 'linkcal', and 'completed'. Each key corresponds to a
            Table.Row or None. The table.Row() values are for the corresponding
            calibration job. 'completed' maps each job type to a boolean saying
            whether that job has already been submitted or handled.
    """
    log = get_logger()
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None,
                 'completed': dict()}
    ptable_jobtypes = ptable['JOBDESC']

    for jobtype in calibjobs.keys():
        calibjobs["completed"][jobtype] = False
        if jobtype in ptable_jobtypes:
            ## Use the first row of this job type as the canonical job record
            calibjobs[jobtype] = table_row_to_dict(ptable[ptable_jobtypes==jobtype][0])
            log.info(f"Located {jobtype} job in processing table: {calibjobs[jobtype]}")
            calibjobs["completed"][jobtype] = True

    ## If calibrations were linked from a reference night, mark the linked
    ## jobs as effectively complete
    if calibjobs["completed"]['linkcal'] and files_to_link is not None:
        calibjobs = update_calibjobs_with_linking(calibjobs, files_to_link)

    return calibjobs
1019

1020

1021
def update_calibjobs_with_linking(calibjobs, files_to_link):
    """
    Update the calibration-job summary dictionary to reflect files that are
    assumed to have already been linked from a reference night, so the
    corresponding jobs need not be generated.

    Parameters
    ----------
        calibjobs: dict
            Dictionary containing "nightlybias", "badcol", "ccdcalib",
            "psfnight", "nightlyflat", "linkcal", and "completed". Each key
            corresponds to a Table.Row or None. The table.Row() values are for
            the corresponding calibration job.
        files_to_link: set
            Set of filenames that the linkcal job will link.

    Returns
    -------
        calibjobs, dict
            The input dictionary with calibjobs['completed'] entries set True
            for every job whose output file is being linked.

    Raises
    ------
        ValueError
            If a filename in files_to_link does not correspond to any known
            job description.
    """
    log = get_logger()

    file_job_map = get_file_to_jobdesc_map()
    ## These three files are components of ccdcalib rather than jobs themselves
    ccd_components = ('biasnight', 'badcolumns', 'ctecorrnight')
    for filetype in files_to_link:
        if filetype in file_job_map:
            calibjobs['completed'][file_job_map[filetype]] = True
        elif filetype not in ccd_components:
            err = f"Filetype {filetype} doesn't map to a known job description: "
            err += f"{file_job_map=}"
            log.error(err)
            raise ValueError(err)

    ## Only when all three ccd-level products are linked is ccdcalib covered
    if all(component in files_to_link for component in ccd_components):
        calibjobs['completed']['ccdcalib'] = True

    return calibjobs
1064

1065
def all_calibs_submitted(completed):
    """
    Boolean logic deciding whether all necessary calibration jobs have been
    submitted or otherwise handled.

    Args:
        completed, dict, Dictionary with keys corresponding to the calibration job descriptions and values of True or False.

    Returns:
        bool, True if all necessary calibrations have been submitted or handled, False otherwise.
    """
    ## Any one of the ccd-level jobs suffices for the ccd-level calibration
    have_ccdlevel = (completed['nightlybias']
                     or completed['badcol']
                     or completed['ccdcalib'])
    if not have_ccdlevel:
        return False
    return completed['psfnight'] and completed['nightlyflat']
1079

1080
def update_and_recurvsively_submit(proc_table, submits=0, resubmission_states=None,
                                   ptab_name=None, dry_run=0,reservation=None):
    """
    Loop over the rows of a processing table and resubmit any job whose queue
    state is in resubmission_states. Each failed job is handed to
    recursive_submit_failed, which first resubmits any failed dependencies and
    then the job itself, rewiring the Slurm jobid dependencies as it goes.

    Args:
        proc_table, Table, the processing table with a row per job.
        submits, int, the number of submissions made to the queue. Used for saving files and in not overloading the scheduler.
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
            possible Slurm scheduler state, where you wish for jobs with that
            outcome to be resubmitted
        ptab_name, str, the full pathname where the processing table should be saved.
        dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.

    Returns:
        tuple: A tuple containing:

        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
          been updated for those jobs that needed to be resubmitted.
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
          the number of submissions made from this function call plus the input submits value.

    Note:
        This modifies the inputs of both proc_table and submits and returns them.
    """
    log = get_logger()
    if resubmission_states is None:
        resubmission_states = get_resubmission_states()
    log.info(f"Resubmitting jobs with current states in the following: {resubmission_states}")

    ## Refresh queue states before deciding what to resubmit.
    ## NOTE(review): dry_run is deliberately not forwarded here in the original
    ## code -- confirm whether update_from_queue should honor it.
    proc_table = update_from_queue(proc_table, dry_run=False)
    log.info("Updated processing table queue information:")
    cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID',
            'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS']
    print(np.array(cols))
    for row in proc_table:
        print(np.array(row[cols]))
    print("\n")

    ## Map internal job ids to row indices so dependency rows can be located
    intid_to_rownum = {row['INTID']: rownum for rownum, row in enumerate(proc_table)}
    ## Re-read STATUS each iteration: recursive_submit_failed returns an
    ## updated table, and earlier resubmissions can change later rows' states
    for rownum in range(len(proc_table)):
        if proc_table['STATUS'][rownum] in resubmission_states:
            proc_table, submits = recursive_submit_failed(rownum, proc_table, submits,
                                                          intid_to_rownum, ptab_name,
                                                          resubmission_states,
                                                          reservation, dry_run)
    return proc_table, submits
1131

1132
def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=None,
                            resubmission_states=None, reservation=None, dry_run=0):
    """
    Resubmit the job in the given row of the processing table.

    Before submitting, the dependencies of the job are checked for failures in
    the processing table. If a dependency itself needs to be resubmitted, this
    recursively follows dependencies until it finds the first job without a
    failed dependency and resubmits that, then resubmits the downstream jobs
    with the new Slurm jobID's for proper dependency coordination within Slurm.

    Args:
        rown, Table.Row, the row of the processing table that you want to resubmit.
        proc_table, Table, the processing table with a row per job.
        submits, int, the number of submissions made to the queue. Used for saving
            files and in not overloading the scheduler.
        id_to_row_map, dict, lookup dictionary where the keys are internal ids
            (INTID's) and the values are the row position in the processing table.
        ptab_name, str, the full pathname where the processing table should be saved.
        resubmission_states, list or array of strings, each element should be a
            capitalized string corresponding to a possible Slurm scheduler state,
            where you wish for jobs with that outcome to be resubmitted.
        reservation: str. The reservation to submit jobs to. If None, it is not
            submitted to a reservation.
        dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts
            will be written or submitted. If dry_run=2, the scripts will not be
            written or submitted. Logging will remain the same for testing as
            though scripts are being submitted. Default is 0 (false).

    Returns:
        tuple: A tuple containing:

        * proc_table: Table, a table with the same rows as the input except that
          Slurm and jobid relevant columns have been updated for those jobs that
          needed to be resubmitted.
        * submits: int, the number of submissions made to the queue. This is
          incremented from the input submits, so it is the number of submissions
          made from this function call plus the input submits value.

    Note:
        This modifies the inputs of both proc_table and submits and returns them.
    """
    log = get_logger()
    row = proc_table[rown]
    log.info(f"Identified row {row['INTID']} as needing resubmission.")
    log.info(f"{row['INTID']}: Expid(s): {row['EXPID']}  Job: {row['JOBDESC']}")
    if resubmission_states is None:
        resubmission_states = get_resubmission_states()
    ideps = proc_table['INT_DEP_IDS'][rown]
    if ideps is None:
        ## No dependencies: record an empty QID dependency array.
        proc_table['LATEST_DEP_QID'][rown] = np.ndarray(shape=0).astype(int)
    else:
        ## Filter the dependency list once. INTID's appear to encode the night in
        ## the integer-division-by-1000 sense, so an id missing from
        ## id_to_row_map that also differs at that level is a cross-night
        ## dependency and is expected to be absent; it is skipped with a warning.
        ## (An id missing from the map but from the *same* night would raise a
        ## KeyError below, which is intentional — it indicates a corrupt table.)
        same_night_ideps = []
        for idep in np.sort(np.atleast_1d(ideps)):
            if idep not in id_to_row_map and idep // 1000 != row['INTID'] // 1000:
                log.warning(f"Internal ID: {idep} not in id_to_row_map. "
                            + "This is expected since it's from another day. "
                            + f" This dependency will not be checked or "
                            + "resubmitted")
            else:
                same_night_ideps.append(idep)

        ## First pass: verify every dependency is in a workable state before
        ## resubmitting anything, so we don't leave a half-submitted chain.
        all_valid_states = list(resubmission_states.copy())
        all_valid_states.extend(['RUNNING','PENDING','SUBMITTED','COMPLETED'])
        for idep in same_night_ideps:
            if proc_table['STATUS'][id_to_row_map[idep]] not in all_valid_states:
                log.warning(f"Proc INTID: {proc_table['INTID'][rown]} depended on" +
                            f" INTID {proc_table['INTID'][id_to_row_map[idep]]}" +
                            f" but that exposure has state" +
                            f" {proc_table['STATUS'][id_to_row_map[idep]]} that" +
                            f" isn't in the list of resubmission states." +
                            f" Exiting this job's resubmission attempt.")
                proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
                return proc_table, submits

        ## Second pass: recursively resubmit any failed dependencies, then
        ## collect the (possibly refreshed) Slurm QIDs to depend on.
        qdeps = []
        for idep in same_night_ideps:
            if proc_table['STATUS'][id_to_row_map[idep]] in resubmission_states:
                ## Propagate ptab_name and resubmission_states so nested
                ## resubmissions honor the caller's configuration.
                proc_table, submits = recursive_submit_failed(id_to_row_map[idep],
                                                              proc_table, submits,
                                                              id_to_row_map,
                                                              ptab_name=ptab_name,
                                                              resubmission_states=resubmission_states,
                                                              reservation=reservation,
                                                              dry_run=dry_run)
            qdeps.append(proc_table['LATEST_QID'][id_to_row_map[idep]])

        qdeps = np.atleast_1d(qdeps)
        if len(qdeps) > 0:
            proc_table['LATEST_DEP_QID'][rown] = qdeps
        else:
            log.error(f"number of qdeps should be 1 or more: Rown {rown}, ideps {ideps}")

    proc_table[rown] = submit_batch_script(proc_table[rown], reservation=reservation,
                                           strictly_successful=True, dry_run=dry_run)
    submits += 1

    if not dry_run:
        sleep_and_report(1, message_suffix=f"after submitting job to queue")
        ## Periodically checkpoint the processing table to disk so progress
        ## survives an interruption; every 100 submits also refresh queue state.
        if submits % 10 == 0:
            if ptab_name is None:
                write_table(proc_table, tabletype='processing', overwrite=True)
            else:
                write_table(proc_table, tablename=ptab_name, overwrite=True)
            sleep_and_report(2, message_suffix=f"after writing to disk")
        if submits % 100 == 0:
            proc_table = update_from_queue(proc_table)
            if ptab_name is None:
                write_table(proc_table, tabletype='processing', overwrite=True)
            else:
                write_table(proc_table, tablename=ptab_name, overwrite=True)
            sleep_and_report(10, message_suffix=f"after updating queue and writing to disk")
    return proc_table, submits
1236

1237

1238
#########################################
1239
########     Joint fit     ##############
1240
#########################################
1241
def joint_fit(ptable, prows, internal_id, queue, reservation, descriptor, z_submit_types=None,
              dry_run=0, strictly_successful=False, check_for_outputs=True, resubmit_partial_complete=True,
              system_name=None):
    """
    DEPRECATED
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    joint fitting job given by descriptor. If the joint fitting job is standard star fitting, the post standard star fits
    for all the individual exposures also created and submitted. The returned ptable has all of these rows added to the
    table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        descriptor (str): Description of the joint fitting job. Can either be 'science' or 'stdstarfit', 'arc' or 'psfnight',
            or 'flat' or 'nightlyflat'.
        z_submit_types (list of str, optional): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with added rows for the joint fit job and, in the case
          of a stdstarfit, the poststdstar science exposure jobs.
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    if descriptor is None:
        ## Bug fix: previously returned a 2-tuple here, breaking callers that
        ## unpack the documented 3-tuple.
        return ptable, None, internal_id
    elif descriptor == 'arc':
        descriptor = 'psfnight'
    elif descriptor == 'flat':
        descriptor = 'nightlyflat'
    elif descriptor == 'science':
        ## With no redshift types requested, a science joint fit reduces to
        ## just the standard star fit.
        if z_submit_types is None or len(z_submit_types) == 0:
            descriptor = 'stdstarfit'

    if descriptor not in ['psfnight', 'nightlyflat', 'science','stdstarfit']:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")

    ## The joint job for 'science' is the stdstar fit; other descriptors map
    ## directly to their joint job type.
    joint_desc = 'stdstarfit' if descriptor == 'science' else descriptor
    joint_prow, internal_id = make_joint_prow(prows, descriptor=joint_desc, internal_id=internal_id)
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
    ptable.add_row(joint_prow)

    if descriptor in ['science','stdstarfit']:
        if descriptor == 'science':
            zprows = []
        log.info(" ")
        log.info(f"Submitting individual science exposures now that joint fitting of standard stars is submitted.\n")
        for row in prows:
            if row['LASTSTEP'] == 'stdstarfit':
                continue
            row['JOBDESC'] = 'poststdstar'

            # poststdstar job can't process cameras not included in its stdstar joint fit
            stdcamword = joint_prow['PROCCAMWORD']
            thiscamword = row['PROCCAMWORD']
            proccamword = camword_intersection([stdcamword, thiscamword])
            if proccamword != thiscamword:
                dropcams = difference_camwords(thiscamword, proccamword)
                assert dropcams != ''  #- i.e. if they differ, we should be dropping something
                log.warning(f"Dropping exp {row['EXPID']} poststdstar cameras {dropcams} since they weren't included in stdstar fit {stdcamword}")
                row['PROCCAMWORD'] = proccamword

            row['INTID'] = internal_id
            internal_id += 1
            row['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
            row = assign_dependency(row, joint_prow)
            row = create_and_submit(row, queue=queue, reservation=reservation, dry_run=dry_run,
                                    strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
            ptable.add_row(row)
            if descriptor == 'science' and row['LASTSTEP'] == 'all':
                zprows.append(row)

    ## Now run redshifts
    if descriptor == 'science' and len(zprows) > 0 and z_submit_types is not None:
        ## Select all poststdstar jobs for this tile from the full table so
        ## joint (non-perexp) redshifts include exposures from earlier calls.
        prow_selection = (  (ptable['OBSTYPE'] == 'science')
                          & (ptable['LASTSTEP'] == 'all')
                          & (ptable['JOBDESC'] == 'poststdstar')
                          & (ptable['TILEID'] == int(zprows[0]['TILEID'])) )
        nightly_zprows = []
        if np.sum(prow_selection) == len(zprows):
            nightly_zprows = zprows.copy()
        else:
            for prow in ptable[prow_selection]:
                nightly_zprows.append(table_row_to_dict(prow))

        for zsubtype in z_submit_types:
            if zsubtype == 'perexp':
                for zprow in zprows:
                    log.info(" ")
                    log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
                    joint_prow, internal_id = make_joint_prow([zprow], descriptor=zsubtype, internal_id=internal_id)
                    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                    ptable.add_row(joint_prow)
            else:
                log.info(" ")
                log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {nightly_zprows[0]['TILEID']}.")
                expids = [prow['EXPID'][0] for prow in nightly_zprows]
                log.info(f"Expids: {expids}.\n")
                joint_prow, internal_id = make_joint_prow(nightly_zprows, descriptor=zsubtype, internal_id=internal_id)
                joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(joint_prow)

    if descriptor in ['psfnight', 'nightlyflat']:
        log.info(f"Setting the calibration exposures as calibrators in the processing table.\n")
        ptable = set_calibrator_flag(prows, ptable)

    return ptable, joint_prow, internal_id
1384

1385
def joint_cal_fit(descriptor, ptable, prows, internal_id, queue, reservation,
                  dry_run=0, strictly_successful=False, check_for_outputs=True,
                  resubmit_partial_complete=True, system_name=None):
    """
    Given a set of prows, this generates a processing table row, creates a batch
    script, and submits the appropriate joint calibration fitting job given by
    descriptor ('psfnight' or 'nightlyflat', with 'arc' and 'flat' accepted as
    aliases). The returned ptable has the joint job row added to the table
    given as input.

    Args:
        descriptor (str): Description of the joint fitting job. Can either be
            'arc' or 'psfnight', or 'flat' or 'nightlyflat'.
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): The rows corresponding to the individual
            exposure jobs that are inputs to the joint fit.
        internal_id (int): the next internal id to be used for assignment
            (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is
            given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not
            submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If
            dry_run=1 the scripts will be written or submitted. If dry_run=2,
            the scripts will not be written or submitted. Logging will remain
            the same for testing as though scripts are being submitted.
            Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all
            inputs to have succeeded. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code
            checks for the existence of the expected final data products for the
            script being submitted and prunes or skips accordingly.
        resubmit_partial_complete (bool, optional): Default is True. Must be
            used with check_for_outputs=True. If True, jobs with some prior data
            are pruned using PROCCAMWORD to only process the remaining cameras
            not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with an added
          row for the joint fit job.
        * joint_prow, dict. Row of a processing table corresponding to the joint
          fit job.
        * internal_id, int, the next internal id to be used for assignment
          (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    if descriptor is None:
        ## Bug fix: previously returned a 2-tuple here, breaking callers that
        ## unpack the documented 3-tuple.
        return ptable, None, internal_id
    elif descriptor == 'arc':
        descriptor = 'psfnight'
    elif descriptor == 'flat':
        descriptor = 'nightlyflat'

    if descriptor not in ['psfnight', 'nightlyflat']:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")

    joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
    ptable.add_row(joint_prow)

    ## descriptor is guaranteed to be 'psfnight' or 'nightlyflat' at this point
    ## (any other value returned above), so always flag the calibrators.
    log.info(f"Setting the calibration exposures as calibrators in the processing table.\n")
    ptable = set_calibrator_flag(prows, ptable)

    return ptable, joint_prow, internal_id
1454

1455

1456
#########################################
1457
########     Redshifts     ##############
1458
#########################################
1459
def submit_redshifts(ptable, prows, tnight, internal_id, queue, reservation,
              dry_run=0, strictly_successful=False,
              check_for_outputs=True, resubmit_partial_complete=True,
              z_submit_types=None, system_name=None):
    """
    Given a set of prows, this generates a processing table row, creates a batch
    script, and submits the requested redshift jobs that depend on the given
    tilenight job. The returned ptable has all of these rows added to the table
    given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
        tnight (Table.Row): The processing table row of the tilenight job on which the redshifts depend.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with added rows for the redshift jobs.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    ## Nothing to do without input jobs or requested redshift types.
    ## (idiom fix: use "is None" rather than "== None" for the None check)
    if len(prows) < 1 or z_submit_types is None:
        return ptable, internal_id

    log.info(" ")
    log.info(f"Running redshifts.\n")

    ## Only exposures processed through all steps are eligible for redshifts.
    zprows = [row for row in prows if row['LASTSTEP'] == 'all']

    if len(zprows) > 0:
        for zsubtype in z_submit_types:
            if zsubtype == 'perexp':
                ## One redshift job per individual exposure.
                for zprow in zprows:
                    log.info(" ")
                    log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
                    redshift_prow = make_redshift_prow([zprow], tnight, descriptor=zsubtype, internal_id=internal_id)
                    internal_id += 1
                    redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                    ptable.add_row(redshift_prow)
            else:
                ## One joint redshift job over all eligible exposures.
                log.info(" ")
                log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {zprows[0]['TILEID']}.")
                expids = [prow['EXPID'][0] for prow in zprows]
                log.info(f"Expids: {expids}.\n")
                redshift_prow = make_redshift_prow(zprows, tnight, descriptor=zsubtype, internal_id=internal_id)
                internal_id += 1
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(redshift_prow)

    return ptable, internal_id
1536

1537
#########################################
1538
########     Tilenight     ##############
1539
#########################################
1540
def submit_tilenight(ptable, prows, calibjobs, internal_id, queue, reservation,
              dry_run=0, strictly_successful=False, resubmit_partial_complete=True,
              system_name=None, use_specter=False, extra_job_args=None):
    """
    Build, script, and submit a single tilenight job for the given set of
    unsubmitted prestdstar rows, appending the resulting job row to the
    processing table.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The table.Row() values are for the corresponding
            calibration job.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. Default is False.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include
            laststeps for for tilenight, etc.

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with an added row for the tilenight job.
        * tnight_prow, dict. Row of a processing table corresponding to the tilenight job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()

    ## Guard clause: with no input rows there is nothing to submit.
    if len(prows) == 0:
        return ptable, None, internal_id

    log.info(" ")
    log.info("Running tilenight.\n")

    ## Build the tilenight row, consume one internal id, then script/submit it.
    ## check_for_outputs is intentionally disabled for tilenight jobs.
    tilenight_row = make_tnight_prow(prows, calibjobs, internal_id=internal_id)
    internal_id += 1
    tilenight_row = create_and_submit(
        tilenight_row, queue=queue, reservation=reservation,
        dry_run=dry_run, strictly_successful=strictly_successful,
        check_for_outputs=False,
        resubmit_partial_complete=resubmit_partial_complete,
        system_name=system_name, use_specter=use_specter,
        extra_job_args=extra_job_args)
    ptable.add_row(tilenight_row)

    return ptable, tilenight_row, internal_id
1596

1597
## wrapper functions for joint fitting
1598
def science_joint_fit(ptable, sciences, internal_id, queue='realtime', reservation=None,
                      z_submit_types=None, dry_run=0, strictly_successful=False,
                      check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None):
    """
    Thin wrapper around desispec.workflow.processing.joint_fit for the science
    (stdstarfit) joint fit plus redshift fitting.

    All arguments are identical to joint_fit except::

        Arg 'sciences' supplies the prows argument of joint_fit.
        The joint_fit descriptor argument is fixed to 'science'.
    """
    return joint_fit(ptable=ptable, prows=sciences, internal_id=internal_id,
                     descriptor='science', queue=queue, reservation=reservation,
                     z_submit_types=z_submit_types, dry_run=dry_run,
                     strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs,
                     resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)
1614

1615

1616
def flat_joint_fit(ptable, flats, internal_id, queue='realtime',
                   reservation=None, dry_run=0, strictly_successful=False,
                   check_for_outputs=True, resubmit_partial_complete=True,
                   system_name=None):
    """
    Thin wrapper around desispec.workflow.processing.joint_fit for the
    nightlyflat joint fit.

    All arguments are identical to joint_fit except::

        Arg 'flats' supplies the prows argument of joint_fit.
        The joint_fit descriptor argument is fixed to 'nightlyflat'.
    """
    return joint_fit(ptable=ptable, prows=flats, internal_id=internal_id,
                     descriptor='nightlyflat', queue=queue, reservation=reservation,
                     dry_run=dry_run, strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs,
                     resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)
1632

1633

1634
def arc_joint_fit(ptable, arcs, internal_id, queue='realtime',
                  reservation=None, dry_run=0, strictly_successful=False,
                  check_for_outputs=True, resubmit_partial_complete=True,
                  system_name=None):
    """
    Thin wrapper around desispec.workflow.processing.joint_fit for the
    psfnight joint fit.

    All arguments are identical to joint_fit except::

        Arg 'arcs' supplies the prows argument of joint_fit.
        The joint_fit descriptor argument is fixed to 'psfnight'.
    """
    return joint_fit(ptable=ptable, prows=arcs, internal_id=internal_id,
                     descriptor='psfnight', queue=queue, reservation=reservation,
                     dry_run=dry_run, strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs,
                     resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)
1650

1651
def make_joint_prow(prows, descriptor, internal_id):
    """
    Given an input list or array of processing table rows and a descriptor, this creates a joint fit processing job row.
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
    input prows).

    Args:
        prows, list or array of dicts. The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        descriptor, str. Description of the joint fitting job. Can either be 'stdstarfit', 'psfnight', or 'nightlyflat'.
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).

    Returns:
        dict: Row of a processing table corresponding to the joint fit job.
        int: the next internal id to be used for assignment (incremented past the id consumed here).
    """
    first_row = table_row_to_dict(prows[0])
    joint_prow = first_row.copy()

    joint_prow['INTID'] = internal_id
    internal_id += 1
    joint_prow['JOBDESC'] = descriptor
    joint_prow['LATEST_QID'] = -99
    ## np.array([], dtype=int) rather than np.ndarray(shape=0).astype(int):
    ## np.ndarray() allocates uninitialized memory and is not the intended API
    joint_prow['ALL_QIDS'] = np.array([], dtype=int)
    joint_prow['SUBMIT_DATE'] = -99
    joint_prow['STATUS'] = 'U'
    joint_prow['SCRIPTNAME'] = ''
    joint_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

    ## Assign the PROCCAMWORD based on the descriptor and the input exposures
    if descriptor == 'stdstarfit':
        pcamwords = [prow['PROCCAMWORD'] for prow in prows]
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
                                                  full_spectros_only=True)
    else:
        ## For arcs and flats, a BADAMP takes out the camera, so remove those
        ## cameras from the proccamword
        pcamwords = []
        for prow in prows:
            if len(prow['BADAMPS']) > 0:
                badcams = [f'{camera}{petal}' for camera, petal, amplifier
                           in parse_badamps(prow['BADAMPS'])]
                badampcamword = create_camword(list(set(badcams)))
                pcamword = difference_camwords(prow['PROCCAMWORD'], badampcamword)
            else:
                pcamword = prow['PROCCAMWORD']
            pcamwords.append(pcamword)

        ## For flats we want any camera that exists in all 12 exposures
        ## For arcs we want any camera that exists in at least 3 exposures
        if descriptor == 'nightlyflat':
            joint_prow['PROCCAMWORD'] = camword_intersection(pcamwords,
                                                             full_spectros_only=False)
        elif descriptor == 'psfnight':
            ## Count the number of exposures each camera is present for
            camcheck = {}
            for camword in pcamwords:
                for cam in decode_camword(camword):
                    camcheck[cam] = camcheck.get(cam, 0) + 1
            ## if a camera exists in 3 or more exposures, then include it
            goodcams = [cam for cam, camcount in camcheck.items() if camcount >= 3]
            joint_prow['PROCCAMWORD'] = create_camword(goodcams)

    joint_prow = assign_dependency(joint_prow, dependency=prows)
    return joint_prow, internal_id
1✔
1723

1724
def make_exposure_prow(erow, int_id, calibjobs, jobdesc=None):
    """
    Convert an exposure table row into a processing table row ready for submission.

    Args:
        erow, Table.Row or dict. The exposure table row to convert.
        int_id, int. The internal id to assign to the new processing row.
        calibjobs, dict. Calibration job rows (or None) on which this job may depend.
        jobdesc, str, optional. Job description for the new row. If not given,
            the exposure's OBSTYPE is used.

    Returns:
        tuple: (prow, int_id) where prow is the new processing table row (dict)
        with its dependencies assigned, and int_id is the next unused internal id.
    """
    prow = erow_to_prow(erow)
    prow['INTID'] = int_id
    prow['JOBDESC'] = prow['OBSTYPE'] if jobdesc is None else jobdesc
    prow = define_and_assign_dependency(prow, calibjobs)
    return prow, int_id + 1
1✔
1734

1735
def make_tnight_prow(prows, calibjobs, internal_id):
    """
    Build the processing table row for a tilenight job from its prestdstar rows.

    The first input row is copied, job-specific columns are reset or overwritten,
    and dependencies on the calibration jobs are attached.

    Args:
        prows, list or array of dicts. Unsubmitted rows corresponding to the individual prestdstar jobs
            that are the first steps of tilenight.
        calibjobs, dict. Dictionary containing keys that each corresponds to a Table.Row or
            None, with each table.Row() value corresponding to a calibration job
            on which the tilenight job depends.
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).

    Returns:
        dict: Row of a processing table corresponding to the tilenight job.
    """
    tnight_prow = table_row_to_dict(prows[0]).copy()

    ## Reset the job-tracking columns for the new (not yet submitted) job
    overrides = {'INTID': internal_id,
                 'JOBDESC': 'tilenight',
                 'LATEST_QID': -99,
                 'ALL_QIDS': np.array([], dtype=int),
                 'SUBMIT_DATE': -99,
                 'STATUS': 'U',
                 'SCRIPTNAME': ''}
    for column, value in overrides.items():
        tnight_prow[column] = value

    ## The joint row records the first EXPID of every input exposure
    tnight_prow['EXPID'] = np.array([row['EXPID'][0] for row in prows], dtype=int)

    return define_and_assign_dependency(tnight_prow, calibjobs, use_tilenight=True)
1✔
1767

1768
def make_redshift_prow(prows, tnight, descriptor, internal_id):
    """
    Creates a processing table row for a redshift job that depends on a tilenight job.
    It starts by copying the first input row, overwrites relevant columns, and defines the new
    dependency (on the tilenight job).

    Args:
        prows, list or array of dicts. Unsubmitted rows corresponding to the individual prestdstar jobs that are
            the first steps of tilenight.
        tnight, Table.Row object. Row corresponding to the tilenight job on which the redshift job depends.
        descriptor, str. The redshift group type for the job (one of the z_submit_types values).
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).

    Returns:
        dict: Row of a processing table corresponding to the redshift job.
    """
    first_row = table_row_to_dict(prows[0])
    redshift_prow = first_row.copy()

    redshift_prow['INTID'] = internal_id
    redshift_prow['JOBDESC'] = descriptor
    redshift_prow['LATEST_QID'] = -99
    ## np.array([], dtype=int) rather than np.ndarray(shape=0).astype(int):
    ## np.ndarray() allocates uninitialized memory and is not the intended API
    redshift_prow['ALL_QIDS'] = np.array([], dtype=int)
    redshift_prow['SUBMIT_DATE'] = -99
    redshift_prow['STATUS'] = 'U'
    redshift_prow['SCRIPTNAME'] = ''
    redshift_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

    redshift_prow = assign_dependency(redshift_prow, dependency=tnight)

    return redshift_prow
1✔
1798

1799
def checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs,
                                  lasttype, internal_id, z_submit_types=None, dry_run=0,
                                  queue='realtime', reservation=None, strictly_successful=False,
                                  check_for_outputs=True, resubmit_partial_complete=True,
                                  system_name=None):
    """
    Takes all the state-ful data from daily processing and determines whether a joint fit needs to be submitted. Places
    the decision criteria into a single function for easier maintainability over time. These are separate from the
    new standard manifest*.json method of indicating a calibration sequence is complete. That is checked independently
    elsewhere and doesn't interact with this.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        arcs (list of dict): list of the individual arc jobs to be used for the psfnight (NOT all
            the arcs, if multiple sets existed). May be empty if none identified yet.
        flats (list of dict): list of the individual flat jobs to be used for the nightlyflat (NOT
            all the flats, if multiple sets existed). May be empty if none identified yet.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The table.Row() values are for the corresponding
            calibration job.
        lasttype (str or None): the obstype of the last individual exposure row to be processed.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be writter or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, permutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table, Processing table of all exposures that have been processed.
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
          and 'nightlyflat'. Each key corresponds to a Table.Row or
          None. The table.Row() values are for the corresponding
          calibration job.
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented upward from
          from the input such that it represents the smallest unused ID.
    """
    if lasttype == 'science' and len(sciences) > 0:
        log = get_logger()
        skysubonly = np.array([sci['LASTSTEP'] == 'skysub' for sci in sciences])
        ## skysub-only exposures are not valid joint-fit inputs; bail out if
        ## nothing else remains
        if np.all(skysubonly):
            log.error("Identified all exposures in joint fitting request as skysub-only. Not submitting")
            sciences = []
            return ptable, calibjobs, sciences, internal_id

        ## otherwise prune the skysub-only exposures and proceed with the rest
        if np.any(skysubonly):
            log.error("Identified skysub-only exposures in joint fitting request")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))
            sciences = (np.array(sciences,dtype=object)[~skysubonly]).tolist()
            log.info("Removed skysub only exposures in joint fitting:")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))

        from collections import Counter
        tiles = np.array([sci['TILEID'] for sci in sciences])
        counts = Counter(tiles)
        ## a joint fit must be for a single tile; refuse mixed-tile requests
        if len(counts) > 1:
            log.error("Identified more than one tile in a joint fitting request")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("Tileid's: {}".format(tiles))
            log.info("Returning without joint fitting any of these exposures.")
            sciences = []
            return ptable, calibjobs, sciences, internal_id

        ptable, tilejob, internal_id = science_joint_fit(ptable, sciences, internal_id, z_submit_types=z_submit_types,
                                                         dry_run=dry_run, queue=queue, reservation=reservation,
                                                         strictly_successful=strictly_successful,
                                                         check_for_outputs=check_for_outputs,
                                                         resubmit_partial_complete=resubmit_partial_complete,
                                                         system_name=system_name)
        ## the joint fit consumed these exposures, so reset the pending list
        if tilejob is not None:
            sciences = []

    elif lasttype == 'flat' and calibjobs['nightlyflat'] is None and len(flats) == 12:
        ## Note here we assume exactly 12 flat exposures per calibration set
        ptable, calibjobs['nightlyflat'], internal_id \
            = flat_joint_fit(ptable, flats, internal_id, dry_run=dry_run, queue=queue,
                             reservation=reservation, strictly_successful=strictly_successful,
                             check_for_outputs=check_for_outputs,
                             resubmit_partial_complete=resubmit_partial_complete,
                             system_name=system_name
                            )

    elif lasttype == 'arc' and calibjobs['psfnight'] is None and len(arcs) == 5:
        ## Note here we assume exactly 5 arc exposures per calibration set
        ptable, calibjobs['psfnight'], internal_id \
            = arc_joint_fit(ptable, arcs, internal_id, dry_run=dry_run, queue=queue,
                            reservation=reservation, strictly_successful=strictly_successful,
                            check_for_outputs=check_for_outputs,
                            resubmit_partial_complete=resubmit_partial_complete,
                            system_name=system_name
                            )
    return ptable, calibjobs, sciences, internal_id
×
1925

1926
def submit_tilenight_and_redshifts(ptable, sciences, calibjobs, internal_id, dry_run=0,
                                  queue='realtime', reservation=None, strictly_successful=False,
                                  check_for_outputs=True, resubmit_partial_complete=True,
                                  system_name=None, use_specter=False, extra_job_args=None):
    """
    Takes all the state-ful data from daily processing and determines whether a tilenight job needs to be submitted.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary mapping calibration job names to a Table.Row or None,
            used to define the dependencies of the tilenight job.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be writter or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, permutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict, optional): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include
            laststeps for tilenight, z_submit_types for redshifts, etc.

    Returns:
        tuple: A tuple containing:

        * ptable, Table, Processing table of all exposures that have been processed.
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented upward from
          from the input such that it represents the smallest unused ID.
    """
    ## extra_job_args is documented as optional; guard against None before the
    ## dict lookups below (previously 'in None' raised TypeError)
    if extra_job_args is None:
        extra_job_args = {}

    ptable, tnight, internal_id = submit_tilenight(ptable, sciences, calibjobs, internal_id,
                                             queue=queue, reservation=reservation,
                                             dry_run=dry_run, strictly_successful=strictly_successful,
                                             resubmit_partial_complete=resubmit_partial_complete,
                                             system_name=system_name, use_specter=use_specter,
                                             extra_job_args=extra_job_args)

    ## redshifts are only requested via extra_job_args; default is no redshifts
    z_submit_types = extra_job_args.get('z_submit_types')

    ptable, internal_id = submit_redshifts(ptable, sciences, tnight, internal_id,
                                    queue=queue, reservation=reservation,
                                    dry_run=dry_run, strictly_successful=strictly_successful,
                                    check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete,
                                    z_submit_types=z_submit_types,
                                    system_name=system_name)

    ## tilenight was submitted, so these exposures are no longer pending
    if tnight is not None:
        sciences = []

    return ptable, sciences, internal_id
1✔
1993

1994
import numpy as np

def set_calibrator_flag(prows, ptable):
    """
    Flag the given jobs as calibrators in the processing table.

    For every input row, the "CALIBRATOR" column of the matching processing
    table row (matched on INTID) is set to 1 (integer representation of True).
    Used within the joint fitting code to mark the exposures that were inputs
    to the psfnight or nightlyflat for later reference.

    Args:
        prows, list or array of Table.Rows or dicts. The rows corresponding to the individual
            exposure jobs that are inputs to the joint fit.
        ptable, Table. The processing table where each row is a processed job.

    Returns:
        Table: The input processing table with the CALIBRATOR column set for
        the matched rows.
    """
    for row in prows:
        matches = ptable['INTID'] == row['INTID']
        ptable['CALIBRATOR'][matches] = 1
    return ptable
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc