desihub / desispec / 8303331706

15 Mar 2024 11:09PM UTC coverage: 28.064% (+3.0%) from 25.113%

Pull Request #2187: Introduce desi_proc_night to unify and simplify processing scripts
Commit by akremin (via github): "bug fix when no flats"

750 of 1111 new or added lines in 20 files covered. (67.51%)
1066 existing lines in 11 files now uncovered.
13143 of 46833 relevant lines covered (28.06%)
0.28 hits per line

Source File: /py/desispec/workflow/processing.py (52.83% covered)
"""
desispec.workflow.processing
============================

"""
import sys, os, glob
import json
from astropy.io import fits
from astropy.table import Table, join
import numpy as np

import time, datetime
from collections import OrderedDict
import subprocess

from desispec.scripts.link_calibnight import derive_include_exclude
from desispec.scripts.tile_redshifts import generate_tile_redshift_scripts
from desispec.workflow.redshifts import get_ztile_script_pathname, \
                                        get_ztile_relpath, \
                                        get_ztile_script_suffix
from desispec.workflow.queue import get_resubmission_states, update_from_queue, queue_info_from_qids
from desispec.workflow.timing import what_night_is_it
from desispec.workflow.desi_proc_funcs import get_desi_proc_batch_file_pathname, \
    create_desi_proc_batch_script, \
    get_desi_proc_batch_file_path, \
    get_desi_proc_tilenight_batch_file_pathname, \
    create_desi_proc_tilenight_batch_script, create_linkcal_batch_script
from desispec.workflow.utils import pathjoin, sleep_and_report, \
    load_override_file
from desispec.workflow.tableio import write_table, load_table
from desispec.workflow.proctable import table_row_to_dict, erow_to_prow
from desiutil.log import get_logger

from desispec.io import findfile, specprod_root
from desispec.io.util import decode_camword, create_camword, \
    difference_camwords, \
    camword_to_spectros, camword_union, camword_intersection, parse_badamps


#################################################
############## Misc Functions ###################
#################################################
def night_to_starting_iid(night=None):
    """
    Creates an internal ID for a given night. The resulting integer is a 9 digit number.
    The digits are YYMMDDxxx where YY is the years since 2000, MM and DD are the month and day. xxx starts at 000
    and is incremented for up to 1000 unique job IDs for a given night.

    Args:
        night (str or int): YYYYMMDD of the night to get the starting internal ID for.

    Returns:
        int: 9 digit number consisting of YYMMDD000. YY is years after 2000, MMDD is month and day.
        000 being the starting job number (0).
    """
    if night is None:
        night = what_night_is_it()
    night = int(night)
    internal_id = (night - 20000000) * 1000
    return internal_id
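
# Illustrative usage (not part of the module): for the night 20240315,
# (20240315 - 20000000) * 1000 leaves room for IDs 240315000-240315999.
#   >>> night_to_starting_iid(20240315)
#   240315000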

class ProcessingParams():
    def __init__(self, dry_run_level=0, queue='realtime',
                 reservation=None, strictly_successful=True,
                 check_for_outputs=True,
                 resubmit_partial_complete=True,
                 system_name='perlmutter', use_specter=True):

        self.dry_run_level = dry_run_level
        self.system_name = system_name
        self.queue = queue
        self.reservation = reservation
        self.strictly_successful = strictly_successful
        self.check_for_outputs = check_for_outputs
        self.resubmit_partial_complete = resubmit_partial_complete
        self.use_specter = use_specter

#################################################
############ Script Functions ###################
#################################################
def batch_script_name(prow):
    """
    Wrapper function that takes a processing table row (or dictionary with NIGHT, EXPID, JOBDESC, PROCCAMWORD defined)
    and determines the script file pathname as defined by desi_proc's helper functions.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.

    Returns:
        str: The complete pathname to the script file, as it is defined within the desi_proc ecosystem.
    """
    expids = prow['EXPID']
    if len(expids) == 0:
        expids = None
    if prow['JOBDESC'] == 'tilenight':
        pathname = get_desi_proc_tilenight_batch_file_pathname(night=prow['NIGHT'], tileid=prow['TILEID'])
    else:
        pathname = get_desi_proc_batch_file_pathname(night=prow['NIGHT'], exp=expids, \
                                             jobdesc=prow['JOBDESC'], cameras=prow['PROCCAMWORD'])
    scriptfile = pathname + '.slurm'
    return scriptfile
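
# Illustrative usage (not part of the module; field values are hypothetical):
#   prow = {'NIGHT': 20240315, 'EXPID': [123456], 'TILEID': 1234,
#           'JOBDESC': 'tilenight', 'PROCCAMWORD': 'a0123456789'}
#   batch_script_name(prow)  # -> the tilenight batch file pathname + '.slurm'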

def get_jobdesc_to_file_map():
    """
    Returns a mapping of job descriptions to the filenames of the output files

    Args:
        None

    Returns:
        dict: Dictionary with keys of lowercase job descriptions and values of
            the filenames of their expected outputs.
    """
    return {'prestdstar': 'sframe',
            'stdstarfit': 'stdstars',
            'poststdstar': 'cframe',
            'nightlybias': 'biasnight',
            # 'ccdcalib': 'badcolumns',
            'badcol': 'badcolumns',
            'arc': 'fitpsf',
            'flat': 'fiberflat',
            'psfnight': 'psfnight',
            'nightlyflat': 'fiberflatnight',
            'spectra': 'spectra_tile',
            'coadds': 'coadds_tile',
            'redshift': 'redrock_tile'}

def get_file_to_jobdesc_map():
    """
    Returns a mapping of output filenames to job descriptions

    Args:
        None

    Returns:
        dict: Dictionary with keys of the filenames of the expected outputs and
            values of the lowercase job descriptions.
    """
    job_to_file_map = get_jobdesc_to_file_map()
    job_to_file_map.pop('badcol') # these files can also be in a ccdcalib job
    job_to_file_map.pop('nightlybias') # these files can also be in a ccdcalib job
    return {value: key for key, value in job_to_file_map.items()}
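
# Illustrative usage (not part of the module): the inverted map sends
# filetypes back to job descriptions, e.g.
#   >>> get_file_to_jobdesc_map()['fiberflat']
#   'flat'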

def check_for_outputs_on_disk(prow, resubmit_partial_complete=True):
    """
    Checks whether the expected output files for the job described by prow already exist on disk,
    optionally pruning PROCCAMWORD to only the cameras whose outputs are missing.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
        the change in job status after creating and submitting the job for processing.
    """
    prow['STATUS'] = 'UNKNOWN'
    log = get_logger()

    if prow['JOBDESC'] in ['linkcal', 'ccdcalib']:
        log.info(f"jobdesc={prow['JOBDESC']} has indeterminate outputs, so "
                + "not checking for files on disk.")
        return prow

    job_to_file_map = get_jobdesc_to_file_map()

    night = prow['NIGHT']
    if prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
        filetype = 'redrock_tile'
    else:
        filetype = job_to_file_map[prow['JOBDESC']]
    orig_camword = prow['PROCCAMWORD']

    ## if spectro based, look for spectros, else look for cameras
    if prow['JOBDESC'] in ['stdstarfit','spectra','coadds','redshift']:
        ## Spectrograph based
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        ## Suppress outputs about using tile based files in findfile if only looking for stdstarfits
        if prow['JOBDESC'] == 'stdstarfit':
            tileid = None
        else:
            tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(findfile(filetype=filetype, night=night, expid=expid, spectrograph=spectro, tile=tileid)):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'],existing_camword)
    elif prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
        ## Spectrograph based
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        redux_dir = specprod_root()
        outdir = os.path.join(redux_dir,get_ztile_relpath(tileid,group=prow['JOBDESC'],night=night,expid=expid))
        suffix = get_ztile_script_suffix(tileid, group=prow['JOBDESC'], night=night, expid=expid)
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(os.path.join(outdir, f"redrock-{spectro}-{suffix}.fits")):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'],existing_camword)
    else:
        ## Otherwise camera based
        cameras = decode_camword(prow['PROCCAMWORD'])
        n_desired = len(cameras)
        if len(prow['EXPID']) > 0:
            expid = prow['EXPID'][0]
        else:
            expid = None
        if len(prow['EXPID']) > 1 and prow['JOBDESC'] not in ['psfnight','nightlyflat']:
            log.warning(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']}. This job type only makes " +
                     f"sense with a single exposure. Proceeding with {expid}.")
        missing_cameras = []
        for cam in cameras:
            if not os.path.exists(findfile(filetype=filetype, night=night, expid=expid, camera=cam)):
                missing_cameras.append(cam)
        completed = (len(missing_cameras) == 0)
        if not completed and resubmit_partial_complete and len(missing_cameras) < n_desired:
            prow['PROCCAMWORD'] = create_camword(missing_cameras)

    if completed:
        prow['STATUS'] = 'COMPLETED'
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"the desired {n_desired} {filetype}'s. Not submitting this job.")
    elif resubmit_partial_complete and orig_camword != prow['PROCCAMWORD']:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"some {filetype}'s. Submitting smaller camword={prow['PROCCAMWORD']}.")
    elif not resubmit_partial_complete:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} doesn't have all " +
                 f"{filetype}'s and resubmit_partial_complete=False. "+
                 f"Submitting full camword={prow['PROCCAMWORD']}.")
    else:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} has no " +
                 f"existing {filetype}'s. Submitting full camword={prow['PROCCAMWORD']}.")
    return prow
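
# Illustrative behavior (not part of the module): if a 'flat' job lists
# PROCCAMWORD='a01' but fiberflat files already exist for every camera of
# spectrograph 0, resubmit_partial_complete=True rewrites PROCCAMWORD to
# cover only the missing spectrograph-1 cameras before submission.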

def create_and_submit(prow, queue='realtime', reservation=None, dry_run=0,
                      joint=False, strictly_successful=False,
                      check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None, use_specter=False,
                      extra_job_args=None):
    """
    Wrapper function that takes a processing table row and three modifier keywords, creates a submission script for the
    compute nodes, and then submits that script to the Slurm scheduler with appropriate dependencies.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted.
            If dry_run=2, the scripts will be neither written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        joint (bool, optional): Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
            run with desi_proc_joint_fit. Default is False.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include refnight
            and include/exclude lists for linkcal, laststeps for tilenight, etc.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
        the change in job status after creating and submitting the job for processing.

    Note:
        This modifies the input. Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not persist. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    orig_prow = prow.copy()
    if check_for_outputs:
        prow = check_for_outputs_on_disk(prow, resubmit_partial_complete)
        if prow['STATUS'].upper() == 'COMPLETED':
            return prow

    prow = create_batch_script(prow, queue=queue, dry_run=dry_run, joint=joint,
                               system_name=system_name, use_specter=use_specter,
                               extra_job_args=extra_job_args)
    prow = submit_batch_script(prow, reservation=reservation, dry_run=dry_run,
                               strictly_successful=strictly_successful)

    ## If resubmitted partial, the PROCCAMWORD and SCRIPTNAME will correspond
    ## to the pruned values. But we want to retain the full job's values,
    ## so get those from the original row.
    if resubmit_partial_complete:
        prow['PROCCAMWORD'] = orig_prow['PROCCAMWORD']
        prow['SCRIPTNAME'] = orig_prow['SCRIPTNAME']
    return prow
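
# Illustrative call (not part of the module; argument values are hypothetical):
#   prow = create_and_submit(prow, queue='regular', reservation=None,
#                            dry_run=1, system_name='perlmutter')
#   # dry_run=1 writes the batch script but skips the actual sbatch call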

def desi_link_calibnight_command(prow, refnight, include=None):
    """
    Wrapper function that takes a processing table row (or dictionary with
    NIGHT and PROCCAMWORD defined) and determines the proper command
    line call to link data defined by the input row/dict.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions
            for 'NIGHT' and 'PROCCAMWORD'.
        refnight (str or int): The night with a valid set of calibrations
            from which the links will be created.
        include (list): The filetypes to include in the linking.

    Returns:
        str: The proper command to be submitted to desi_link_calibnight
            to process the job defined by the prow values.
    """
    cmd = 'desi_link_calibnight'
    cmd += f' --refnight={refnight}'
    cmd += f' --newnight={prow["NIGHT"]}'
    cmd += f' --cameras={prow["PROCCAMWORD"]}'
    if include is not None:
        cmd += ' --include=' + ','.join(list(include))
    return cmd
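
# Illustrative output (not part of the module; values are hypothetical): for a
# prow with NIGHT=20240315 and PROCCAMWORD='a0123456789', refnight=20240314,
# and include=['psfnight', 'fiberflatnight'], the command would be:
#   desi_link_calibnight --refnight=20240314 --newnight=20240315 \
#       --cameras=a0123456789 --include=psfnight,fiberflatnight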

def desi_proc_command(prow, system_name, use_specter=False, queue=None):
    """
    Wrapper function that takes a processing table row (or dictionary with NIGHT, EXPID, OBSTYPE, JOBDESC, PROCCAMWORD defined)
    and determines the proper command line call to process the data defined by the input row/dict.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.

    Returns:
        str: The proper command to be submitted to desi_proc to process the job defined by the prow values.
    """
    cmd = 'desi_proc'
    cmd += ' --batch'
    cmd += ' --nosubmit'
    if queue is not None:
        cmd += f' -q {queue}'
    if prow['OBSTYPE'].lower() == 'science':
        if prow['JOBDESC'] == 'prestdstar':
            cmd += ' --nostdstarfit --nofluxcalib'
        elif prow['JOBDESC'] == 'poststdstar':
            cmd += ' --noprestdstarfit --nostdstarfit'

        if use_specter:
            cmd += ' --use-specter'
    elif prow['JOBDESC'] in ['flat', 'prestdstar'] and use_specter:
        cmd += ' --use-specter'
    pcamw = str(prow['PROCCAMWORD'])
    cmd += f" --cameras={pcamw} -n {prow['NIGHT']}"
    if len(prow['EXPID']) > 0:
        ## If this is a ccdcalib job without a dark exposure, don't assign the flat expid
        ## since it would incorrectly process the flat using desi_proc
        if prow['OBSTYPE'].lower() != 'flat' or prow['JOBDESC'] != 'ccdcalib':
            cmd += f" -e {prow['EXPID'][0]}"
    if prow['BADAMPS'] != '':
        cmd += ' --badamps={}'.format(prow['BADAMPS'])
    return cmd
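
# Illustrative output (not part of the module; values are hypothetical): for a
# prestdstar science prow with PROCCAMWORD='a0', NIGHT=20240315, EXPID=[123456],
# empty BADAMPS, and queue='realtime', the generated command would be:
#   desi_proc --batch --nosubmit -q realtime --nostdstarfit --nofluxcalib \
#       --cameras=a0 -n 20240315 -e 123456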

def desi_proc_joint_fit_command(prow, queue=None):
    """
    Wrapper function that takes a processing table row (or dictionary with NIGHT, EXPID, OBSTYPE, PROCCAMWORD defined)
    and determines the proper command line call to process the data defined by the input row/dict.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).

    Returns:
        str: The proper command to be submitted to desi_proc_joint_fit
            to process the job defined by the prow values.
    """
    cmd = 'desi_proc_joint_fit'
    cmd += ' --batch'
    cmd += ' --nosubmit'
    if queue is not None:
        cmd += f' -q {queue}'

    descriptor = prow['OBSTYPE'].lower()

    night = prow['NIGHT']
    specs = str(prow['PROCCAMWORD'])
    expid_str = ','.join([str(eid) for eid in prow['EXPID']])

    cmd += f' --obstype {descriptor}'
    cmd += f' --cameras={specs} -n {night}'
    if len(expid_str) > 0:
        cmd += f' -e {expid_str}'
    return cmd


def create_batch_script(prow, queue='realtime', dry_run=0, joint=False,
                        system_name=None, use_specter=False, extra_job_args=None):
    """
    Wrapper function that takes a processing table row and three modifier keywords and creates a submission script for the
    compute nodes.

    Args:
        prow, Table.Row or dict. Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        queue, str. The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        dry_run, int. If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted.
            If dry_run=2, the scripts will be neither written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        joint, bool. Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
            run with desi_proc_joint_fit when not using tilenight. Default is False.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter, bool, optional. Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include refnight
            and include/exclude lists for linkcal, laststeps for tilenight, etc.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with an updated value for
        'SCRIPTNAME'.

    Note:
        This modifies the input. Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not persist. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()

    if extra_job_args is None:
        extra_job_args = {}

    if prow['JOBDESC'] in ['perexp','pernight','pernight-v0','cumulative']:
        if dry_run > 1:
            scriptpathname = get_ztile_script_pathname(tileid=prow['TILEID'],group=prow['JOBDESC'],
                                                               night=prow['NIGHT'], expid=prow['EXPID'][0])

            log.info("Output file would have been: {}".format(scriptpathname))
        else:
            #- run zmtl for cumulative redshifts but not others
            run_zmtl = (prow['JOBDESC'] == 'cumulative')
            no_afterburners = False
            scripts, failed_scripts = generate_tile_redshift_scripts(tileid=prow['TILEID'], group=prow['JOBDESC'],
                                                                     nights=[prow['NIGHT']], expids=prow['EXPID'],
                                                                     batch_queue=queue, system_name=system_name,
                                                                     run_zmtl=run_zmtl,
                                                                     no_afterburners=no_afterburners,
                                                                     nosubmit=True)
            if len(failed_scripts) > 0:
                log.error(f"Redshifts failed for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned failed scriptname is {failed_scripts}")
            elif len(scripts) > 1:
                log.error(f"More than one redshift script returned for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned scriptnames were {scripts}")
            else:
                scriptpathname = scripts[0]
    elif prow['JOBDESC'] == 'linkcal':
        refnight, include, exclude = -99, None, None
        if 'refnight' in extra_job_args:
            refnight = extra_job_args['refnight']
        if 'include' in extra_job_args:
            include = extra_job_args['include']
        if 'exclude' in extra_job_args:
            exclude = extra_job_args['exclude']
        include, exclude = derive_include_exclude(include, exclude)
        ## Fiberflatnights shouldn't be generated with psfs from a different night, so
        ## we shouldn't link psfs without also linking fiberflatnight.
        ## However, this should be checked at a higher level. If set here,
        ## go ahead and do it
        # if 'psfnight' in include and not 'fiberflatnight' in include:
        #     err = "Must link fiberflatnight if linking psfnight"
        #     log.error(err)
        #     raise ValueError(err)
        if dry_run > 1:
            scriptpathname = batch_script_name(prow)
            log.info("Output file would have been: {}".format(scriptpathname))
            cmd = desi_link_calibnight_command(prow, refnight, include)
            log.info("Command to be run: {}".format(cmd.split()))
        else:
            if refnight == -99:
                err = f'For {prow=} asked to link calibration but not given' \
                      + ' a valid refnight'
                log.error(err)
                raise ValueError(err)

            cmd = desi_link_calibnight_command(prow, refnight, include)
            log.info(f"Running: {cmd.split()}")
            scriptpathname = create_linkcal_batch_script(newnight=prow['NIGHT'],
                                                        cameras=prow['PROCCAMWORD'],
                                                        queue=queue,
                                                        cmd=cmd,
                                                        system_name=system_name)
    else:
        if prow['JOBDESC'] != 'tilenight':
            nightlybias, nightlycte, cte_expids = False, False, None
            if 'nightlybias' in extra_job_args:
                nightlybias = extra_job_args['nightlybias']
            elif prow['JOBDESC'].lower() == 'nightlybias':
                nightlybias = True
            if 'nightlycte' in extra_job_args:
                nightlycte = extra_job_args['nightlycte']
            if 'cte_expids' in extra_job_args:
                cte_expids = extra_job_args['cte_expids']
            if joint:
                cmd = desi_proc_joint_fit_command(prow, queue=queue)
            else:
                cmd = desi_proc_command(prow, system_name, use_specter, queue=queue)
                if nightlybias:
                    cmd += ' --nightlybias'
                if nightlycte:
                    cmd += ' --nightlycte'
                    if cte_expids is not None:
                        cmd += ' --cte-expids '
                        cmd += ','.join(np.atleast_1d(cte_expids).astype(str))
        if dry_run > 1:
            scriptpathname = batch_script_name(prow)
            log.info("Output file would have been: {}".format(scriptpathname))
            if prow['JOBDESC'] != 'tilenight':
                log.info("Command to be run: {}".format(cmd.split()))
        else:
            expids = prow['EXPID']
            if len(expids) == 0:
                expids = None

            if prow['JOBDESC'] == 'tilenight':
                log.info("Creating tilenight script for tile {}".format(prow['TILEID']))
                if 'laststeps' in extra_job_args:
                    laststeps = extra_job_args['laststeps']
                else:
                    err = f'{prow=} job did not specify last steps to tilenight'
                    log.error(err)
                    raise ValueError(err)
                ncameras = len(decode_camword(prow['PROCCAMWORD']))
                scriptpathname = create_desi_proc_tilenight_batch_script(
                                                               night=prow['NIGHT'], exp=expids,
                                                               tileid=prow['TILEID'],
                                                               ncameras=ncameras,
                                                               queue=queue,
                                                               mpistdstars=True,
                                                               use_specter=use_specter,
                                                               system_name=system_name,
                                                               laststeps=laststeps)
            else:
                if expids is not None and len(expids) > 1:
                    expids = expids[:1]
                log.info("Running: {}".format(cmd.split()))
                scriptpathname = create_desi_proc_batch_script(night=prow['NIGHT'], exp=expids,
                                                               cameras=prow['PROCCAMWORD'],
                                                               jobdesc=prow['JOBDESC'],
                                                               queue=queue, cmdline=cmd,
                                                               use_specter=use_specter,
                                                               system_name=system_name,
                                                               nightlybias=nightlybias,
                                                               nightlycte=nightlycte,
                                                               cte_expids=cte_expids)
    log.info("Outfile is: {}".format(scriptpathname))
    prow['SCRIPTNAME'] = os.path.basename(scriptpathname)
    return prow
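
# Illustrative sketch (not part of the module): extra_job_args keys consumed
# above, grouped by job type (all values hypothetical):
#   linkcal:    {'refnight': 20240314, 'include': [...], 'exclude': [...]}
#   tilenight:  {'laststeps': [...]}   # required, else ValueError is raised
#   other desi_proc jobs: {'nightlybias': True, 'nightlycte': True,
#                          'cte_expids': [123456, 123457]}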

_fake_qid = int(time.time() - 1.7e9)
def _get_fake_qid():
    """
    Return fake slurm queue jobid to use for dry-run testing
    """
    # Note: not implemented as a yield generator so that this returns a
    # genuine int, not a generator object
    global _fake_qid
    _fake_qid += 1
    return _fake_qid

def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=False):
    """
    Wrapper function that takes a processing table row and three modifier keywords and submits the script to the Slurm
    scheduler.

    Args:
        prow, Table.Row or dict. Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        dry_run, int. If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted. If
            dry_run=2, the scripts will be neither written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful, bool. Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with updated values for
        'LATEST_QID', 'ALL_QIDS', 'STATUS', and 'SUBMIT_DATE'.

    Note:
        This modifies the input. Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not persist. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()
    dep_qids = prow['LATEST_DEP_QID']
    dep_list, dep_str = '', ''

    # workaround for sbatch --dependency bug not tracking completed jobs correctly
    # see NERSC TICKET INC0203024
    if len(dep_qids) > 0 and not dry_run:
        dep_table = queue_info_from_qids(np.asarray(dep_qids), columns='jobid,state')
        for row in dep_table:
            if row['STATE'] == 'COMPLETED':
                log.info(f"removing completed jobid {row['JOBID']}")
                dep_qids = np.delete(dep_qids, np.argwhere(dep_qids==row['JOBID']))

    if len(dep_qids) > 0:
        jobtype = prow['JOBDESC']
        if strictly_successful:
            depcond = 'afterok'
        elif jobtype in ['arc', 'psfnight', 'prestdstar', 'stdstarfit']:
            ## (though psfnight and stdstarfit will require some inputs otherwise they'll go up in flames)
            depcond = 'afterany'
        else:
            ## if 'flat','nightlyflat','poststdstar', or any type of redshift, require strict success of inputs
            depcond = 'afterok'

        dep_str = f'--dependency={depcond}:'

        if np.isscalar(dep_qids):
            dep_list = str(dep_qids).strip(' \t')
            if dep_list == '':
                dep_str = ''
            else:
                dep_str += dep_list
        else:
            if len(dep_qids)>1:
                dep_list = ':'.join(np.array(dep_qids).astype(str))
                dep_str += dep_list
            elif len(dep_qids) == 1 and dep_qids[0] not in [None, 0]:
                dep_str += str(dep_qids[0])
            else:
                dep_str = ''

    # script = f'{jobname}.slurm'
    # script_path = pathjoin(batchdir, script)
    if prow['JOBDESC'] in ['pernight-v0','pernight','perexp','cumulative']:
        script_path = get_ztile_script_pathname(tileid=prow['TILEID'],group=prow['JOBDESC'],
                                                        night=prow['NIGHT'], expid=np.min(prow['EXPID']))
        jobname = os.path.basename(script_path)
    else:
        batchdir = get_desi_proc_batch_file_path(night=prow['NIGHT'])
        jobname = batch_script_name(prow)
        script_path = pathjoin(batchdir, jobname)

    batch_params = ['sbatch', '--parsable']
    if dep_str != '':
        batch_params.append(f'{dep_str}')
    if reservation is not None:
        batch_params.append(f'--reservation={reservation}')
    batch_params.append(f'{script_path}')

    if dry_run:
        current_qid = _get_fake_qid()
    else:
        #- sbatch sometimes fails; try several times before giving up
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                current_qid = subprocess.check_output(batch_params, stderr=subprocess.STDOUT, text=True)
                current_qid = int(current_qid.strip(' \t\n'))
                break
            except subprocess.CalledProcessError as err:
                log.error(f'{jobname} submission failure at {datetime.datetime.now()}')
                log.error(f'{jobname}   {batch_params}')
                log.error(f'{jobname}   {err.output=}')
                if attempt < max_attempts - 1:
                    log.info('Sleeping 60 seconds then retrying')
                    time.sleep(60)
        else:  #- for/else runs if the loop never breaks (i.e. submission never succeeded)
            msg = f'{jobname} submission failed {max_attempts} times; exiting'
            log.critical(msg)
            raise RuntimeError(msg)

    log.info(batch_params)
    log.info(f'Submitted {jobname} with dependencies {dep_str} and reservation={reservation}. Returned qid: {current_qid}')

    prow['LATEST_QID'] = current_qid
    prow['ALL_QIDS'] = np.append(prow['ALL_QIDS'],current_qid)
    prow['STATUS'] = 'SUBMITTED'
    prow['SUBMIT_DATE'] = int(time.time())

    return prow
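
# Illustrative sketch (not part of the module): with dep_qids=[1111, 2222] and
# strictly_successful=True, the sbatch call gains the option
#   --dependency=afterok:1111:2222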
699

700

701
#############################################
702
##########   Row Manipulations   ############
703
#############################################
704
def define_and_assign_dependency(prow, calibjobs, use_tilenight=False,
1✔
705
                                 refnight=None):
706
    """
707
    Given input processing row and possible calibjobs, this defines the
708
    JOBDESC keyword and assigns the dependency appropriate for the job type of
709
    prow.
710

711
    Args:
712
        prow, Table.Row or dict. Must include keyword accessible definitions for
713
            'OBSTYPE'. A row must have column names for
714
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.
715
        calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
716
            and 'nightlyflat'. Each key corresponds to a Table.Row or
717
            None. The table.Row() values are for the corresponding
718
            calibration job. Each value that isn't None must contain
719
            'INTID', and 'LATEST_QID'. If None, it assumes the
720
            dependency doesn't exist and no dependency is assigned.
721
        use_tilenight, bool. Default is False. If True, use desi_proc_tilenight
722
            for prestdstar, stdstar,and poststdstar steps for
723
            science exposures.
724
        refnight, int. The reference night for linking jobs
725

726
    Returns:
727
        Table.Row or dict: The same prow type and keywords as input except
728
        with modified values updated values for 'JOBDESC', 'INT_DEP_IDS'. and 'LATEST_DEP_ID'.
729

730
    Note:
731
        This modifies the input. Though Table.Row objects are generally copied
732
        on modification, so the change to the input object in memory may or may
733
        not be changed. As of writing, a row from a table given to this function
734
        will not change during the execution of this function (but can be
735
        overwritten explicitly with the returned row if desired).
736
    """
737
    if prow['OBSTYPE'] in ['science', 'twiflat']:
1✔
738
        if calibjobs['nightlyflat'] is not None:
1✔
739
            dependency = calibjobs['nightlyflat']
1✔
740
        elif calibjobs['psfnight'] is not None:
1✔
741
            dependency = calibjobs['psfnight']
1✔
742
        elif calibjobs['ccdcalib'] is not None:
1✔
743
            dependency = calibjobs['ccdcalib']
1✔
744
        elif calibjobs['nightlybias'] is not None:
1✔
745
            dependency = calibjobs['nightlybias']
×
746
        elif calibjobs['badcol'] is not None:
1✔
NEW
747
            dependency = calibjobs['badcol']
×
748
        else:
749
            dependency = calibjobs['linkcal']
1✔
750
        if not use_tilenight:
1✔
751
            prow['JOBDESC'] = 'prestdstar'
1✔
752
    elif prow['OBSTYPE'] == 'flat':
1✔
753
        if calibjobs['psfnight'] is not None:
1✔
754
            dependency = calibjobs['psfnight']
1✔
755
        elif calibjobs['ccdcalib'] is not None:
1✔
756
            dependency = calibjobs['ccdcalib']
1✔
757
        elif calibjobs['nightlybias'] is not None:
1✔
758
            dependency = calibjobs['nightlybias']
×
759
        elif calibjobs['badcol'] is not None:
1✔
NEW
760
            dependency = calibjobs['badcol']
×
761
        else:
762
            dependency = calibjobs['linkcal']
1✔
763
    elif prow['OBSTYPE'] == 'arc':
1✔
764
        if calibjobs['ccdcalib'] is not None:
1✔
765
            dependency = calibjobs['ccdcalib']
1✔
766
        elif calibjobs['nightlybias'] is not None:
1✔
767
            dependency = calibjobs['nightlybias']
1✔
768
        elif calibjobs['badcol'] is not None:
1✔
769
            dependency = calibjobs['badcol']
×
770
        else:
771
            dependency = calibjobs['linkcal']
1✔
772
    elif prow['JOBDESC'] in ['badcol', 'nightlybias', 'ccdcalib']:
1✔
773
        dependency = calibjobs['linkcal']
1✔
774
    elif prow['OBSTYPE'] == 'dark':
1✔
NEW
775
        if calibjobs['ccdcalib'] is not None:
×
NEW
776
            dependency = calibjobs['ccdcalib']
×
NEW
777
        elif calibjobs['nightlybias'] is not None:
×
NEW
778
            dependency = calibjobs['nightlybias']
×
NEW
779
        elif calibjobs['badcol'] is not None:
×
NEW
780
            dependency = calibjobs['badcol']
×
781
        else:
NEW
782
            dependency = calibjobs['linkcal']
×
783
    elif prow['JOBDESC'] == 'linkcal' and refnight is not None:
1✔
784
        dependency = None
1✔
785
        ## For link cals only, enable cross-night dependencies if available
786
        refproctable = findfile('proctable', night=refnight)
1✔
787
        if os.path.exists(refproctable):
1✔
NEW
788
            ptab = load_table(tablename=refproctable, tabletype='proctable')
×
789
            ## This isn't perfect because we may depend on jobs that aren't
790
            ## actually being linked
791
            ## Also allows us to proceed even if jobs don't exist yet
NEW
792
            deps = []
×
NEW
793
            for job in ['nightlybias', 'ccdcalib', 'psfnight', 'nightlyflat']:
×
NEW
794
                if job in ptab['JOBDESC']:
×
795
                    ## add prow to dependencies
NEW
796
                    deps.append(ptab[ptab['JOBDESC']==job][0])
×
NEW
797
            if len(deps) > 0:
×
NEW
798
                dependency = deps
×
799
    else:
800
        dependency = None
×
801

802
    prow = assign_dependency(prow, dependency)
1✔
803

804
    return prow
1✔
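
# Illustrative behavior (not part of the module): for a science exposure the
# dependency falls back through the chain
#   nightlyflat -> psfnight -> ccdcalib -> nightlybias -> badcol -> linkcal,
# taking the first calibration job that exists in calibjobs.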


def assign_dependency(prow, dependency):
    """
    Given an input processing row and a dependency (or list of dependencies, e.g. the processing rows
    for psfnight or nightlyflat jobs), this records the dependency information appropriate for the job in prow.

    Args:
        prow, Table.Row or dict. Must include keyword accessible definitions for 'OBSTYPE'. A row must have column names for
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_QID'.
        dependency, NoneType or scalar/list/array of Table.Row, dict. Processing row corresponding to the required input
            for the job in prow. This must contain keyword
            accessible values for 'INTID', and 'LATEST_QID'.
            If None, it assumes the dependency doesn't exist
            and no dependency is assigned.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with updated values for
        'INT_DEP_IDS' and 'LATEST_DEP_QID'.

    Note:
        This modifies the input. Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not persist. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    prow['INT_DEP_IDS'] = np.ndarray(shape=0).astype(int)
    prow['LATEST_DEP_QID'] = np.ndarray(shape=0).astype(int)
    if dependency is not None:
        if isinstance(dependency, (list, np.ndarray)):
            ids, qids = [], []
            for curdep in dependency:
                if still_a_dependency(curdep):
                    ids.append(curdep['INTID'])
                    qids.append(curdep['LATEST_QID'])
            prow['INT_DEP_IDS'] = np.array(ids, dtype=int)
            prow['LATEST_DEP_QID'] = np.array(qids, dtype=int)
        elif type(dependency) in [dict, OrderedDict, Table.Row] and still_a_dependency(dependency):
            prow['INT_DEP_IDS'] = np.array([dependency['INTID']], dtype=int)
            prow['LATEST_DEP_QID'] = np.array([dependency['LATEST_QID']], dtype=int)
    return prow

def still_a_dependency(dependency):
    """
    Defines the criteria for which a dependency is deemed complete (and therefore no longer a dependency).

    Args:
        dependency, Table.Row or dict. Processing row corresponding to the required input for the job in prow.
            This must contain keyword accessible values for 'STATUS', and 'LATEST_QID'.

    Returns:
        bool: False if the criteria indicate that the dependency is completed and no longer a blocking factor (ie no longer
        a genuine dependency). Returns True if the dependency is still a blocking factor such that the slurm
        scheduler needs to be aware of the pending job.

    """
    return dependency['LATEST_QID'] > 0 and dependency['STATUS'] != 'COMPLETED'

def get_type_and_tile(erow):
    """
    Trivial function to return the OBSTYPE and the TILEID from an exposure table row

    Args:
        erow, Table.Row or dict. Must contain 'OBSTYPE' and 'TILEID' as keywords.

    Returns:
        tuple (str, int), corresponding to the OBSTYPE and TILEID values of the input erow.
    """
    return str(erow['OBSTYPE']).lower(), erow['TILEID']


#############################################
#########   Table manipulators   ############
#############################################
def parse_previous_tables(etable, ptable, night):
    """
    This takes in the exposure and processing tables and regenerates all the working memory variables needed for the
    daily processing script.

    Used by the daily processing to define most of its stateful variables into working memory.
    If the processing table is empty, these are simply declared and returned for use.
    If the code had previously run and exited (or crashed), however, this allows the code to
    re-establish itself by redefining these values.

    Args:
        etable, Table, Exposure table of all exposures that have been dealt with thus far.
        ptable, Table, Processing table of all exposures that have been processed.
        night, str or int, the night the data was taken.

    Returns:
        tuple: A tuple containing:

        * arcs, list of dicts, list of the individual arc jobs used for the psfnight (NOT all
          the arcs, if multiple sets existed)
        * flats, list of dicts, list of the individual flat jobs used for the nightlyflat (NOT
          all the flats, if multiple sets existed)
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile)
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'badcol', 'psfnight'
          and 'nightlyflat'. Each key corresponds to a Table.Row or
          None. The table.Row() values are for the corresponding
          calibration job.
        * curtype, None, the obstype of the current job being run. Always None as first new job will define this.
        * lasttype, str or None, the obstype of the last individual exposure row to be processed.
        * curtile, None, the tileid of the current job (if science). Otherwise None. Always None as first
          new job will define this.
        * lasttile, str or None, the tileid of the last job (if science). Otherwise None.
        * internal_id, int, an internal identifier unique to each job. Increments with each new job. This
          is the latest unassigned value.
    """
    log = get_logger()
    arcs, flats, sciences = [], [], []
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None, 'psfnight': None,
                 'nightlyflat': None}
    curtype,lasttype = None,None
    curtile,lasttile = None,None

    if len(ptable) > 0:
        prow = ptable[-1]
        internal_id = int(prow['INTID'])+1
        lasttype,lasttile = get_type_and_tile(ptable[-1])
        jobtypes = ptable['JOBDESC']

        if 'nightlybias' in jobtypes:
            calibjobs['nightlybias'] = table_row_to_dict(ptable[jobtypes=='nightlybias'][0])
            log.info("Located nightlybias job in processing table: {}".format(calibjobs['nightlybias']))

        if 'ccdcalib' in jobtypes:
            calibjobs['ccdcalib'] = table_row_to_dict(ptable[jobtypes=='ccdcalib'][0])
            log.info("Located ccdcalib job in processing table: {}".format(calibjobs['ccdcalib']))

        if 'psfnight' in jobtypes:
            calibjobs['psfnight'] = table_row_to_dict(ptable[jobtypes=='psfnight'][0])
            log.info("Located joint fit psfnight job in processing table: {}".format(calibjobs['psfnight']))
        elif lasttype == 'arc':
            seqnum = 10
            for row in ptable[::-1]:
                erow = etable[etable['EXPID']==row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'arc' and int(erow['SEQNUM'])<seqnum:
                    arcs.append(table_row_to_dict(row))
                    seqnum = int(erow['SEQNUM'])
                else:
                    break
            ## Because we work backward to fill in, we need to reverse them to get chronological order back
            arcs = arcs[::-1]

        if 'nightlyflat' in jobtypes:
            calibjobs['nightlyflat'] = table_row_to_dict(ptable[jobtypes=='nightlyflat'][0])
            log.info("Located joint fit nightlyflat job in processing table: {}".format(calibjobs['nightlyflat']))
        elif lasttype == 'flat':
            for row in ptable[::-1]:
                erow = etable[etable['EXPID']==row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'flat' and int(erow['SEQTOT']) < 5:
                    if float(erow['EXPTIME']) > 100.:
                        flats.append(table_row_to_dict(row))
                else:
                    break
            flats = flats[::-1]

        if lasttype.lower() == 'science':
            for row in ptable[::-1]:
                if row['OBSTYPE'].lower() == 'science' and row['TILEID'] == lasttile and \
                   row['JOBDESC'] == 'prestdstar' and row['LASTSTEP'] != 'skysub':
                    sciences.append(table_row_to_dict(row))
                else:
                    break
            sciences = sciences[::-1]
    else:
        internal_id = night_to_starting_iid(night)

    return arcs,flats,sciences, \
           calibjobs, \
           curtype, lasttype, \
           curtile, lasttile,\
           internal_id

def generate_calibration_dict(ptable, files_to_link=None):
    """
    This takes in a processing table and regenerates the working memory calibration
    dictionary for dependency tracking. Used by the daily processing to define
    most of its stateful variables into working memory.
    If the processing table is empty, these are simply declared and returned for use.
    If the code had previously run and exited (or crashed), however, this allows the code to
    re-establish itself by redefining these values.

    Args:
        ptable, Table, Processing table of all exposures that have been processed.
        files_to_link, set, Set of filenames that the linkcal job will link.

    Returns:
        calibjobs, dict. Dictionary containing 'nightlybias', 'badcol', 'ccdcalib',
            'psfnight', 'nightlyflat', 'linkcal', and 'completed'. Each key corresponds to a
            Table.Row or None. The table.Row() values are for the corresponding
            calibration job.
    """
    log = get_logger()
    job_to_file_map = get_jobdesc_to_file_map()
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None,
                 'completed': dict()}
    ptable_jobtypes = ptable['JOBDESC']

    for jobtype in calibjobs.keys():
        calibjobs["completed"][jobtype] = False
        if jobtype in ptable_jobtypes:
            calibjobs[jobtype] = table_row_to_dict(ptable[ptable_jobtypes==jobtype][0])
            log.info(f"Located {jobtype} job in processing table: {calibjobs[jobtype]}")
            calibjobs["completed"][jobtype] = True

    if calibjobs["completed"]['linkcal'] and files_to_link is not None:
        calibjobs = update_calibjobs_with_linking(calibjobs, files_to_link)

    return calibjobs
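
# Illustrative result (not part of the module): with an empty processing
# table, every job entry is None and every 'completed' flag is False:
#   {'nightlybias': None, 'ccdcalib': None, 'badcol': None, 'psfnight': None,
#    'nightlyflat': None, 'linkcal': None,
#    'completed': {'nightlybias': False, 'ccdcalib': False, ..., 'linkcal': False}}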


def update_calibjobs_with_linking(calibjobs, files_to_link):
    """
    Takes in a dictionary summarizing the calibration jobs and updates it
    based on files_to_link, which are assumed to have already been linked
    such that those files already exist on disk and don't need to be generated.

    Parameters
    ----------
        calibjobs: dict
            Dictionary containing "nightlybias", "badcol", "ccdcalib",
            "psfnight", "nightlyflat", "linkcal", and "completed". Each key corresponds to a
            Table.Row or None. The table.Row() values are for the corresponding
            calibration job.
        files_to_link: set
            Set of filenames that the linkcal job will link.

    Returns
    -------
        calibjobs, dict
            Dictionary containing 'nightlybias', 'badcol', 'ccdcalib',
            'psfnight', 'nightlyflat', 'linkcal', and 'completed'. Each key corresponds to a
            Table.Row or None. The table.Row() values are for the corresponding
            calibration job.
    """
    log = get_logger()

    file_job_map = get_file_to_jobdesc_map()
    for fil in files_to_link:
        if fil in file_job_map:
            calibjobs['completed'][file_job_map[fil]] = True
        elif fil in ['biasnight', 'badcolumns', 'ctecorrnight']:
            continue
        else:
            err = f"Filetype {fil} doesn't map to a known job description: "
            err += f"{file_job_map=}"
            log.error(err)
            raise ValueError(err)

    if 'biasnight' in files_to_link and 'badcolumns' in files_to_link \
            and 'ctecorrnight' in files_to_link:
        calibjobs['completed']['ccdcalib'] = True

    return calibjobs
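
## Example: no single filetype maps to the ccdcalib job, but if biasnight,
## badcolumns, and ctecorrnight are all linked, ccdcalib is marked complete:
##
##     calibjobs = update_calibjobs_with_linking(
##         calibjobs, files_to_link={'biasnight', 'badcolumns', 'ctecorrnight'})
##     calibjobs['completed']['ccdcalib']  # True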

def all_calibs_submitted(completed):
    """
    Determines whether all of the necessary calibration jobs for the night
    have been submitted or otherwise handled.

    Args:
        completed, dict, Dictionary with keys corresponding to the calibration job descriptions and values of True or False.

    Returns:
        bool, True if all necessary calibrations have been submitted or handled, False otherwise.
    """
    ccdlevel_completed = (completed['nightlybias'] or completed['badcol']
                         or completed['ccdcalib'])
    return ccdlevel_completed and completed['psfnight'] and completed['nightlyflat']
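
## Example: any one of the CCD-level jobs satisfies the CCD requirement, but
## psfnight and nightlyflat must both be done:
##
##     completed = {'nightlybias': False, 'badcol': False, 'ccdcalib': True,
##                  'psfnight': True, 'nightlyflat': False, 'linkcal': False}
##     all_calibs_submitted(completed)  # False until nightlyflat is also True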

def update_and_recurvsively_submit(proc_table, submits=0, resubmission_states=None,
                                   ptab_name=None, dry_run=0, reservation=None):
    """
    Given a processing table, this loops over job rows and resubmits failed jobs (as defined by resubmission_states).
    Before submitting a job, it checks the dependencies for failures. If a dependency needs to be resubmitted, it recursively
    follows dependencies until it finds the first job without a failed dependency and resubmits that. Then it resubmits the
    other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.

    Args:
        proc_table, Table, the processing table with a row per job.
        submits, int, the number of submissions made to the queue. Used for saving files and to avoid overloading the scheduler.
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
            possible Slurm scheduler state, where you wish for jobs with that
            outcome to be resubmitted.
        ptab_name, str, the full pathname where the processing table should be saved.
        dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.

    Returns:
        tuple: A tuple containing:

        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
          been updated for those jobs that needed to be resubmitted.
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
          the number of submissions made from this function call plus the input submits value.

    Note:
        This modifies the inputs of both proc_table and submits and returns them.
    """
    log = get_logger()
    if resubmission_states is None:
        resubmission_states = get_resubmission_states()
    log.info(f"Resubmitting jobs with current states in the following: {resubmission_states}")
    proc_table = update_from_queue(proc_table, dry_run=False)
    log.info("Updated processing table queue information:")
    cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID',
            'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS']
    print(np.array(cols))
    for row in proc_table:
        print(np.array(row[cols]))
    print("\n")
    id_to_row_map = {row['INTID']: rown for rown, row in enumerate(proc_table)}
    for rown in range(len(proc_table)):
        if proc_table['STATUS'][rown] in resubmission_states:
            proc_table, submits = recursive_submit_failed(rown, proc_table, submits,
                                                          id_to_row_map, ptab_name,
                                                          resubmission_states,
                                                          reservation, dry_run)
    return proc_table, submits
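
## Usage sketch (illustrative; the path is hypothetical and a processing table
## is assumed to exist on disk for the night):
##
##     ptab_name = '/path/to/processing_table-20240315.csv'
##     ptable = load_table(tablename=ptab_name, tabletype='processing')
##     ptable, nsubmits = update_and_recurvsively_submit(ptable, submits=0,
##                                                       ptab_name=ptab_name,
##                                                       dry_run=1)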

def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=None,
                            resubmission_states=None, reservation=None, dry_run=0):
    """
    Given the row index of a job in the processing table and the full processing table, this resubmits the given job.
    Before submitting a job, it checks the dependencies for failures in the processing table. If a dependency needs to
    be resubmitted, it recursively follows dependencies until it finds the first job without a failed dependency and
    resubmits that. Then it resubmits the other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.

    Args:
        rown, int, the row index within proc_table of the job that you want to resubmit.
        proc_table, Table, the processing table with a row per job.
        submits, int, the number of submissions made to the queue. Used for saving files and to avoid overloading the scheduler.
        id_to_row_map, dict, lookup dictionary where the keys are internal ids (INTID's) and the values are the row position
            in the processing table.
        ptab_name, str, the full pathname where the processing table should be saved.
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
            possible Slurm scheduler state, where you wish for jobs with that
            outcome to be resubmitted.
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).

    Returns:
        tuple: A tuple containing:

        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
          been updated for those jobs that needed to be resubmitted.
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
          the number of submissions made from this function call plus the input submits value.

    Note:
        This modifies the inputs of both proc_table and submits and returns them.
    """
    log = get_logger()
    row = proc_table[rown]
    log.info(f"Identified row {row['INTID']} as needing resubmission.")
    log.info(f"{row['INTID']}: Expid(s): {row['EXPID']}  Job: {row['JOBDESC']}")
    if resubmission_states is None:
        resubmission_states = get_resubmission_states()
    ideps = proc_table['INT_DEP_IDS'][rown]
    if ideps is None:
        proc_table['LATEST_DEP_QID'][rown] = np.ndarray(shape=0).astype(int)
    else:
        all_valid_states = list(resubmission_states.copy())
        all_valid_states.extend(['RUNNING', 'PENDING', 'SUBMITTED', 'COMPLETED'])
        for idep in np.sort(np.atleast_1d(ideps)):
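            ## Internal IDs are assigned per night starting at YYMMDD000, so
            ## integer division by 1000 recovers the night; a dependency that
            ## is missing from id_to_row_map is expected when it comes from a
            ## different night.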
            if idep not in id_to_row_map and idep // 1000 != row['INTID'] // 1000:
                log.warning(f"Internal ID: {idep} not in id_to_row_map. "
                            + "This is expected since it's from another day. "
                            + "This dependency will not be checked or "
                            + "resubmitted.")
                continue
            if proc_table['STATUS'][id_to_row_map[idep]] not in all_valid_states:
                log.warning(f"Proc INTID: {proc_table['INTID'][rown]} depended on" +
                            f" INTID {proc_table['INTID'][id_to_row_map[idep]]}" +
                            f" but that exposure has state" +
                            f" {proc_table['STATUS'][id_to_row_map[idep]]} that" +
                            f" isn't in the list of resubmission states." +
                            f" Exiting this job's resubmission attempt.")
                proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
                return proc_table, submits
        qdeps = []
        for idep in np.sort(np.atleast_1d(ideps)):
            if idep not in id_to_row_map and idep // 1000 != row['INTID'] // 1000:
                log.warning(f"Internal ID: {idep} not in id_to_row_map. "
                            + "This is expected since it's from another day. "
                            + "This dependency will not be checked or "
                            + "resubmitted.")
                continue
            if proc_table['STATUS'][id_to_row_map[idep]] in resubmission_states:
                proc_table, submits = recursive_submit_failed(id_to_row_map[idep],
                                                              proc_table, submits,
                                                              id_to_row_map,
                                                              reservation=reservation,
                                                              dry_run=dry_run)
            qdeps.append(proc_table['LATEST_QID'][id_to_row_map[idep]])

        qdeps = np.atleast_1d(qdeps)
        if len(qdeps) > 0:
            proc_table['LATEST_DEP_QID'][rown] = qdeps
        else:
            log.error(f"number of qdeps should be 1 or more: Rown {rown}, ideps {ideps}")

    proc_table[rown] = submit_batch_script(proc_table[rown], reservation=reservation,
                                           strictly_successful=True, dry_run=dry_run)
    submits += 1

    if not dry_run:
        sleep_and_report(1, message_suffix="after submitting job to queue")
        if submits % 10 == 0:
            if ptab_name is None:
                write_table(proc_table, tabletype='processing', overwrite=True)
            else:
                write_table(proc_table, tablename=ptab_name, overwrite=True)
            sleep_and_report(2, message_suffix="after writing to disk")
        if submits % 100 == 0:
            proc_table = update_from_queue(proc_table)
            if ptab_name is None:
                write_table(proc_table, tabletype='processing', overwrite=True)
            else:
                write_table(proc_table, tablename=ptab_name, overwrite=True)
            sleep_and_report(10, message_suffix="after updating queue and writing to disk")
    return proc_table, submits


#########################################
########     Joint fit     ##############
#########################################
def joint_fit(ptable, prows, internal_id, queue, reservation, descriptor, z_submit_types=None,
              dry_run=0, strictly_successful=False, check_for_outputs=True, resubmit_partial_complete=True,
              system_name=None):
    """
    DEPRECATED
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    joint fitting job given by descriptor. If the joint fitting job is standard star fitting, the post standard star fits
    for all the individual exposures are also created and submitted. The returned ptable has all of these rows added to the
    table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        descriptor (str): Description of the joint fitting job. Can either be 'science' or 'stdstarfit', 'arc' or 'psfnight',
            or 'flat' or 'nightlyflat'.
        z_submit_types (list of str, optional): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with added rows for the joint fit job and, in the case
          of a stdstarfit, the poststdstar science exposure jobs.
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    if descriptor is None:
        return ptable, None, internal_id
    elif descriptor == 'arc':
        descriptor = 'psfnight'
    elif descriptor == 'flat':
        descriptor = 'nightlyflat'
    elif descriptor == 'science':
        if z_submit_types is None or len(z_submit_types) == 0:
            descriptor = 'stdstarfit'

    if descriptor not in ['psfnight', 'nightlyflat', 'science', 'stdstarfit']:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")

    if descriptor == 'science':
        joint_prow, internal_id = make_joint_prow(prows, descriptor='stdstarfit', internal_id=internal_id)
    else:
        joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
    ptable.add_row(joint_prow)

    if descriptor in ['science', 'stdstarfit']:
        if descriptor == 'science':
            zprows = []
        log.info(" ")
        log.info("Submitting individual science exposures now that joint fitting of standard stars is submitted.\n")
        for row in prows:
            if row['LASTSTEP'] == 'stdstarfit':
                continue
            row['JOBDESC'] = 'poststdstar'

            # poststdstar job can't process cameras not included in its stdstar joint fit
            stdcamword = joint_prow['PROCCAMWORD']
            thiscamword = row['PROCCAMWORD']
            proccamword = camword_intersection([stdcamword, thiscamword])
            if proccamword != thiscamword:
                dropcams = difference_camwords(thiscamword, proccamword)
                assert dropcams != ''  #- i.e. if they differ, we should be dropping something
                log.warning(f"Dropping exp {row['EXPID']} poststdstar cameras {dropcams} since they weren't included in stdstar fit {stdcamword}")
                row['PROCCAMWORD'] = proccamword

            row['INTID'] = internal_id
            internal_id += 1
            row['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
            row = assign_dependency(row, joint_prow)
            row = create_and_submit(row, queue=queue, reservation=reservation, dry_run=dry_run,
                                    strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
            ptable.add_row(row)
            if descriptor == 'science' and row['LASTSTEP'] == 'all':
                zprows.append(row)

    ## Now run redshifts
    if descriptor == 'science' and len(zprows) > 0 and z_submit_types is not None:
        prow_selection = (  (ptable['OBSTYPE'] == 'science')
                          & (ptable['LASTSTEP'] == 'all')
                          & (ptable['JOBDESC'] == 'poststdstar')
                          & (ptable['TILEID'] == int(zprows[0]['TILEID'])) )
        nightly_zprows = []
        if np.sum(prow_selection) == len(zprows):
            nightly_zprows = zprows.copy()
        else:
            for prow in ptable[prow_selection]:
                nightly_zprows.append(table_row_to_dict(prow))

        for zsubtype in z_submit_types:
            if zsubtype == 'perexp':
                for zprow in zprows:
                    log.info(" ")
                    log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
                    joint_prow, internal_id = make_joint_prow([zprow], descriptor=zsubtype, internal_id=internal_id)
                    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                    ptable.add_row(joint_prow)
            else:
                log.info(" ")
                log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {nightly_zprows[0]['TILEID']}.")
                expids = [prow['EXPID'][0] for prow in nightly_zprows]
                log.info(f"Expids: {expids}.\n")
                joint_prow, internal_id = make_joint_prow(nightly_zprows, descriptor=zsubtype, internal_id=internal_id)
                joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(joint_prow)

    if descriptor in ['psfnight', 'nightlyflat']:
        log.info("Setting the calibration exposures as calibrators in the processing table.\n")
        ptable = set_calibrator_flag(prows, ptable)

    return ptable, joint_prow, internal_id

def joint_cal_fit(descriptor, ptable, prows, internal_id, queue, reservation,
                  dry_run=0, strictly_successful=False, check_for_outputs=True,
                  resubmit_partial_complete=True, system_name=None):
    """
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    joint calibration fit given by descriptor ('arc'/'psfnight' or 'flat'/'nightlyflat'). The returned ptable has
    the row for the joint fit job added to the table given as input.

    Args:
        descriptor (str): Description of the joint fitting job. Can either be 'arc' or 'psfnight',
            or 'flat' or 'nightlyflat'.
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with an added row for the joint fit job.
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    if descriptor is None:
        return ptable, None, internal_id
    elif descriptor == 'arc':
        descriptor = 'psfnight'
    elif descriptor == 'flat':
        descriptor = 'nightlyflat'

    if descriptor not in ['psfnight', 'nightlyflat']:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")

    joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
    ptable.add_row(joint_prow)

    if descriptor in ['psfnight', 'nightlyflat']:
        log.info("Setting the calibration exposures as calibrators in the processing table.\n")
        ptable = set_calibrator_flag(prows, ptable)

    return ptable, joint_prow, internal_id
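
## Usage sketch (illustrative): jointly fit a set of processed arc prows into
## a psfnight job once all individual arcs have been submitted:
##
##     ptable, psfnight_prow, internal_id = joint_cal_fit(
##         'arc', ptable, arcs, internal_id, queue='realtime', reservation=None)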


#########################################
########     Redshifts     ##############
#########################################
def submit_redshifts(ptable, prows, tnight, internal_id, queue, reservation,
              dry_run=0, strictly_successful=False,
              check_for_outputs=True, resubmit_partial_complete=True,
              z_submit_types=None, system_name=None):
    """
    Given a set of prows, this generates processing table rows, creates batch scripts, and submits the appropriate
    redshift jobs for the given z_submit_types. The returned ptable has all of these rows added to the
    table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
        tnight (Table.Row): The processing table row of the tilenight job on which the redshifts depend.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with added rows for the redshift jobs.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1 or z_submit_types is None:
        return ptable, internal_id

    log.info(" ")
    log.info("Running redshifts.\n")

    ## Now run redshifts
    zprows = []
    for row in prows:
        if row['LASTSTEP'] == 'all':
            zprows.append(row)

    if len(zprows) > 0:
        for zsubtype in z_submit_types:
            if zsubtype == 'perexp':
                for zprow in zprows:
                    log.info(" ")
                    log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
                    redshift_prow = make_redshift_prow([zprow], tnight, descriptor=zsubtype, internal_id=internal_id)
                    internal_id += 1
                    redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                      strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                      resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                    ptable.add_row(redshift_prow)
            else:
                log.info(" ")
                log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {zprows[0]['TILEID']}.")
                expids = [prow['EXPID'][0] for prow in zprows]
                log.info(f"Expids: {expids}.\n")
                redshift_prow = make_redshift_prow(zprows, tnight, descriptor=zsubtype, internal_id=internal_id)
                internal_id += 1
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                  strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                  resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(redshift_prow)

    return ptable, internal_id
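
## Usage sketch (illustrative): submit cumulative redshifts that depend on an
## already-submitted tilenight job:
##
##     ptable, internal_id = submit_redshifts(ptable, prows=sciences, tnight=tnight_prow,
##                                            internal_id=internal_id, queue='realtime',
##                                            reservation=None, z_submit_types=['cumulative'])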

#########################################
########     Tilenight     ##############
#########################################
def submit_tilenight(ptable, prows, calibjobs, internal_id, queue, reservation,
              dry_run=0, strictly_successful=False, resubmit_partial_complete=True,
              system_name=None, use_specter=False, extra_job_args=None):
    """
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the
    tilenight job for those exposures. The returned ptable has this row added to the
    table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The table.Row() values are for the corresponding
            calibration job.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include
            laststeps for tilenight, etc.

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with an added row for the tilenight job.
        * tnight_prow, dict. Row of a processing table corresponding to the tilenight job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    log.info(" ")
    log.info("Running tilenight.\n")

    tnight_prow = make_tnight_prow(prows, calibjobs, internal_id=internal_id)
    internal_id += 1
    tnight_prow = create_and_submit(tnight_prow, queue=queue, reservation=reservation, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=False,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name,
                                   use_specter=use_specter, extra_job_args=extra_job_args)
    ptable.add_row(tnight_prow)

    return ptable, tnight_prow, internal_id
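
## Usage sketch (illustrative): submit one tilenight job for the prestdstar
## prows of a tile, then its redshifts (see submit_redshifts above). The
## 'laststeps' key in extra_job_args is an assumed example, not a documented
## requirement:
##
##     ptable, tnight_prow, internal_id = submit_tilenight(
##         ptable, sciences, calibjobs, internal_id, queue='realtime',
##         reservation=None, dry_run=1,
##         extra_job_args={'laststeps': ['all', 'skysub']})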

## wrapper functions for joint fitting
def science_joint_fit(ptable, sciences, internal_id, queue='realtime', reservation=None,
                      z_submit_types=None, dry_run=0, strictly_successful=False,
                      check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None):
    """
    Wrapper function for desispec.workflow.processing.joint_fit specific to the stdstarfit joint fit and redshift fitting.

    All variables are the same except::

        Arg 'sciences' is mapped to the prows argument of joint_fit.
        The joint_fit argument descriptor is pre-defined as 'science'.
    """
    return joint_fit(ptable=ptable, prows=sciences, internal_id=internal_id, queue=queue, reservation=reservation,
                     descriptor='science', z_submit_types=z_submit_types, dry_run=dry_run,
                     strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                     resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)


def flat_joint_fit(ptable, flats, internal_id, queue='realtime',
                   reservation=None, dry_run=0, strictly_successful=False,
                   check_for_outputs=True, resubmit_partial_complete=True,
                   system_name=None):
    """
    Wrapper function for desispec.workflow.processing.joint_fit specific to the nightlyflat joint fit.

    All variables are the same except::

        Arg 'flats' is mapped to the prows argument of joint_fit.
        The joint_fit argument descriptor is pre-defined as 'nightlyflat'.
    """
    return joint_fit(ptable=ptable, prows=flats, internal_id=internal_id, queue=queue, reservation=reservation,
                     descriptor='nightlyflat', dry_run=dry_run, strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs, resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)


def arc_joint_fit(ptable, arcs, internal_id, queue='realtime',
                  reservation=None, dry_run=0, strictly_successful=False,
                  check_for_outputs=True, resubmit_partial_complete=True,
                  system_name=None):
    """
    Wrapper function for desispec.workflow.processing.joint_fit specific to the psfnight joint fit.

    All variables are the same except::

        Arg 'arcs' is mapped to the prows argument of joint_fit.
        The joint_fit argument descriptor is pre-defined as 'psfnight'.
    """
    return joint_fit(ptable=ptable, prows=arcs, internal_id=internal_id, queue=queue, reservation=reservation,
                     descriptor='psfnight', dry_run=dry_run, strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs, resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)

def make_joint_prow(prows, descriptor, internal_id):
    """
    Given an input list or array of processing table rows and a descriptor, this creates a joint fit processing job row.
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
    input prows).

    Args:
        prows, list or array of dicts. The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        descriptor, str. Description of the joint fitting job. Can either be 'stdstarfit', 'psfnight', or 'nightlyflat'.
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).

    Returns:
        dict: Row of a processing table corresponding to the joint fit job.
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    first_row = table_row_to_dict(prows[0])
    joint_prow = first_row.copy()

    joint_prow['INTID'] = internal_id
    internal_id += 1
    joint_prow['JOBDESC'] = descriptor
    joint_prow['LATEST_QID'] = -99
    joint_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
    joint_prow['SUBMIT_DATE'] = -99
    joint_prow['STATUS'] = 'U'
    joint_prow['SCRIPTNAME'] = ''
    joint_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

    ## Assign the PROCCAMWORD based on the descriptor and the input exposures
    if descriptor == 'stdstarfit':
        pcamwords = [prow['PROCCAMWORD'] for prow in prows]
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
                                                  full_spectros_only=True)
    else:
        ## For arcs and flats, a BADAMP takes out the camera, so remove those
        ## cameras from the proccamword
        pcamwords = []
        for prow in prows:
            if len(prow['BADAMPS']) > 0:
                badcams = []
                for (camera, petal, amplifier) in parse_badamps(prow['BADAMPS']):
                    badcams.append(f'{camera}{petal}')
                badampcamword = create_camword(list(set(badcams)))
                pcamword = difference_camwords(prow['PROCCAMWORD'], badampcamword)
            else:
                pcamword = prow['PROCCAMWORD']
            pcamwords.append(pcamword)

        ## For flats we want any camera that exists in all 12 exposures
        ## For arcs we want any camera that exists in at least 3 exposures
        if descriptor == 'nightlyflat':
            joint_prow['PROCCAMWORD'] = camword_intersection(pcamwords,
                                                             full_spectros_only=False)
        elif descriptor == 'psfnight':
            ## Count number of exposures each camera is present for
            camcheck = {}
            for camword in pcamwords:
                for cam in decode_camword(camword):
                    if cam in camcheck:
                        camcheck[cam] += 1
                    else:
                        camcheck[cam] = 1
            ## if exists in 3 or more exposures, then include it
            goodcams = []
            for cam, camcount in camcheck.items():
                if camcount >= 3:
                    goodcams.append(cam)
            joint_prow['PROCCAMWORD'] = create_camword(goodcams)

    joint_prow = assign_dependency(joint_prow, dependency=prows)
    return joint_prow, internal_id
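
## Example of the psfnight camera selection above (illustrative camwords):
## with five arcs where petal 9 was only processed twice,
##
##     pcamwords = ['a0123456789', 'a0123456789',
##                  'a012345678', 'a012345678', 'a012345678']
##
## the petal-9 cameras appear in only 2 of 5 exposures (< 3), so they are
## dropped and the joint PROCCAMWORD becomes 'a012345678'.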

def make_exposure_prow(erow, int_id, calibjobs, jobdesc=None):
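    """
    Converts an exposure table row into a processing table row, assigns it the
    given internal id, sets its job description, and assigns its calibration
    dependencies.

    Args:
        erow, Table.Row or dict. The exposure table row to convert.
        int_id, int. The internal id to assign to this job.
        calibjobs, dict. Dictionary of calibration job rows on which this exposure may depend.
        jobdesc, str, optional. The job description; defaults to the exposure's OBSTYPE.

    Returns:
        tuple: The new processing table row (dict) and the incremented internal id (int).
    """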
    prow = erow_to_prow(erow)
    prow['INTID'] = int_id
    int_id += 1
    if jobdesc is None:
        prow['JOBDESC'] = prow['OBSTYPE']
    else:
        prow['JOBDESC'] = jobdesc
    prow = define_and_assign_dependency(prow, calibjobs)
    return prow, int_id

def make_tnight_prow(prows, calibjobs, internal_id):
    """
    Given an input list or array of processing table rows, this creates a tilenight processing job row.
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
    input calibjobs).

    Args:
        prows, list or array of dicts. Unsubmitted rows corresponding to the individual prestdstar jobs that are
            the first steps of tilenight.
        calibjobs, dict. Dictionary containing keys that each corresponds to a Table.Row or
            None, with each table.Row() value corresponding to a calibration job
            on which the tilenight job depends.
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).

    Returns:
        dict: Row of a processing table corresponding to the tilenight job.
    """
    first_row = table_row_to_dict(prows[0])
    joint_prow = first_row.copy()

    joint_prow['INTID'] = internal_id
    joint_prow['JOBDESC'] = 'tilenight'
    joint_prow['LATEST_QID'] = -99
    joint_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
    joint_prow['SUBMIT_DATE'] = -99
    joint_prow['STATUS'] = 'U'
    joint_prow['SCRIPTNAME'] = ''
    joint_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

    joint_prow = define_and_assign_dependency(joint_prow, calibjobs, use_tilenight=True)

    return joint_prow

def make_redshift_prow(prows, tnight, descriptor, internal_id):
    """
    Given an input list or array of processing table rows, this creates a redshift processing job row.
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependency (based on the
    input tnight).

    Args:
        prows, list or array of dicts. Unsubmitted rows corresponding to the individual prestdstar jobs that are
            the first steps of tilenight.
        tnight, Table.Row object. Row corresponding to the tilenight job on which the redshift job depends.
        descriptor, str. The redshift "group" type of the job (e.g. 'perexp').
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).

    Returns:
        dict: Row of a processing table corresponding to the redshift job.
    """
    first_row = table_row_to_dict(prows[0])
    redshift_prow = first_row.copy()

    redshift_prow['INTID'] = internal_id
    redshift_prow['JOBDESC'] = descriptor
    redshift_prow['LATEST_QID'] = -99
    redshift_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
    redshift_prow['SUBMIT_DATE'] = -99
    redshift_prow['STATUS'] = 'U'
    redshift_prow['SCRIPTNAME'] = ''
    redshift_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

    redshift_prow = assign_dependency(redshift_prow, dependency=tnight)

    return redshift_prow
1796

1797
def checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs,
1✔
1798
                                  lasttype, internal_id, z_submit_types=None, dry_run=0,
1799
                                  queue='realtime', reservation=None, strictly_successful=False,
1800
                                  check_for_outputs=True, resubmit_partial_complete=True,
1801
                                  system_name=None):
1802
    """
1803
    Takes all the state-ful data from daily processing and determines whether a joint fit needs to be submitted. Places
1804
    the decision criteria into a single function for easier maintainability over time. These are separate from the
1805
    new standard manifest*.json method of indicating a calibration sequence is complete. That is checked independently
1806
    elsewhere and doesn't interact with this.
1807

1808
    Args:
1809
        ptable (Table): Processing table of all exposures that have been processed.
1810
        arcs (list of dict): list of the individual arc jobs to be used for the psfnight (NOT all
1811
            the arcs, if multiple sets existed). May be empty if none identified yet.
1812
        flats (list of dict): list of the individual flat jobs to be used for the nightlyflat (NOT
1813
            all the flats, if multiple sets existed). May be empty if none identified yet.
1814
        sciences (list of dict): list of the most recent individual prestdstar science exposures
1815
            (if currently processing that tile). May be empty if none identified yet.
1816
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
1817
            and 'nightlyflat'. Each key corresponds to a Table.Row or
1818
            None. The table.Row() values are for the corresponding
1819
            calibration job.
1820
        lasttype (str or None): the obstype of the last individual exposure row to be processed.
1821
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
1822
            is the smallest unassigned value.
1823
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
1824
            exposure. If not specified or None, then no redshifts are submitted.
1825
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
1826
            dry_run=2, the scripts will not be writter or submitted. Logging will remain the same
1827
            for testing as though scripts are being submitted. Default is 0 (false).
1828
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
1829
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
1830
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
1831
            less desirable because e.g. the sciences can run with SVN default calibrations rather
1832
            than failing completely from failed calibrations. Default is False.
1833
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
1834
            data products for the script being submitted. If all files exist and this is True,
1835
            then the script will not be submitted. If some files exist and this is True, only the
1836
            subset of the cameras without the final data products will be generated and submitted.
1837
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
1838
            jobs with some prior data are pruned using PROCCAMWORD to only process the
1839
            remaining cameras not found to exist.
1840
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, permutter-gpu
1841

1842
    Returns:
1843
        tuple: A tuple containing:
1844

1845
        * ptable, Table, Processing table of all exposures that have been processed.
1846
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
1847
          and 'nightlyflat'. Each key corresponds to a Table.Row or
1848
          None. The table.Row() values are for the corresponding
1849
          calibration job.
1850
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
1851
          (if currently processing that tile). May be empty if none identified yet or
1852
          we just submitted them for processing.
1853
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented upward from
1854
          from the input such that it represents the smallest unused ID.
1855
    """
1856
    if lasttype == 'science' and len(sciences) > 0:
×
1857
        log = get_logger()
×
1858
        skysubonly = np.array([sci['LASTSTEP'] == 'skysub' for sci in sciences])
×
1859
        if np.all(skysubonly):
×
1860
            log.error("Identified all exposures in joint fitting request as skysub-only. Not submitting")
×
1861
            sciences = []
×
1862
            return ptable, calibjobs, sciences, internal_id
×
1863

1864
        if np.any(skysubonly):
×
1865
            log.error("Identified skysub-only exposures in joint fitting request")
×
1866
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
×
1867
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))
×
1868
            sciences = (np.array(sciences,dtype=object)[~skysubonly]).tolist()
×
1869
            log.info("Removed skysub only exposures in joint fitting:")
×
1870
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
×
1871
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))
×
1872

1873
        from collections import Counter
×
1874
        tiles = np.array([sci['TILEID'] for sci in sciences])
×
1875
        counts = Counter(tiles)
×
1876
        if len(counts.most_common()) > 1:
×
1877
            log.error("Identified more than one tile in a joint fitting request")
×
1878
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
×
1879
            log.info("Tileid's: {}".format(tiles))
×
1880
            log.info("Returning without joint fitting any of these exposures.")
×
1881
            # most_common, nmost_common = counts.most_common()[0]
1882
            # if most_common == -99:
1883
            #     most_common, nmost_common = counts.most_common()[1]
1884
            # log.warning(f"Given multiple tiles to jointly fit: {counts}. "+
1885
            #             "Only processing the most common non-default " +
1886
            #             f"tile: {most_common} with {nmost_common} exposures")
1887
            # sciences = (np.array(sciences,dtype=object)[tiles == most_common]).tolist()
1888
            # log.info("Tiles and exposure id's being submitted for joint fitting:")
1889
            # log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
1890
            # log.info("Tileid's: {}".format([row['TILEID'] for row in sciences]))
1891
            sciences = []
×
1892
            return ptable, calibjobs, sciences, internal_id
×
1893

1894
        ptable, tilejob, internal_id = science_joint_fit(ptable, sciences, internal_id, z_submit_types=z_submit_types,
×
1895
                                                         dry_run=dry_run, queue=queue, reservation=reservation,
1896
                                                         strictly_successful=strictly_successful,
1897
                                                         check_for_outputs=check_for_outputs,
1898
                                                         resubmit_partial_complete=resubmit_partial_complete,
1899
                                                         system_name=system_name)
1900
        if tilejob is not None:
×
1901
            sciences = []
×
1902

1903
    elif lasttype == 'flat' and calibjobs['nightlyflat'] is None and len(flats) == 12:
×
        ## Note here we assume a complete calibration sequence has exactly 12 flat exposures
        ptable, calibjobs['nightlyflat'], internal_id \
×
            = flat_joint_fit(ptable, flats, internal_id, dry_run=dry_run, queue=queue,
                             reservation=reservation, strictly_successful=strictly_successful,
                             check_for_outputs=check_for_outputs,
                             resubmit_partial_complete=resubmit_partial_complete,
                             system_name=system_name
                            )

    elif lasttype == 'arc' and calibjobs['psfnight'] is None and len(arcs) == 5:
×
        ## Note here we assume a complete calibration sequence has exactly 5 arc exposures
        ptable, calibjobs['psfnight'], internal_id \
×
            = arc_joint_fit(ptable, arcs, internal_id, dry_run=dry_run, queue=queue,
                            reservation=reservation, strictly_successful=strictly_successful,
                            check_for_outputs=check_for_outputs,
                            resubmit_partial_complete=resubmit_partial_complete,
                            system_name=system_name
                            )
    return ptable, calibjobs, sciences, internal_id
×

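# Illustrative sketch of the multi-tile guard used above (the TILEIDs are
# hypothetical). Counter.most_common() returns (value, count) pairs sorted
# by count, so more than one entry means the science exposures span multiple
# tiles and the joint fit is skipped rather than mixing tiles in one job:
#
#   >>> from collections import Counter
#   >>> tiles = [1234, 1234, 5678]
#   >>> Counter(tiles).most_common()
#   [(1234, 2), (5678, 1)]
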
def submit_tilenight_and_redshifts(ptable, sciences, calibjobs, internal_id, dry_run=0,
1✔
                                  queue='realtime', reservation=None, strictly_successful=False,
                                  check_for_outputs=True, resubmit_partial_complete=True,
                                  system_name=None, use_specter=False, extra_job_args=None):
    """
    Takes all the stateful data from daily processing and determines whether a tilenight job needs to be submitted.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary of calibration job rows for the night (e.g. 'psfnight',
            'nightlyflat'), used to define the dependencies of the tilenight job.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict, optional): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include
            laststeps for tilenight, z_submit_types for redshifts, etc.

    Returns:
        tuple: A tuple containing:

        * ptable, Table, Processing table of all exposures that have been processed.
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented
          upward from the input such that it represents the smallest unused ID.
    """
    ptable, tnight, internal_id = submit_tilenight(ptable, sciences, calibjobs, internal_id,
1✔
                                             queue=queue, reservation=reservation,
                                             dry_run=dry_run, strictly_successful=strictly_successful,
                                             resubmit_partial_complete=resubmit_partial_complete,
                                             system_name=system_name, use_specter=use_specter,
                                             extra_job_args=extra_job_args)

    z_submit_types = None
1✔
    if extra_job_args is not None and 'z_submit_types' in extra_job_args:
1✔
        z_submit_types = extra_job_args['z_submit_types']
1✔

    ptable, internal_id = submit_redshifts(ptable, sciences, tnight, internal_id,
1✔
                                    queue=queue, reservation=reservation,
                                    dry_run=dry_run, strictly_successful=strictly_successful,
                                    check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete,
                                    z_submit_types=z_submit_types,
                                    system_name=system_name)

    if tnight is not None:
1✔
        sciences = []
1✔

    return ptable, sciences, internal_id
1✔

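# Minimal usage sketch (illustrative only: the processing table, calibjobs
# dict, and argument values are hypothetical and would normally come from
# the daily processing loop, e.g. desi_proc_night):
#
#   extra_job_args = {'laststeps': ['all'], 'z_submit_types': ['cumulative']}
#   ptable, sciences, internal_id = submit_tilenight_and_redshifts(
#       ptable, sciences, calibjobs, internal_id, dry_run=2,
#       queue='realtime', system_name='perlmutter',
#       extra_job_args=extra_job_args)
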
def set_calibrator_flag(prows, ptable):
1✔
    """
    Sets the "CALIBRATOR" column of a processing table row to 1 (integer representation of True)
    for all input rows. Used within joint fitting code to flag the exposures that were input
    to the psfnight or nightlyflat for later reference.

    Args:
        prows, list or array of Table.Rows or dicts. The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        ptable, Table. The processing table where each row is a processed job.

    Returns:
        Table: The same processing table as input, except with the CALIBRATOR column set to 1
        for each row whose INTID matches one of the input prows.
    """
    for prow in prows:
1✔
        ptable['CALIBRATOR'][ptable['INTID'] == prow['INTID']] = 1
1✔
    return ptable
1✔
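
# Minimal usage sketch for set_calibrator_flag (illustrative; the INTID
# values and the toy table are hypothetical):
#
#   >>> from astropy.table import Table
#   >>> ptable = Table({'INTID': [24031500, 24031501], 'CALIBRATOR': [0, 0]})
#   >>> ptable = set_calibrator_flag([{'INTID': 24031500}], ptable)
#   >>> list(ptable['CALIBRATOR'])   # -> [1, 0]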