• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

desihub / desispec / 8440689206

26 Mar 2024 05:57PM UTC coverage: 28.101% (+3.1%) from 25.01%
8440689206

push

github

web-flow
Merge pull request #2187 from desihub/pipelinerefactor

Introduce desi_proc_night to unify and simplify processing scripts

769 of 1188 new or added lines in 20 files covered. (64.73%)

10 existing lines in 6 files now uncovered.

13231 of 47084 relevant lines covered (28.1%)

0.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.95
/py/desispec/workflow/processing.py
1
"""
2
desispec.workflow.processing
3
============================
4

5
"""
6
import sys, os, glob
1✔
7
import json
1✔
8
from astropy.io import fits
1✔
9
from astropy.table import Table, join
1✔
10
import numpy as np
1✔
11

12
import time, datetime
1✔
13
from collections import OrderedDict
1✔
14
import subprocess
1✔
15

16
from desispec.scripts.link_calibnight import derive_include_exclude
1✔
17
from desispec.scripts.tile_redshifts import generate_tile_redshift_scripts
1✔
18
from desispec.workflow.redshifts import get_ztile_script_pathname, \
1✔
19
                                        get_ztile_relpath, \
20
                                        get_ztile_script_suffix
21
from desispec.workflow.queue import get_resubmission_states, update_from_queue, queue_info_from_qids
1✔
22
from desispec.workflow.timing import what_night_is_it
1✔
23
from desispec.workflow.desi_proc_funcs import get_desi_proc_batch_file_pathname, \
1✔
24
    create_desi_proc_batch_script, \
25
    get_desi_proc_batch_file_path, \
26
    get_desi_proc_tilenight_batch_file_pathname, \
27
    create_desi_proc_tilenight_batch_script, create_linkcal_batch_script
28
from desispec.workflow.utils import pathjoin, sleep_and_report, \
1✔
29
    load_override_file
30
from desispec.workflow.tableio import write_table, load_table
1✔
31
from desispec.workflow.proctable import table_row_to_dict, erow_to_prow
1✔
32
from desiutil.log import get_logger
1✔
33

34
from desispec.io import findfile, specprod_root
1✔
35
from desispec.io.util import decode_camword, create_camword, \
1✔
36
    difference_camwords, \
37
    camword_to_spectros, camword_union, camword_intersection, parse_badamps
38

39

40
#################################################
41
############## Misc Functions ###################
42
#################################################
43
def night_to_starting_iid(night=None):
    """
    Creates an internal ID for a given night. The resulting integer is a 9 digit number.
    The digits are YYMMDDxxx where YY is the years since 2000, MM and DD are the month and day. xxx are 000,
    and are incremented for up to 1000 unique job ID's for a given night.

    Args:
        night (str or int): YYYYMMDD of the night to get the starting internal ID for.
            If None, the current observing night is used.

    Returns:
        int: 9 digit number consisting of YYMMDD000. YY is years after 2000, MMDD is month and day.
        000 being the starting job number (0).
    """
    if night is None:
        night = what_night_is_it()
    night = int(night)
    ## Subtracting 20000000 turns YYYYMMDD into YYMMDD; multiplying by 1000
    ## leaves three trailing digits (000-999) for per-night job counters.
    internal_id = (night - 20000000) * 1000
    return internal_id
61

62
class ProcessingParams():
    """
    Container for the configuration options used when creating and submitting
    processing jobs (dry-run level, batch queue/reservation, strictness, and
    output-checking behavior).
    """
    def __init__(self, dry_run_level=0, queue='realtime',
                 reservation=None, strictly_successful=True,
                 check_for_outputs=True,
                 resubmit_partial_complete=True,
                 system_name='perlmutter', use_specter=True):
        # Store each parameter as a same-named public attribute.
        for attr_name, attr_value in (('dry_run_level', dry_run_level),
                                      ('system_name', system_name),
                                      ('queue', queue),
                                      ('reservation', reservation),
                                      ('strictly_successful', strictly_successful),
                                      ('check_for_outputs', check_for_outputs),
                                      ('resubmit_partial_complete', resubmit_partial_complete),
                                      ('use_specter', use_specter)):
            setattr(self, attr_name, attr_value)
77

78
#################################################
79
############ Script Functions ###################
80
#################################################
81
def batch_script_name(prow):
    """
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, JOBDESC, PROCCAMWORD defined)
    and determines the script file pathname as defined by desi_proc's helper functions.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.

    Returns:
        str: The complete pathname to the script file, as it is defined within the desi_proc ecosystem.
    """
    ## An empty exposure list means the pathname helper should receive None
    exposures = prow['EXPID']
    if len(exposures) == 0:
        exposures = None
    ## tilenight jobs have their own pathname convention keyed on tile;
    ## everything else is keyed on night/exposure/jobdesc/cameras
    if prow['JOBDESC'] == 'tilenight':
        base_path = get_desi_proc_tilenight_batch_file_pathname(night=prow['NIGHT'],
                                                                tileid=prow['TILEID'])
    else:
        base_path = get_desi_proc_batch_file_pathname(night=prow['NIGHT'], exp=exposures,
                                                      jobdesc=prow['JOBDESC'],
                                                      cameras=prow['PROCCAMWORD'])
    return base_path + '.slurm'
102

103
def get_jobdesc_to_file_map():
    """
    Returns a mapping of job descriptions to the filenames of the output files

    Args:
        None

    Returns:
        dict. Dictionary with keys as lowercase job descriptions and to the
            filename of their expected outputs.

    """
    ## Note: 'ccdcalib' is intentionally absent -- its outputs vary by job
    return dict(prestdstar='sframe',
                stdstarfit='stdstars',
                poststdstar='cframe',
                nightlybias='biasnight',
                badcol='badcolumns',
                arc='fitpsf',
                flat='fiberflat',
                psfnight='psfnight',
                nightlyflat='fiberflatnight',
                spectra='spectra_tile',
                coadds='coadds_tile',
                redshift='redrock_tile')
128

129
def get_file_to_jobdesc_map():
    """
    Returns a mapping of output filenames to job descriptions.

    Args:
        None

    Returns:
        dict. Dictionary with keys as filenames of the expected outputs and
            values as the lowercase job descriptions.
    """
    ## badcol and nightlybias outputs can also be produced by a ccdcalib job,
    ## so their filetypes do not invert to a unique job description.
    ambiguous_jobs = ('badcol', 'nightlybias')
    return {ftype: jdesc
            for jdesc, ftype in get_jobdesc_to_file_map().items()
            if jdesc not in ambiguous_jobs}
146

147
def check_for_outputs_on_disk(prow, resubmit_partial_complete=True):
    """
    Check whether the expected output files for a processing table row already
    exist on disk, setting STATUS to 'COMPLETED' if all are present and
    optionally pruning PROCCAMWORD to only the cameras/spectrographs still
    missing outputs.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispect.workflow.proctable.get_processing_table_column_defs()
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
        the change in job status after creating and submitting the job for processing.
    """
    # Reset status; it is only promoted to COMPLETED if all outputs exist
    prow['STATUS'] = 'UNKNOWN'
    log = get_logger()

    # linkcal/ccdcalib jobs have variable output sets, so existence checks
    # are not meaningful for them
    if prow['JOBDESC'] in ['linkcal', 'ccdcalib']:
        log.info(f"jobdesc={prow['JOBDESC']} has indeterminated outputs, so "
                + "not checking for files on disk.")
        return prow

    job_to_file_map = get_jobdesc_to_file_map()

    night = prow['NIGHT']
    # Redshift-group jobs all produce redrock tile files; other job types
    # look up their filetype from the map.
    # NOTE(review): a JOBDESC outside both sets would raise KeyError here.
    if prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
        filetype = 'redrock_tile'
    else:
        filetype = job_to_file_map[prow['JOBDESC']]
    # Remember the original camword so we can tell later if it was pruned
    orig_camword = prow['PROCCAMWORD']

    ## if spectro based, look for spectros, else look for cameras
    if prow['JOBDESC'] in ['stdstarfit','spectra','coadds','redshift']:
        ## Spectrograph based
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        ## Suppress outputs about using tile based files in findfile if only looking for stdstarfits
        if prow['JOBDESC'] == 'stdstarfit':
            tileid = None
        else:
            tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(findfile(filetype=filetype, night=night, expid=expid, spectrograph=spectro, tile=tileid)):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        # Prune PROCCAMWORD down to spectrographs still missing outputs
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'],existing_camword)
    elif prow['JOBDESC'] in ['cumulative','pernight-v0','pernight','perexp']:
        ## Spectrograph based
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        # Redshift outputs live under the ztile relative path rather than
        # the per-exposure findfile layout, so build the path explicitly
        redux_dir = specprod_root()
        outdir = os.path.join(redux_dir,get_ztile_relpath(tileid,group=prow['JOBDESC'],night=night,expid=expid))
        suffix = get_ztile_script_suffix(tileid, group=prow['JOBDESC'], night=night, expid=expid)
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(os.path.join(outdir, f"redrock-{spectro}-{suffix}.fits")):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        # Prune PROCCAMWORD down to spectrographs still missing outputs
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'],existing_camword)
    else:
        ## Otherwise camera based
        cameras = decode_camword(prow['PROCCAMWORD'])
        n_desired = len(cameras)
        if len(prow['EXPID']) > 0:
            expid = prow['EXPID'][0]
        else:
            expid = None
        # Only the first exposure is checked; warn for job types where
        # multiple exposures are unexpected
        if len(prow['EXPID']) > 1 and prow['JOBDESC'] not in ['psfnight','nightlyflat']:
            log.warning(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']}. This job type only makes " +
                     f"sense with a single exposure. Proceeding with {expid}.")
        missing_cameras = []
        for cam in cameras:
            if not os.path.exists(findfile(filetype=filetype, night=night, expid=expid, camera=cam)):
                missing_cameras.append(cam)
        completed = (len(missing_cameras) == 0)
        # Prune PROCCAMWORD down to only the cameras missing outputs
        if not completed and resubmit_partial_complete and len(missing_cameras) < n_desired:
            prow['PROCCAMWORD'] = create_camword(missing_cameras)

    # Report the outcome: fully complete, partially complete (pruned),
    # partially complete (not pruned), or nothing found
    if completed:
        prow['STATUS'] = 'COMPLETED'
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"the desired {n_desired} {filetype}'s. Not submitting this job.")
    elif resubmit_partial_complete and orig_camword != prow['PROCCAMWORD']:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"some {filetype}'s. Submitting smaller camword={prow['PROCCAMWORD']}.")
    elif not resubmit_partial_complete:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} doesn't have all " +
                 f"{filetype}'s and resubmit_partial_complete=False. "+
                 f"Submitting full camword={prow['PROCCAMWORD']}.")
    else:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} has no " +
                 f"existing {filetype}'s. Submitting full camword={prow['PROCCAMWORD']}.")
    return prow
248

249
def create_and_submit(prow, queue='realtime', reservation=None, dry_run=0,
                      joint=False, strictly_successful=False,
                      check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None, use_specter=False,
                      extra_job_args=None):
    """
    Takes a processing table row, creates a submission script for the compute
    nodes, and submits that script to the Slurm scheduler with appropriate
    dependencies.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispect.workflow.proctable.get_processing_table_column_defs()
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written
            but not submitted. If dry_run=2, the scripts will not be written nor submitted. Logging will
            remain the same for testing as though scripts are being submitted. Default is 0 (false).
        joint (bool, optional): Whether this is a joint fitting job (the job involves multiple exposures) and
            therefore needs to be run with desi_proc_joint_fit. Default is False.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily
            processing, this is less desirable because e.g. the sciences can run with SVN default calibrations
            rather than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the
            expected final data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the subset of
            the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True.
            If this flag is True, jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include refnight
            and include/exclude lists for linkcals, laststeps for tilenight, etc.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
        the change in job status after creating and submitting the job for processing.

    Note:
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    ## Keep a copy so the full job's camword/script name can be restored if the
    ## submitted job is later pruned to a partial camword.
    saved_prow = prow.copy()

    if check_for_outputs:
        prow = check_for_outputs_on_disk(prow, resubmit_partial_complete)
        ## All expected outputs exist already; nothing to create or submit.
        if prow['STATUS'].upper() == 'COMPLETED':
            return prow

    prow = create_batch_script(prow, queue=queue, dry_run=dry_run, joint=joint,
                               system_name=system_name, use_specter=use_specter,
                               extra_job_args=extra_job_args)
    prow = submit_batch_script(prow, reservation=reservation, dry_run=dry_run,
                               strictly_successful=strictly_successful)

    ## If resubmitted partial, the PROCCAMWORD and SCRIPTNAME will correspond
    ## to the pruned values, but we want to retain the full job's values, so
    ## restore those from the saved copy.
    if resubmit_partial_complete:
        prow['PROCCAMWORD'] = saved_prow['PROCCAMWORD']
        prow['SCRIPTNAME'] = saved_prow['SCRIPTNAME']
    return prow
312

313
def desi_link_calibnight_command(prow, refnight, include=None):
    """
    Wrapper script that takes a processing table row (or dictionary with
    NIGHT, PROCCAMWORD defined) and determines the proper command
    line call to link data defined by the input row/dict.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions
            for 'NIGHT' and 'PROCCAMWORD'.
        refnight (str or int): The night with a valid set of calibrations
            from which the links should be created.
        include (list): The filetypes to include in the linking.

    Returns:
        str: The proper command to be submitted to desi_link_calibnight
            to process the job defined by the prow values.
    """
    pieces = ['desi_link_calibnight',
              f'--refnight={refnight}',
              f'--newnight={prow["NIGHT"]}',
              f'--cameras={prow["PROCCAMWORD"]}']
    ## Only pass --include when an explicit filetype list was given
    if include is not None:
        pieces.append('--include=' + ','.join(list(include)))
    return ' '.join(pieces)
336

337
def desi_proc_command(prow, system_name, use_specter=False, queue=None):
    """
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, OBSTYPE, JOBDESC, PROCCAMWORD defined)
    and determines the proper command line call to process the data defined by the input row/dict.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu.
            NOTE(review): currently accepted but not used in building the command.
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).

    Returns:
        str: The proper command to be submitted to desi_proc to process the job defined by the prow values.
    """
    pieces = ['desi_proc', '--batch', '--nosubmit']
    if queue is not None:
        pieces.append(f'-q {queue}')
    obstype = prow['OBSTYPE'].lower()
    if obstype == 'science':
        ## science exposures split into pre- and post-stdstar steps
        if prow['JOBDESC'] == 'prestdstar':
            pieces.append('--nostdstarfit --nofluxcalib')
        elif prow['JOBDESC'] == 'poststdstar':
            pieces.append('--noprestdstarfit --nostdstarfit')

        if use_specter:
            pieces.append('--use-specter')
    elif prow['JOBDESC'] in ['flat', 'prestdstar'] and use_specter:
        pieces.append('--use-specter')
    pieces.append(f"--cameras={str(prow['PROCCAMWORD'])} -n {prow['NIGHT']}")
    if len(prow['EXPID']) > 0:
        ## If ccdcalib job without a dark exposure, don't assign the flat expid
        ## since it would incorrectly process the flat using desi_proc
        if obstype != 'flat' or prow['JOBDESC'] != 'ccdcalib':
            pieces.append(f"-e {prow['EXPID'][0]}")
    if prow['BADAMPS'] != '':
        pieces.append('--badamps={}'.format(prow['BADAMPS']))
    return ' '.join(pieces)
376

377
def desi_proc_joint_fit_command(prow, queue=None):
    """
    Wrapper script that takes a processing table row (or dictionary with NIGHT, EXPID, OBSTYPE, PROCCAMWORD defined)
    and determines the proper command line call to process the data defined by the input row/dict.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'OBSTYPE', and 'PROCCAMWORD'.
        queue (str): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).

    Returns:
        str: The proper command to be submitted to desi_proc_joint_fit
            to process the job defined by the prow values.
    """
    pieces = ['desi_proc_joint_fit', '--batch', '--nosubmit']
    if queue is not None:
        pieces.append(f'-q {queue}')

    ## The obstype of the exposures determines the kind of joint fit
    pieces.append(f"--obstype {prow['OBSTYPE'].lower()}")
    pieces.append(f"--cameras={str(prow['PROCCAMWORD'])} -n {prow['NIGHT']}")

    ## All exposures are passed as a single comma-separated list; omit the
    ## flag entirely when there are none
    exposure_list = ','.join([str(eid) for eid in prow['EXPID']])
    if len(exposure_list) > 0:
        pieces.append(f'-e {exposure_list}')
    return ' '.join(pieces)
407

408

409
def create_batch_script(prow, queue='realtime', dry_run=0, joint=False,
                        system_name=None, use_specter=False, extra_job_args=None):
    """
    Wrapper script that takes a processing table row and three modifier keywords and creates a submission script for the
    compute nodes. Dispatches on JOBDESC: redshift-group jobs, linkcal jobs,
    tilenight jobs, and regular desi_proc/desi_proc_joint_fit jobs each have
    their own script-generation path.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispect.workflow.proctable.get_processing_table_column_defs()
        queue (str): The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        dry_run (int): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted.
            If dry_run=2, the scripts will not be written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        joint (bool): Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
            run with desi_proc_joint_fit when not using tilenight. Default is False.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include refnight
            and include/exclude lists for linkcal, laststeps for tilenight, etc.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated values for
        scriptname.

    Raises:
        ValueError: if a redshift-group job generates no scripts, if a linkcal
            job has no valid refnight, or if a tilenight job lacks 'laststeps'.

    Note:
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()

    if extra_job_args is None:
        extra_job_args = {}

    ## Redshift-group jobs: scripts are generated by the tile-redshift helper
    if prow['JOBDESC'] in ['perexp','pernight','pernight-v0','cumulative']:
        if dry_run > 1:
            # Only report what the script path would have been
            scriptpathname = get_ztile_script_pathname(tileid=prow['TILEID'],group=prow['JOBDESC'],
                                                               night=prow['NIGHT'], expid=prow['EXPID'][0])

            log.info("Output file would have been: {}".format(scriptpathname))
        else:
            #- run zmtl for cumulative redshifts but not others
            run_zmtl = (prow['JOBDESC'] == 'cumulative')
            no_afterburners = False
            # NOTE(review): stray debug print; consider log.debug instead
            print(f"entering tileredshiftscript: {prow}")
            scripts, failed_scripts = generate_tile_redshift_scripts(tileid=prow['TILEID'], group=prow['JOBDESC'],
                                                                     nights=[prow['NIGHT']], expids=prow['EXPID'],
                                                                     batch_queue=queue, system_name=system_name,
                                                                     run_zmtl=run_zmtl,
                                                                     no_afterburners=no_afterburners,
                                                                     nosubmit=True)
            # Exactly one script is expected; log and/or raise otherwise
            if len(failed_scripts) > 0:
                log.error(f"Redshifts failed for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned failed scriptname is {failed_scripts}")
            elif len(scripts) > 1:
                log.error(f"More than one redshifts returned for group={prow['JOBDESC']}, night={prow['NIGHT']}, "+
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned scriptnames were {scripts}")
            elif len(scripts) == 0:
                msg = f'No scripts were generated for {prow=}'
                log.critical(prow)
                raise ValueError(msg)
            else:
                scriptpathname = scripts[0]

    ## linkcal jobs: build a desi_link_calibnight command from extra_job_args
    elif prow['JOBDESC'] == 'linkcal':
        # -99 is a sentinel for "no refnight provided"
        refnight, include, exclude = -99, None, None
        if 'refnight' in extra_job_args:
            refnight = extra_job_args['refnight']
        if 'include' in extra_job_args:
            include = extra_job_args['include']
        if 'exclude' in extra_job_args:
            exclude = extra_job_args['exclude']
        # Normalize the include/exclude lists into their final forms
        include, exclude = derive_include_exclude(include, exclude)
        ## Fiberflatnights shouldn't be generated with psfs from same time, so
        ## shouldn't link psfs without also linking fiberflatnight
        ## However, this should be checked at a higher level. If set here,
        ## go ahead and do it
        # if 'psfnight' in include and not 'fiberflatnight' in include:
        #     err = "Must link fiberflatnight if linking psfnight"
        #     log.error(err)
        #     raise ValueError(err)
        if dry_run > 1:
            scriptpathname = batch_script_name(prow)
            log.info("Output file would have been: {}".format(scriptpathname))
            cmd = desi_link_calibnight_command(prow, refnight, include)
            log.info("Command to be run: {}".format(cmd.split()))
        else:
            # A real linkcal job cannot proceed without a reference night
            if refnight == -99:
                err = f'For {prow=} asked to link calibration but not given' \
                      + ' a valid refnight'
                log.error(err)
                raise ValueError(err)

            cmd = desi_link_calibnight_command(prow, refnight, include)
            log.info(f"Running: {cmd.split()}")
            scriptpathname = create_linkcal_batch_script(newnight=prow['NIGHT'],
                                                        cameras=prow['PROCCAMWORD'],
                                                        queue=queue,
                                                        cmd=cmd,
                                                        system_name=system_name)
    ## All other jobs: desi_proc / desi_proc_joint_fit / tilenight scripts
    else:
        # tilenight builds its own script below and needs no command string
        if prow['JOBDESC'] != 'tilenight':
            nightlybias, nightlycte, cte_expids = False, False, None
            if 'nightlybias' in extra_job_args:
                nightlybias = extra_job_args['nightlybias']
            elif prow['JOBDESC'].lower() == 'nightlybias':
                nightlybias = True
            if 'nightlycte' in extra_job_args:
                nightlycte = extra_job_args['nightlycte']
            if 'cte_expids' in extra_job_args:
                cte_expids = extra_job_args['cte_expids']
            ## run known joint jobs as joint even if unspecified
            ## in the future we can eliminate the need for "joint"
            if joint or prow['JOBDESC'].lower() in ['psfnight', 'nightlyflat']:
                cmd = desi_proc_joint_fit_command(prow, queue=queue)
            else:
                cmd = desi_proc_command(prow, system_name, use_specter, queue=queue)
                if nightlybias:
                    cmd += ' --nightlybias'
                if nightlycte:
                    cmd += ' --nightlycte'
                    if cte_expids is not None:
                        cmd += ' --cte-expids '
                        cmd += ','.join(np.atleast_1d(cte_expids).astype(str))
        if dry_run > 1:
            scriptpathname = batch_script_name(prow)
            log.info("Output file would have been: {}".format(scriptpathname))
            if prow['JOBDESC'] != 'tilenight':
                log.info("Command to be run: {}".format(cmd.split()))
        else:
            expids = prow['EXPID']
            if len(expids) == 0:
                expids = None

            if prow['JOBDESC'] == 'tilenight':
                log.info("Creating tilenight script for tile {}".format(prow['TILEID']))
                # laststeps is mandatory for tilenight jobs
                if 'laststeps' in extra_job_args:
                    laststeps = extra_job_args['laststeps']
                else:
                    err = f'{prow=} job did not specify last steps to tilenight'
                    log.error(err)
                    raise ValueError(err)
                ncameras = len(decode_camword(prow['PROCCAMWORD']))
                scriptpathname = create_desi_proc_tilenight_batch_script(
                                                               night=prow['NIGHT'], exp=expids,
                                                               tileid=prow['TILEID'],
                                                               ncameras=ncameras,
                                                               queue=queue,
                                                               mpistdstars=True,
                                                               use_specter=use_specter,
                                                               system_name=system_name,
                                                               laststeps=laststeps)
            else:
                # Non-joint scripts only process a single exposure
                if expids is not None and len(expids) > 1:
                    expids = expids[:1]
                log.info("Running: {}".format(cmd.split()))
                scriptpathname = create_desi_proc_batch_script(night=prow['NIGHT'], exp=expids,
                                                               cameras=prow['PROCCAMWORD'],
                                                               jobdesc=prow['JOBDESC'],
                                                               queue=queue, cmdline=cmd,
                                                               use_specter=use_specter,
                                                               system_name=system_name,
                                                               nightlybias=nightlybias,
                                                               nightlycte=nightlycte,
                                                               cte_expids=cte_expids)
    log.info("Outfile is: {}".format(scriptpathname))
    # Record only the basename; the directory is derivable from the row
    prow['SCRIPTNAME'] = os.path.basename(scriptpathname)
    return prow
580

581
## Module-level counter seeding fake Slurm job ids for dry-run testing.
## Seeded from the current time so ids differ between separate runs.
_fake_qid = int(time.time() - 1.7e9)

def _get_fake_qid():
    """
    Return a fake Slurm queue jobid to use for dry-run testing.

    Each call bumps the module-level ``_fake_qid`` counter by one and returns
    the new value, so successive calls yield strictly increasing ids.

    Note: deliberately implemented with a module-level counter rather than a
    yield generator so that callers receive a genuine int, not a generator
    object.
    """
    global _fake_qid
    _fake_qid = _fake_qid + 1
    return _fake_qid
591

592
def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=False):
    """
    Wrapper script that takes a processing table row and three modifier keywords and submits the scripts to the Slurm
    scheduler.

    Args:
        prow, Table.Row or dict. Must include keyword accessible definitions for processing_table columns found in
            desispec.workflow.proctable.get_processing_table_column_defs()
        dry_run, int. If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful, bool. Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated values for
        scriptname.

    Note:
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()
    dep_qids = prow['LATEST_DEP_QID']
    dep_list, dep_str = '', ''

    # workaround for sbatch --dependency bug not tracking completed jobs correctly
    # see NERSC TICKET INC0203024
    # (skipped for dry runs since fake qids are never in the real queue)
    if len(dep_qids) > 0 and not dry_run:
        dep_table = queue_info_from_qids(np.asarray(dep_qids), columns='jobid,state')
        for row in dep_table:
            if row['STATE'] == 'COMPLETED':
                log.info(f"removing completed jobid {row['JOBID']}")
                dep_qids = np.delete(dep_qids, np.argwhere(dep_qids==row['JOBID']))

    ## Build the sbatch --dependency argument from the surviving dep qids
    if len(dep_qids) > 0:
        jobtype = prow['JOBDESC']
        if strictly_successful:
            depcond = 'afterok'
        elif jobtype in ['arc', 'psfnight', 'prestdstar', 'stdstarfit']:
            ## (though psfnight and stdstarfit will require some inputs otherwise they'll go up in flames)
            depcond = 'afterany'
        else:
            ## if 'flat','nightlyflat','poststdstar', or any type of redshift, require strict success of inputs
            depcond = 'afterok'

        dep_str = f'--dependency={depcond}:'

        # NOTE(review): len(dep_qids) above would raise on a true scalar, so
        # this scalar branch looks unreachable — confirm before removing.
        if np.isscalar(dep_qids):
            dep_list = str(dep_qids).strip(' \t')
            if dep_list == '':
                dep_str = ''
            else:
                dep_str += dep_list
        else:
            if len(dep_qids)>1:
                dep_list = ':'.join(np.array(dep_qids).astype(str))
                dep_str += dep_list
            elif len(dep_qids) == 1 and dep_qids[0] not in [None, 0]:
                dep_str += str(dep_qids[0])
            else:
                # single dep with qid None/0 means nothing real to wait on
                dep_str = ''

    # script = f'{jobname}.slurm'
    # script_path = pathjoin(batchdir, script)
    ## Redshift (ztile) jobs use their own script path convention; everything
    ## else uses the standard desi_proc batch file location.
    if prow['JOBDESC'] in ['pernight-v0','pernight','perexp','cumulative']:
        script_path = get_ztile_script_pathname(tileid=prow['TILEID'],group=prow['JOBDESC'],
                                                        night=prow['NIGHT'], expid=np.min(prow['EXPID']))
        jobname = os.path.basename(script_path)
    else:
        batchdir = get_desi_proc_batch_file_path(night=prow['NIGHT'])
        jobname = batch_script_name(prow)
        script_path = pathjoin(batchdir, jobname)

    batch_params = ['sbatch', '--parsable']
    if dep_str != '':
        batch_params.append(f'{dep_str}')
    if reservation is not None:
        batch_params.append(f'--reservation={reservation}')
    batch_params.append(f'{script_path}')

    if dry_run:
        current_qid = _get_fake_qid()
    else:
        #- sbatch sometimes fails; try several times before giving up
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                current_qid = subprocess.check_output(batch_params, stderr=subprocess.STDOUT, text=True)
                current_qid = int(current_qid.strip(' \t\n'))
                break
            except subprocess.CalledProcessError as err:
                log.error(f'{jobname} submission failure at {datetime.datetime.now()}')
                log.error(f'{jobname}   {batch_params}')
                log.error(f'{jobname}   {err.output=}')
                if attempt < max_attempts - 1:
                    log.info('Sleeping 60 seconds then retrying')
                    time.sleep(60)
        else:  #- for/else happens if loop doesn't succeed
            msg = f'{jobname} submission failed {max_attempts} times; exiting'
            log.critical(msg)
            raise RuntimeError(msg)

    log.info(batch_params)
    log.info(f'Submitted {jobname} with dependencies {dep_str} and reservation={reservation}. Returned qid: {current_qid}')

    ## Record the submission in the processing row for downstream bookkeeping
    prow['LATEST_QID'] = current_qid
    prow['ALL_QIDS'] = np.append(prow['ALL_QIDS'],current_qid)
    prow['STATUS'] = 'SUBMITTED'
    prow['SUBMIT_DATE'] = int(time.time())

    return prow
707

708

709
#############################################
710
##########   Row Manipulations   ############
711
#############################################
712
def define_and_assign_dependency(prow, calibjobs, use_tilenight=False,
                                 refnight=None):
    """
    Given input processing row and possible calibjobs, this defines the
    JOBDESC keyword and assigns the dependency appropriate for the job type of
    prow.

    Args:
        prow, Table.Row or dict. Must include keyword accessible definitions for
            'OBSTYPE'. A row must have column names for
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.
        calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'badcol',
            'psfnight', 'nightlyflat', and 'linkcal'. Each key corresponds to a
            Table.Row or None. The table.Row() values are for the corresponding
            calibration job. Each value that isn't None must contain
            'INTID', and 'LATEST_QID'. If None, it assumes the
            dependency doesn't exist and no dependency is assigned.
        use_tilenight, bool. Default is False. If True, use desi_proc_tilenight
            for prestdstar, stdstar,and poststdstar steps for
            science exposures.
        refnight, int. The reference night for linking jobs

    Returns:
        Table.Row or dict: The same prow type and keywords as input except
        with modified values updated values for 'JOBDESC', 'INT_DEP_IDS'. and 'LATEST_DEP_ID'.

    Note:
        This modifies the input. Though Table.Row objects are generally copied
        on modification, so the change to the input object in memory may or may
        not be changed. As of writing, a row from a table given to this function
        will not change during the execution of this function (but can be
        overwritten explicitly with the returned row if desired).
    """
    if prow['OBSTYPE'] in ['science', 'twiflat']:
        ## science-like exposures depend on the most-complete calib available
        dependency = _first_available_calibjob(calibjobs,
                ['nightlyflat', 'psfnight', 'ccdcalib', 'nightlybias', 'badcol'])
        if not use_tilenight:
            prow['JOBDESC'] = 'prestdstar'
    elif prow['OBSTYPE'] == 'flat':
        dependency = _first_available_calibjob(calibjobs,
                ['psfnight', 'ccdcalib', 'nightlybias', 'badcol'])
    elif prow['OBSTYPE'] == 'arc':
        dependency = _first_available_calibjob(calibjobs,
                ['ccdcalib', 'nightlybias', 'badcol'])
    elif prow['JOBDESC'] in ['badcol', 'nightlybias', 'ccdcalib']:
        ## base ccd calibration jobs can only depend on a calibration link
        dependency = calibjobs['linkcal']
    elif prow['OBSTYPE'] == 'dark':
        dependency = _first_available_calibjob(calibjobs,
                ['ccdcalib', 'nightlybias', 'badcol'])
    elif prow['JOBDESC'] == 'linkcal' and refnight is not None:
        dependency = None
        ## For link cals only, enable cross-night dependencies if available
        refproctable = findfile('proctable', night=refnight)
        if os.path.exists(refproctable):
            ptab = load_table(tablename=refproctable, tabletype='proctable')
            ## This isn't perfect because we may depend on jobs that aren't
            ## actually being linked
            ## Also allows us to proceed even if jobs don't exist yet
            deps = []
            for job in ['nightlybias', 'ccdcalib', 'psfnight', 'nightlyflat']:
                if job in ptab['JOBDESC']:
                    ## add the reference night's row to the dependency list
                    deps.append(ptab[ptab['JOBDESC']==job][0])
            if len(deps) > 0:
                dependency = deps
    else:
        dependency = None

    prow = assign_dependency(prow, dependency)

    return prow


def _first_available_calibjob(calibjobs, priority):
    """
    Return the first non-None calibration job from calibjobs in priority order.

    Falls back to calibjobs['linkcal'] (which may itself be None) when none
    of the prioritized jobs exist. Factored out of the repeated fallback
    chains in define_and_assign_dependency().

    Args:
        calibjobs, dict. Maps calibration job names to Table.Row/dict or None.
        priority, list of str. Job names in decreasing order of preference.

    Returns:
        Table.Row, dict, or None: the selected dependency job.
    """
    for jobname in priority:
        if calibjobs[jobname] is not None:
            return calibjobs[jobname]
    return calibjobs['linkcal']
813

814

815
def assign_dependency(prow, dependency):
    """
    Given an input processing row and a dependency (a single processing row
    or a list/array of them), record the dependency's internal id(s) and
    latest Slurm qid(s) on prow for scheduler coordination.

    Args:
        prow, Table.Row or dict. Must include keyword accessible definitions for 'OBSTYPE'. A row must have column names for
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.
        dependency, NoneType or scalar/list/array of Table.Row, dict. Processing row corresponding to the required input
            for the job in prow. This must contain keyword
            accessible values for 'INTID', and 'LATEST_QID'.
            If None, it assumes the dependency doesn't exist
            and no dependency is assigned.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated values for
        'JOBDESC', 'INT_DEP_IDS'. and 'LATEST_DEP_ID'.

    Note:
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    ## start from empty integer arrays; populated below if deps remain active
    prow['INT_DEP_IDS'] = np.ndarray(shape=0).astype(int)
    prow['LATEST_DEP_QID'] = np.ndarray(shape=0).astype(int)
    if dependency is not None:
        ## Bug fix: this previously tested `type(dependency) in [list, np.array]`;
        ## np.array is a factory function, not a type, so ndarray-valued
        ## dependencies never matched either branch and were silently dropped.
        if isinstance(dependency, (list, np.ndarray)):
            ids, qids = [], []
            for curdep in dependency:
                if still_a_dependency(curdep):
                    ids.append(curdep['INTID'])
                    qids.append(curdep['LATEST_QID'])
            prow['INT_DEP_IDS'] = np.array(ids, dtype=int)
            prow['LATEST_DEP_QID'] = np.array(qids, dtype=int)
        ## OrderedDict is a dict subclass, so isinstance(dep, dict) covers it
        elif isinstance(dependency, (dict, Table.Row)) and still_a_dependency(dependency):
            prow['INT_DEP_IDS'] = np.array([dependency['INTID']], dtype=int)
            prow['LATEST_DEP_QID'] = np.array([dependency['LATEST_QID']], dtype=int)
    return prow
853

854
def still_a_dependency(dependency):
    """
    Decide whether a dependency job is still a blocking factor for the Slurm
    scheduler.

     Args:
        dependency, Table.Row or dict. Processing row corresponding to the required input for the job in prow.
            This must contain keyword accessible values for 'STATUS', and 'LATEST_QID'.

    Returns:
        bool: True if the dependency has a real queue id and has not yet
        completed, so the scheduler must still wait on it. False once the
        job is COMPLETED (or has no valid queue id), meaning it is no longer
        a genuine dependency.
    """
    qid = dependency['LATEST_QID']
    if qid <= 0:
        # no valid Slurm job id -> nothing for the scheduler to wait on
        return False
    return dependency['STATUS'] != 'COMPLETED'
869

870
def get_type_and_tile(erow):
    """
    Convenience accessor returning the lower-cased OBSTYPE and the TILEID of
    an exposure table row.

    Args:
        erow, Table.Row or dict. Must contain 'OBSTYPE' and 'TILEID' as keywords.

    Returns:
        tuple (str, str), corresponding to the OBSTYPE and TILEID values of the input erow.
    """
    obstype = str(erow['OBSTYPE']).lower()
    tileid = erow['TILEID']
    return obstype, tileid
881

882

883
#############################################
884
#########   Table manipulators   ############
885
#############################################
886
def parse_previous_tables(etable, ptable, night):
    """
    This takes in the exposure and processing tables and regenerates all the working memory variables needed for the
    daily processing script.

    Used by the daily processing to define most of its state-ful variables into working memory.
    If the processing table is empty, these are simply declared and returned for use.
    If the code had previously run and exited (or crashed), however, this will allow the code to
    re-establish itself by redefining these values.

    Args:
        etable, Table, Exposure table of all exposures that have been dealt with thus far.
        ptable, Table, Processing table of all exposures that have been processed.
        night, str or int, the night the data was taken.

    Returns:
        tuple: A tuple containing:

        * arcs, list of dicts, list of the individual arc jobs used for the psfnight (NOT all
          the arcs, if multiple sets existed)
        * flats, list of dicts, list of the individual flat jobs used for the nightlyflat (NOT
          all the flats, if multiple sets existed)
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile)
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'badcol', 'psfnight'
          and 'nightlyflat'. Each key corresponds to a Table.Row or
          None. The table.Row() values are for the corresponding
          calibration job.
        * curtype, None, the obstype of the current job being run. Always None as first new job will define this.
        * lasttype, str or None, the obstype of the last individual exposure row to be processed.
        * curtile, None, the tileid of the current job (if science). Otherwise None. Always None as first
          new job will define this.
        * lasttile, str or None, the tileid of the last job (if science). Otherwise None.
        * internal_id, int, an internal identifier unique to each job. Increments with each new job. This
          is the latest unassigned value.
    """
    log = get_logger()
    arcs, flats, sciences = [], [], []
    # NOTE(review): unlike the job keys, 'linkcal' is never populated from
    # ptable below even though it is declared here — confirm intentional.
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None,
                 'accounted_for': dict()}
    curtype,lasttype = None,None
    curtile,lasttile = None,None

    if len(ptable) > 0:
        ## resume numbering internal ids after the last recorded job
        prow = ptable[-1]
        internal_id = int(prow['INTID'])+1
        lasttype,lasttile = get_type_and_tile(ptable[-1])
        jobtypes = ptable['JOBDESC']

        if 'nightlybias' in jobtypes:
            calibjobs['nightlybias'] = table_row_to_dict(ptable[jobtypes=='nightlybias'][0])
            log.info("Located nightlybias job in exposure table: {}".format(calibjobs['nightlybias']))

        if 'ccdcalib' in jobtypes:
            calibjobs['ccdcalib'] = table_row_to_dict(ptable[jobtypes=='ccdcalib'][0])
            log.info("Located ccdcalib job in exposure table: {}".format(calibjobs['ccdcalib']))

        if 'psfnight' in jobtypes:
            calibjobs['psfnight'] = table_row_to_dict(ptable[jobtypes=='psfnight'][0])
            log.info("Located joint fit psfnight job in exposure table: {}".format(calibjobs['psfnight']))
        elif lasttype == 'arc':
            ## psfnight not yet run: collect the trailing run of arcs with
            ## strictly decreasing SEQNUM (walking backwards through ptable)
            # assumes arc sequences have SEQNUM < 10 — TODO confirm
            seqnum = 10
            for row in ptable[::-1]:
                erow = etable[etable['EXPID']==row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'arc' and int(erow['SEQNUM'])<seqnum:
                    arcs.append(table_row_to_dict(row))
                    seqnum = int(erow['SEQNUM'])
                else:
                    break
            ## Because we work backword to fill in, we need to reverse them to get chronological order back
            arcs = arcs[::-1]

        if 'nightlyflat' in jobtypes:
            calibjobs['nightlyflat'] = table_row_to_dict(ptable[jobtypes=='nightlyflat'][0])
            log.info("Located joint fit nightlyflat job in exposure table: {}".format(calibjobs['nightlyflat']))
        elif lasttype == 'flat':
            ## nightlyflat not yet run: collect the trailing run of flats;
            ## only short sequences (SEQTOT < 5) with long exposures count
            for row in ptable[::-1]:
                erow = etable[etable['EXPID']==row['EXPID'][0]]
                if row['OBSTYPE'].lower() == 'flat' and int(erow['SEQTOT']) < 5:
                    if float(erow['EXPTIME']) > 100.:
                        flats.append(table_row_to_dict(row))
                else:
                    break
            flats = flats[::-1]

        if lasttype.lower() == 'science':
            ## gather the trailing prestdstar jobs for the tile in progress
            for row in ptable[::-1]:
                if row['OBSTYPE'].lower() == 'science' and row['TILEID'] == lasttile and \
                   row['JOBDESC'] == 'prestdstar' and row['LASTSTEP'] != 'skysub':
                    sciences.append(table_row_to_dict(row))
                else:
                    break
            sciences = sciences[::-1]
    else:
        ## empty processing table: derive the starting id from the night
        internal_id = night_to_starting_iid(night)

    return arcs,flats,sciences, \
           calibjobs, \
           curtype, lasttype, \
           curtile, lasttile,\
           internal_id

989
def generate_calibration_dict(ptable, files_to_link=None):
    """
    This takes in a processing table and regenerates the working memory calibration
    dictionary for dependency tracking. Used by the daily processing to define
    most of its state-ful variables into working memory.
    If the processing table is empty, these are simply declared and returned for use.
    If the code had previously run and exited (or crashed), however, this will allow the code to
    re-establish itself by redefining these values.

    Args:
        ptable, Table, Processing table of all exposures that have been processed.
        files_to_link, set, Set of filenames that the linkcal job will link.

    Returns:
        calibjobs, dict. Dictionary containing 'nightlybias', 'badcol', 'ccdcalib',
            'psfnight', 'nightlyflat', 'linkcal', and 'accounted_for'. Each job key
            corresponds to a Table.Row-derived dict or None; 'accounted_for' maps
            calibration filenames to booleans.

    Raises:
        ValueError: if a linkcal job exists in ptable but files_to_link is
            empty or None.
    """
    log = get_logger()
    job_to_file_map = get_jobdesc_to_file_map()
    accounted_for = {'biasnight': False, 'badcolumns': False,
                     'ctecorrnight': False, 'psfnight': False,
                     'fiberflatnight': False}
    calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
                 'psfnight': None, 'nightlyflat': None, 'linkcal': None}
    ## Bug fix: attach 'accounted_for' before the loop rather than after it.
    ## update_calibjobs_with_linking() (called below when a linkcal job is
    ## found) reads calibjobs['accounted_for'], which previously was only
    ## assigned after the loop and therefore raised a KeyError.
    calibjobs['accounted_for'] = accounted_for

    ptable_jobtypes = ptable['JOBDESC']

    ## iterate over the calibration job types only (not 'accounted_for')
    for jobtype in ['nightlybias', 'ccdcalib', 'badcol',
                    'psfnight', 'nightlyflat', 'linkcal']:
        if jobtype in ptable_jobtypes:
            calibjobs[jobtype] = table_row_to_dict(ptable[ptable_jobtypes==jobtype][0])
            log.info(f"Located {jobtype} job in exposure table: {calibjobs[jobtype]}")
            if jobtype == 'linkcal':
                if files_to_link is not None and len(files_to_link) > 0:
                    log.info(f"Assuming existing linkcal job processed "
                             + f"{files_to_link} since given in override file.")
                    calibjobs = update_calibjobs_with_linking(calibjobs, files_to_link)
                else:
                    err = f"linkcal job exists but no files given: {files_to_link=}"
                    log.error(err)
                    raise ValueError(err)
            elif jobtype == 'ccdcalib':
                ## ccdcalib covers whichever of these files aren't being linked
                possible_ccd_files = set(['biasnight', 'badcolumns', 'ctecorrnight'])
                if files_to_link is None:
                    files_accounted_for = possible_ccd_files
                else:
                    files_accounted_for = possible_ccd_files.difference(files_to_link)
                    ccd_files_linked = possible_ccd_files.intersection(files_to_link)
                    log.info(f"Assuming existing ccdcalib job processed "
                             + f"{files_accounted_for} since {ccd_files_linked} "
                             + f"are linked.")
                for fil in files_accounted_for:
                    accounted_for[fil] = True
            else:
                ## single-file jobs map directly to their output filename
                accounted_for[job_to_file_map[jobtype]] = True

    return calibjobs
1048

1049
def update_calibjobs_with_linking(calibjobs, files_to_link):
    """
    Mark linked calibration files as accounted for in the calibjobs summary.

    The files in files_to_link are assumed to have already been linked, i.e.
    they exist on disk and don't need to be generated, so their entries in
    calibjobs['accounted_for'] are flipped to True.

    Parameters
    ----------
        calibjobs: dict
            Dictionary containing "nightlybias", "badcol", "ccdcalib",
            "psfnight", "nightlyflat", "linkcal", and "accounted_for". Each key
            corresponds to a Table.Row or None. The table.Row() values are for
            the corresponding calibration job.
        files_to_link: set
            Set of filenames that the linkcal job will link.

    Returns
    -------
        calibjobs, dict
            The same dictionary, with 'accounted_for' updated in place.

    Raises
    ------
        ValueError: if a filename in files_to_link is not a recognized
            calibration filetype.
    """
    log = get_logger()

    accounted_for = calibjobs['accounted_for']
    for linked_file in files_to_link:
        if linked_file not in accounted_for:
            msg = (f"{linked_file} doesn't match an expected filetype: "
                   + f"{accounted_for.keys()}")
            log.error(msg)
            raise ValueError(msg)
        accounted_for[linked_file] = True

    return calibjobs
1085

1086
def all_calibs_submitted(accounted_for, do_cte_flats):
    """
    Determine whether every required calibration job has been submitted or
    otherwise handled.

    Args:
        accounted_for, dict, Dictionary with keys corresponding to the calibration
            filenames and values of True or False.
        do_cte_flats, bool, whether ctecorrnight files are expected or not.

    Returns:
        bool, True if all necessary calibrations have been submitted or handled, False otherwise.
    """
    ## work on a shallow copy so the caller's dict is left untouched
    remaining = accounted_for.copy()
    if not do_cte_flats:
        ## cte flats aren't expected, so their file doesn't count
        del remaining['ctecorrnight']

    return np.all(list(remaining.values()))
1104

1105
def update_and_recurvsively_submit(proc_table, submits=0, resubmission_states=None,
                                   ptab_name=None, dry_run=0,reservation=None):
    """
    Given an processing table, this loops over job rows and resubmits failed jobs (as defined by resubmission_states).
    Before submitting a job, it checks the dependencies for failures. If a dependency needs to be resubmitted, it recursively
    follows dependencies until it finds the first job without a failed dependency and resubmits that. Then resubmits the
    other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.

    Note: the misspelling in this function's name ("recurvsively") is kept
    because external callers reference it by this name.

    Args:
        proc_table, Table, the processing table with a row per job.
        submits, int, the number of submissions made to the queue. Used for saving files and in not overloading the scheduler.
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
            possible Slurm scheduler state, where you wish for jobs with that
            outcome to be resubmitted
        ptab_name, str, the full pathname where the processing table should be saved.
        dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.

    Returns:
        tuple: A tuple containing:

        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
          been updated for those jobs that needed to be resubmitted.
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
          the number of submissions made from this function call plus the input submits value.

    Note:
        This modifies the inputs of both proc_table and submits and returns them.
    """
    log = get_logger()
    if resubmission_states is None:
        resubmission_states = get_resubmission_states()
    log.info(f"Resubmitting jobs with current states in the following: {resubmission_states}")
    ## refresh STATUS et al. from the live Slurm queue before deciding
    proc_table = update_from_queue(proc_table, dry_run=False)
    log.info("Updated processing table queue information:")
    cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID',
            'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS']
    # NOTE(review): these prints dump the table to stdout rather than the
    # logger — confirm whether this console output is intentional.
    print(np.array(cols))
    for row in proc_table:
        print(np.array(row[cols]))
    print("\n")
    ## map internal job ids to row positions for dependency lookups
    id_to_row_map = {row['INTID']: rown for rown, row in enumerate(proc_table)}
    for rown in range(len(proc_table)):
        if proc_table['STATUS'][rown] in resubmission_states:
            ## resubmit this row (and, recursively, any failed dependencies)
            proc_table, submits = recursive_submit_failed(rown, proc_table, submits,
                                                          id_to_row_map, ptab_name,
                                                          resubmission_states,
                                                          reservation, dry_run)
    return proc_table, submits
1156

1157
def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=None,
                            resubmission_states=None, reservation=None, dry_run=0):
    """
    Given a row of a processing table and the full processing table, this resubmits the given job.
    Before submitting a job, it checks the dependencies for failures in the processing table. If a dependency needs to
    be resubmitted, it recursively follows dependencies until it finds the first job without a failed dependency and
    resubmits that. Then resubmits the other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.

    Args:
        rown, Table.Row, the row of the processing table that you want to resubmit.
        proc_table, Table, the processing table with a row per job.
        submits, int, the number of submissions made to the queue. Used for saving files and in not overloading the scheduler.
        id_to_row_map, dict, lookup dictionary where the keys are internal ids (INTID's) and the values are the row position
            in the processing table.
        ptab_name, str, the full pathname where the processing table should be saved.
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
            possible Slurm scheduler state, where you wish for jobs with that
            outcome to be resubmitted
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).

    Returns:
        tuple: A tuple containing:

        * proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
          been updated for those jobs that needed to be resubmitted.
        * submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
          the number of submissions made from this function call plus the input submits value.

    Note:
        This modifies the inputs of both proc_table and submits and returns them.
    """
    log = get_logger()
    row = proc_table[rown]
    log.info(f"Identified row {row['INTID']} as needing resubmission.")
    log.info(f"{row['INTID']}: Expid(s): {row['EXPID']}  Job: {row['JOBDESC']}")
    if resubmission_states is None:
        resubmission_states = get_resubmission_states()
    ideps = proc_table['INT_DEP_IDS'][rown]
    if ideps is None:
        # No dependencies: record an empty integer array of queue-id dependencies.
        proc_table['LATEST_DEP_QID'][rown] = np.ndarray(shape=0).astype(int)
    else:
        # A dependency is acceptable if it is either resubmittable or already
        # queued/running/finished successfully.
        all_valid_states = list(resubmission_states.copy())
        all_valid_states.extend(['RUNNING','PENDING','SUBMITTED','COMPLETED'])
        # First pass: verify every dependency is in a workable state before
        # submitting anything, so we don't submit a partial chain.
        for idep in np.sort(np.atleast_1d(ideps)):
            # Dependencies from a different day are skipped. The idep // 1000
            # comparison presumably encodes the night in the INTID's upper
            # digits -- TODO confirm the INTID day-encoding convention.
            # NOTE(review): if idep is missing from id_to_row_map but the
            # day-encoding matches row['INTID'], the lookup below will raise
            # KeyError -- presumably that indicates a corrupted table; verify.
            if idep not in id_to_row_map and idep // 1000 != row['INTID'] // 1000:
                log.warning(f"Internal ID: {idep} not in id_to_row_map. "
                            + "This is expected since it's from another day. "
                            + f" This dependency will not be checked or "
                            + "resubmitted")
                continue
            if proc_table['STATUS'][id_to_row_map[idep]] not in all_valid_states:
                log.warning(f"Proc INTID: {proc_table['INTID'][rown]} depended on" +
                            f" INTID {proc_table['INTID'][id_to_row_map[idep]]}" +
                            f" but that exposure has state" +
                            f" {proc_table['STATUS'][id_to_row_map[idep]]} that" +
                            f" isn't in the list of resubmission states." +
                            f" Exiting this job's resubmission attempt.")
                # Mark this row so the failure reason is visible in the table.
                proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
                return proc_table, submits
        # Second pass: recursively resubmit any failed dependencies, then
        # collect their latest Slurm queue ids for this job's --dependency list.
        qdeps = []
        for idep in np.sort(np.atleast_1d(ideps)):
            # Same other-day skip as above; see note there.
            if idep not in id_to_row_map and idep // 1000 != row['INTID'] // 1000:
                log.warning(f"Internal ID: {idep} not in id_to_row_map. "
                            + "This is expected since it's from another day. "
                            + f" This dependency will not be checked or "
                            + "resubmitted")
                continue
            if proc_table['STATUS'][id_to_row_map[idep]] in resubmission_states:
                # Recurse so the dependency (and its own failed dependencies)
                # are resubmitted first and get fresh Slurm job ids.
                proc_table, submits = recursive_submit_failed(id_to_row_map[idep],
                                                              proc_table, submits,
                                                              id_to_row_map,
                                                              reservation=reservation,
                                                              dry_run=dry_run)
            qdeps.append(proc_table['LATEST_QID'][id_to_row_map[idep]])

        qdeps = np.atleast_1d(qdeps)
        if len(qdeps) > 0:
            proc_table['LATEST_DEP_QID'][rown] = qdeps
        else:
            # ideps was non-None, so at least one in-table dependency was expected.
            log.error(f"number of qdeps should be 1 or more: Rown {rown}, ideps {ideps}")

    # Submit this job with the refreshed dependency queue ids.
    proc_table[rown] = submit_batch_script(proc_table[rown], reservation=reservation,
                                           strictly_successful=True, dry_run=dry_run)
    submits += 1

    if not dry_run:
        # Pace submissions and periodically checkpoint the table to disk:
        # every 10 submits write the table, every 100 also refresh queue state.
        sleep_and_report(1, message_suffix=f"after submitting job to queue")
        if submits % 10 == 0:
            if ptab_name is None:
                write_table(proc_table, tabletype='processing', overwrite=True)
            else:
                write_table(proc_table, tablename=ptab_name, overwrite=True)
            sleep_and_report(2, message_suffix=f"after writing to disk")
        if submits % 100 == 0:
            proc_table = update_from_queue(proc_table)
            if ptab_name is None:
                write_table(proc_table, tabletype='processing', overwrite=True)
            else:
                write_table(proc_table, tablename=ptab_name, overwrite=True)
            sleep_and_report(10, message_suffix=f"after updating queue and writing to disk")
    return proc_table, submits
1261

1262

1263
#########################################
1264
########     Joint fit     ##############
1265
#########################################
1266
def joint_fit(ptable, prows, internal_id, queue, reservation, descriptor, z_submit_types=None,
              dry_run=0, strictly_successful=False, check_for_outputs=True, resubmit_partial_complete=True,
              system_name=None):
    """
    DEPRECATED
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    joint fitting job given by descriptor. If the joint fitting job is standard star fitting, the post standard star fits
    for all the individual exposures also created and submitted. The returned ptable has all of these rows added to the
    table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        descriptor (str): Description of the joint fitting job. Can either be 'science' or 'stdstarfit', 'arc' or 'psfnight',
            or 'flat' or 'nightlyflat'.
        z_submit_types (list of str, optional): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with added rows for the joint fit job and, in the case
          of a stdstarfit, the poststdstar science exposure jobs.
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    if descriptor is None:
        # Bug fix: previously returned a 2-tuple (ptable, None) here, while all
        # other paths return 3 values; callers unpacking 3 values would crash.
        return ptable, None, internal_id
    elif descriptor == 'arc':
        descriptor = 'psfnight'
    elif descriptor == 'flat':
        descriptor = 'nightlyflat'
    elif descriptor == 'science':
        # With no redshift types requested, the science joint job reduces to
        # the standard star fit alone.
        if z_submit_types is None or len(z_submit_types) == 0:
            descriptor = 'stdstarfit'

    if descriptor not in ['psfnight', 'nightlyflat', 'science', 'stdstarfit']:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")

    # The 'science' descriptor first submits the stdstar joint fit; redshifts
    # are handled further below.
    if descriptor == 'science':
        joint_prow, internal_id = make_joint_prow(prows, descriptor='stdstarfit', internal_id=internal_id)
    else:
        joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
    ptable.add_row(joint_prow)

    if descriptor in ['science', 'stdstarfit']:
        if descriptor == 'science':
            zprows = []
        log.info(" ")
        log.info(f"Submitting individual science exposures now that joint fitting of standard stars is submitted.\n")
        for row in prows:
            if row['LASTSTEP'] == 'stdstarfit':
                continue
            row['JOBDESC'] = 'poststdstar'

            # poststdstar job can't process cameras not included in its stdstar joint fit
            stdcamword = joint_prow['PROCCAMWORD']
            thiscamword = row['PROCCAMWORD']
            proccamword = camword_intersection([stdcamword, thiscamword])
            if proccamword != thiscamword:
                dropcams = difference_camwords(thiscamword, proccamword)
                assert dropcams != ''  #- i.e. if they differ, we should be dropping something
                log.warning(f"Dropping exp {row['EXPID']} poststdstar cameras {dropcams} since they weren't included in stdstar fit {stdcamword}")
                row['PROCCAMWORD'] = proccamword

            row['INTID'] = internal_id
            internal_id += 1
            # Reset queue-id history for the re-purposed row before submission.
            row['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
            row = assign_dependency(row, joint_prow)
            row = create_and_submit(row, queue=queue, reservation=reservation, dry_run=dry_run,
                                    strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
            ptable.add_row(row)
            if descriptor == 'science' and row['LASTSTEP'] == 'all':
                zprows.append(row)

    ## Now run redshifts
    if descriptor == 'science' and len(zprows) > 0 and z_submit_types is not None:
        # Select all poststdstar rows on this tile from the night; if the count
        # matches the rows we just submitted, use those directly.
        prow_selection = (  (ptable['OBSTYPE'] == 'science')
                          & (ptable['LASTSTEP'] == 'all')
                          & (ptable['JOBDESC'] == 'poststdstar')
                          & (ptable['TILEID'] == int(zprows[0]['TILEID'])) )
        nightly_zprows = []
        if np.sum(prow_selection) == len(zprows):
            nightly_zprows = zprows.copy()
        else:
            for prow in ptable[prow_selection]:
                nightly_zprows.append(table_row_to_dict(prow))

        # NOTE(review): joint_prow is rebound in the loops below, so the value
        # returned is the last redshift prow rather than the stdstar fit row
        # when redshifts are submitted -- presumably acceptable since this
        # function is deprecated; confirm with callers before changing.
        for zsubtype in z_submit_types:
            if zsubtype == 'perexp':
                for zprow in zprows:
                    log.info(" ")
                    log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
                    joint_prow, internal_id = make_joint_prow([zprow], descriptor=zsubtype, internal_id=internal_id)
                    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                    ptable.add_row(joint_prow)
            else:
                log.info(" ")
                log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {nightly_zprows[0]['TILEID']}.")
                expids = [prow['EXPID'][0] for prow in nightly_zprows]
                log.info(f"Expids: {expids}.\n")
                joint_prow, internal_id = make_joint_prow(nightly_zprows, descriptor=zsubtype, internal_id=internal_id)
                joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(joint_prow)

    if descriptor in ['psfnight', 'nightlyflat']:
        log.info(f"Setting the calibration exposures as calibrators in the processing table.\n")
        ptable = set_calibrator_flag(prows, ptable)

    return ptable, joint_prow, internal_id

1410
def joint_cal_fit(descriptor, ptable, prows, internal_id, queue, reservation,
                  dry_run=0, strictly_successful=False, check_for_outputs=True,
                  resubmit_partial_complete=True, system_name=None):
    """
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    joint calibration fitting job given by descriptor ('psfnight' or 'nightlyflat'). The returned ptable has the
    joint job's row added to the table given as input, and the input calibration exposures are flagged as calibrators.

    Args:
        descriptor (str): Description of the joint fitting job. Can either be 'arc' or 'psfnight',
            or 'flat' or 'nightlyflat'.
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with an added row for the joint fit job.
        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    if len(prows) < 1:
        return ptable, None, internal_id

    if descriptor is None:
        # Bug fix: previously returned a 2-tuple (ptable, None) here, while all
        # other paths return 3 values; callers unpacking 3 values would crash.
        return ptable, None, internal_id
    elif descriptor == 'arc':
        descriptor = 'psfnight'
    elif descriptor == 'flat':
        descriptor = 'nightlyflat'

    if descriptor not in ['psfnight', 'nightlyflat']:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")

    joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
    ptable.add_row(joint_prow)

    # Always true after the guard above, but kept to mirror joint_fit's structure.
    if descriptor in ['psfnight', 'nightlyflat']:
        log.info(f"Setting the calibration exposures as calibrators in the processing table.\n")
        ptable = set_calibrator_flag(prows, ptable)

    return ptable, joint_prow, internal_id
1479

1480

1481
#########################################
1482
########     Redshifts     ##############
1483
#########################################
1484
def submit_redshifts(ptable, prows, tnight, internal_id, queue, reservation,
              dry_run=0, strictly_successful=False,
              check_for_outputs=True, resubmit_partial_complete=True,
              z_submit_types=None, system_name=None):
    """
    Given a set of prows, this generates processing table rows, creates batch scripts, and submits the redshift
    jobs of the requested z_submit_types that depend on the given tilenight job. The returned ptable has all of
    these rows added to the table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
        tnight (Table.Row): The processing table row of the tilenight job on which the redshifts depend.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with added rows for the redshift jobs.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    # Idiom fix: compare to None with 'is', not '=='. Nothing to do if there
    # are no input exposures or no redshift types were requested.
    if len(prows) < 1 or z_submit_types is None:
        return ptable, internal_id

    log.info(" ")
    log.info(f"Running redshifts.\n")

    ## Now run redshifts
    # Only exposures that completed all processing steps feed the redshifts.
    zprows = []
    for row in prows:
        if row['LASTSTEP'] == 'all':
            zprows.append(row)

    if len(zprows) > 0:
        for zsubtype in z_submit_types:
            if zsubtype == 'perexp':
                # One redshift job per individual exposure.
                for zprow in zprows:
                    log.info(" ")
                    log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
                    redshift_prow = make_redshift_prow([zprow], tnight, descriptor=zsubtype, internal_id=internal_id)
                    internal_id += 1
                    redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                    ptable.add_row(redshift_prow)
            else:
                # One joint redshift job over all valid exposures of the tile.
                log.info(" ")
                log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {zprows[0]['TILEID']}.")
                expids = [prow['EXPID'][0] for prow in zprows]
                log.info(f"Expids: {expids}.\n")
                redshift_prow = make_redshift_prow(zprows, tnight, descriptor=zsubtype, internal_id=internal_id)
                internal_id += 1
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                               strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                               resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(redshift_prow)

    return ptable, internal_id
1561

1562
#########################################
1563
########     Tilenight     ##############
1564
#########################################
1565
def submit_tilenight(ptable, prows, calibjobs, internal_id, queue, reservation,
              dry_run=0, strictly_successful=False, resubmit_partial_complete=True,
              system_name=None, use_specter=False, extra_job_args=None):
    """
    Build, submit, and record a single tilenight job from the given set of
    unsubmitted prestdstar rows. The tilenight row is appended to the input
    processing table, which is returned along with the row itself.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
        calibjobs (dict): Maps 'nightlybias', 'ccdcalib', 'psfnight', and
            'nightlyflat' to a Table.Row for the corresponding calibration job,
            or None if that job doesn't exist.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Key-value pairs with additional information for a
            specific type of job, e.g. laststeps for tilenight.

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with an added row for the tilenight job.
        * tilenight_prow, dict. Row of a processing table corresponding to the tilenight job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()

    ## Guard clause: nothing to submit without input exposures.
    if len(prows) < 1:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Running tilenight.\n")

    ## Build the tilenight row, consuming one internal id.
    tilenight_prow = make_tnight_prow(prows, calibjobs, internal_id=internal_id)
    internal_id += 1

    ## Submit the batch job; tilenight never checks for prior outputs.
    tilenight_prow = create_and_submit(tilenight_prow, queue=queue,
                                       reservation=reservation,
                                       dry_run=dry_run,
                                       strictly_successful=strictly_successful,
                                       check_for_outputs=False,
                                       resubmit_partial_complete=resubmit_partial_complete,
                                       system_name=system_name,
                                       use_specter=use_specter,
                                       extra_job_args=extra_job_args)
    ptable.add_row(tilenight_prow)

    return ptable, tilenight_prow, internal_id
1621

1622
## wrapper functions for joint fitting
1623
def science_joint_fit(ptable, sciences, internal_id, queue='realtime', reservation=None,
                      z_submit_types=None, dry_run=0, strictly_successful=False,
                      check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None):
    """
    Thin wrapper around desispec.workflow.processing.joint_fit for the
    stdstarfit joint fit and redshift fitting.

    All arguments are forwarded unchanged to joint_fit, except that
    'sciences' is passed as the prows argument and the descriptor is
    fixed to 'science'.
    """
    return joint_fit(ptable=ptable, prows=sciences, internal_id=internal_id,
                     queue=queue, reservation=reservation, descriptor='science',
                     z_submit_types=z_submit_types, dry_run=dry_run,
                     strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs,
                     resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)
1639

1640

1641
def flat_joint_fit(ptable, flats, internal_id, queue='realtime',
                   reservation=None, dry_run=0, strictly_successful=False,
                   check_for_outputs=True, resubmit_partial_complete=True,
                   system_name=None):
    """
    Thin wrapper around desispec.workflow.processing.joint_fit for the
    nightlyflat joint fit.

    All arguments are forwarded unchanged to joint_fit, except that
    'flats' is passed as the prows argument and the descriptor is
    fixed to 'nightlyflat'.
    """
    return joint_fit(ptable=ptable, prows=flats, internal_id=internal_id,
                     queue=queue, reservation=reservation,
                     descriptor='nightlyflat', dry_run=dry_run,
                     strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs,
                     resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)
1657

1658

1659
def arc_joint_fit(ptable, arcs, internal_id, queue='realtime',
                  reservation=None, dry_run=0, strictly_successful=False,
                  check_for_outputs=True, resubmit_partial_complete=True,
                  system_name=None):
    """
    Thin wrapper around desispec.workflow.processing.joint_fit for the
    psfnight joint fit.

    All arguments are forwarded unchanged to joint_fit, except that
    'arcs' is passed as the prows argument and the descriptor is
    fixed to 'psfnight'.
    """
    return joint_fit(ptable=ptable, prows=arcs, internal_id=internal_id,
                     queue=queue, reservation=reservation,
                     descriptor='psfnight', dry_run=dry_run,
                     strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs,
                     resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)
1675

1676
def make_joint_prow(prows, descriptor, internal_id):
    """
    Create a joint-fit processing table row from individual exposure rows.

    The new row is seeded from a copy of the first input row, the
    job-specific bookkeeping columns are reset, and the new row's
    dependencies are set to all of the input rows.

    Args:
        prows, list or array of dicts. The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        descriptor, str. Description of the joint fitting job. Can either be 'stdstarfit', 'psfnight', or 'nightlyflat'.
        internal_id, int, the next internal id to be used for assignment.

    Returns:
        tuple: A tuple containing:

        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented past the id consumed here).
    """
    first_row = table_row_to_dict(prows[0])
    joint_prow = first_row.copy()

    ## Reset the job-specific bookkeeping columns for the new joint job
    joint_prow['INTID'] = internal_id
    internal_id += 1
    joint_prow['JOBDESC'] = descriptor
    joint_prow['LATEST_QID'] = -99
    joint_prow['ALL_QIDS'] = np.array([], dtype=int)
    joint_prow['SUBMIT_DATE'] = -99
    joint_prow['STATUS'] = 'U'
    joint_prow['SCRIPTNAME'] = ''
    joint_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

    ## Assign the PROCCAMWORD based on the descriptor and the input exposures
    if descriptor == 'stdstarfit':
        ## Any camera present in any input exposure can contribute, but
        ## restrict to fully-populated spectrographs
        pcamwords = [prow['PROCCAMWORD'] for prow in prows]
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
                                                  full_spectros_only=True)
    else:
        ## For arcs and flats, a BADAMP takes out the camera, so remove those
        ## cameras from the proccamword
        pcamwords = []
        for prow in prows:
            if len(prow['BADAMPS']) > 0:
                badcams = []
                for (camera, petal, amplifier) in parse_badamps(prow['BADAMPS']):
                    badcams.append(f'{camera}{petal}')
                badampcamword = create_camword(list(set(badcams)))
                pcamword = difference_camwords(prow['PROCCAMWORD'], badampcamword)
            else:
                pcamword = prow['PROCCAMWORD']
            pcamwords.append(pcamword)

        ## For flats we want any camera that exists in all 12 exposures
        ## For arcs we want any camera that exists in at least 3 exposures
        if descriptor == 'nightlyflat':
            joint_prow['PROCCAMWORD'] = camword_intersection(pcamwords,
                                                             full_spectros_only=False)
        elif descriptor == 'psfnight':
            ## Count the number of exposures in which each camera is present
            camcheck = {}
            for camword in pcamwords:
                for cam in decode_camword(camword):
                    camcheck[cam] = camcheck.get(cam, 0) + 1
            ## If a camera exists in 3 or more exposures, then include it
            goodcams = [cam for cam, camcount in camcheck.items() if camcount >= 3]
            joint_prow['PROCCAMWORD'] = create_camword(goodcams)

    joint_prow = assign_dependency(joint_prow, dependency=prows)
    return joint_prow, internal_id
1✔
1748

1749
def make_exposure_prow(erow, int_id, calibjobs, jobdesc=None):
    """
    Convert an exposure table row into a processing table row with
    dependencies assigned.

    Args:
        erow, Table.Row or dict. The exposure table row to convert.
        int_id, int. The next internal id to be used for assignment.
        calibjobs, dict. Dictionary of calibration job rows (or None values)
            used by define_and_assign_dependency to set this job's dependencies.
        jobdesc, str, optional. Job descriptor to assign. If None, the
            exposure's OBSTYPE is used.

    Returns:
        tuple: The new processing table row (dict) and the next unused internal id.
    """
    prow = erow_to_prow(erow)
    prow['INTID'] = int_id
    int_id += 1
    ## Default the job description to the exposure's observation type
    prow['JOBDESC'] = prow['OBSTYPE'] if jobdesc is None else jobdesc
    prow = define_and_assign_dependency(prow, calibjobs)
    return prow, int_id
1✔
1759

1760
def make_tnight_prow(prows, calibjobs, internal_id):
    """
    Create a tilenight processing table row from individual prestdstar rows.

    The new row is seeded from a copy of the first input row, the
    job-specific bookkeeping columns are reset, and the dependencies are
    defined from the calibration jobs.

    Args:
        prows, list or array of dicts. Unsubmitted rows corresponding to the individual prestdstar jobs that are
            the first steps of tilenight.
        calibjobs, dict. Dictionary containing keys that each corresponds to a Table.Row or
            None, with each table.Row() value corresponding to a calibration job
            on which the tilenight job depends.
        internal_id, int, the next internal id to be used for assignment.

    Returns:
        dict: Row of a processing table corresponding to the tilenight job.
    """
    first_row = table_row_to_dict(prows[0])
    joint_prow = first_row.copy()

    ## Reset the job-specific bookkeeping columns for the new tilenight job
    joint_prow['INTID'] = internal_id
    joint_prow['JOBDESC'] = 'tilenight'
    joint_prow['LATEST_QID'] = -99
    joint_prow['ALL_QIDS'] = np.array([], dtype=int)
    joint_prow['SUBMIT_DATE'] = -99
    joint_prow['STATUS'] = 'U'
    joint_prow['SCRIPTNAME'] = ''
    ## The tilenight job covers all input exposures
    joint_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

    joint_prow = define_and_assign_dependency(joint_prow, calibjobs, use_tilenight=True)

    return joint_prow
1✔
1792

1793
def make_redshift_prow(prows, tnight, descriptor, internal_id):
    """
    Create a redshift processing table row that depends on a tilenight job.

    The new row is seeded from a copy of the first input row, the
    job-specific bookkeeping columns are reset, and the dependency is set
    to the given tilenight job.

    Args:
        prows, list or array of dicts. Unsubmitted rows corresponding to the individual prestdstar jobs that are
            the first steps of tilenight.
        tnight, Table.Row object. Row corresponding to the tilenight job on which the redshift job depends.
        descriptor, str. The redshift job type used as the JOBDESC of the new row.
        internal_id, int, the next internal id to be used for assignment.

    Returns:
        dict: Row of a processing table corresponding to the redshift job.
    """
    first_row = table_row_to_dict(prows[0])
    redshift_prow = first_row.copy()

    ## Reset the job-specific bookkeeping columns for the new redshift job
    redshift_prow['INTID'] = internal_id
    redshift_prow['JOBDESC'] = descriptor
    redshift_prow['LATEST_QID'] = -99
    redshift_prow['ALL_QIDS'] = np.array([], dtype=int)
    redshift_prow['SUBMIT_DATE'] = -99
    redshift_prow['STATUS'] = 'U'
    redshift_prow['SCRIPTNAME'] = ''
    ## The redshift job covers all input exposures
    redshift_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

    redshift_prow = assign_dependency(redshift_prow, dependency=tnight)

    return redshift_prow
1✔
1823

1824
def checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs,
                                  lasttype, internal_id, z_submit_types=None, dry_run=0,
                                  queue='realtime', reservation=None, strictly_successful=False,
                                  check_for_outputs=True, resubmit_partial_complete=True,
                                  system_name=None):
    """
    Takes all the state-ful data from daily processing and determines whether a joint fit needs to be submitted. Places
    the decision criteria into a single function for easier maintainability over time. These are separate from the
    new standard manifest*.json method of indicating a calibration sequence is complete. That is checked independently
    elsewhere and doesn't interact with this.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        arcs (list of dict): list of the individual arc jobs to be used for the psfnight (NOT all
            the arcs, if multiple sets existed). May be empty if none identified yet.
        flats (list of dict): list of the individual flat jobs to be used for the nightlyflat (NOT
            all the flats, if multiple sets existed). May be empty if none identified yet.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The table.Row() values are for the corresponding
            calibration job.
        lasttype (str or None): the obstype of the last individual exposure row to be processed.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table, Processing table of all exposures that have been processed.
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
          and 'nightlyflat'. Each key corresponds to a Table.Row or
          None. The table.Row() values are for the corresponding
          calibration job.
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented upward from
          from the input such that it represents the smallest unused ID.
    """
    if lasttype == 'science' and len(sciences) > 0:
        log = get_logger()
        ## Flag exposures whose processing stops at sky subtraction; these
        ## cannot contribute to a stdstar-based joint fit
        skysubonly = np.array([sci['LASTSTEP'] == 'skysub' for sci in sciences])
        if np.all(skysubonly):
            log.error("Identified all exposures in joint fitting request as skysub-only. Not submitting")
            sciences = []
            return ptable, calibjobs, sciences, internal_id

        if np.any(skysubonly):
            ## Mixed case: drop the skysub-only exposures and continue with the rest
            log.error("Identified skysub-only exposures in joint fitting request")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))
            sciences = (np.array(sciences,dtype=object)[~skysubonly]).tolist()
            log.info("Removed skysub only exposures in joint fitting:")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))

        ## A joint fit is only valid for a single tile; refuse mixed-tile requests
        from collections import Counter
        tiles = np.array([sci['TILEID'] for sci in sciences])
        counts = Counter(tiles)
        if len(counts.most_common()) > 1:
            log.error("Identified more than one tile in a joint fitting request")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("Tileid's: {}".format(tiles))
            log.info("Returning without joint fitting any of these exposures.")
            ## Disabled alternative behavior: jointly fit only the most common
            ## non-default tile instead of refusing outright
            # most_common, nmost_common = counts.most_common()[0]
            # if most_common == -99:
            #     most_common, nmost_common = counts.most_common()[1]
            # log.warning(f"Given multiple tiles to jointly fit: {counts}. "+
            #             "Only processing the most common non-default " +
            #             f"tile: {most_common} with {nmost_common} exposures")
            # sciences = (np.array(sciences,dtype=object)[tiles == most_common]).tolist()
            # log.info("Tiles and exposure id's being submitted for joint fitting:")
            # log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            # log.info("Tileid's: {}".format([row['TILEID'] for row in sciences]))
            sciences = []
            return ptable, calibjobs, sciences, internal_id

        ptable, tilejob, internal_id = science_joint_fit(ptable, sciences, internal_id, z_submit_types=z_submit_types,
                                                         dry_run=dry_run, queue=queue, reservation=reservation,
                                                         strictly_successful=strictly_successful,
                                                         check_for_outputs=check_for_outputs,
                                                         resubmit_partial_complete=resubmit_partial_complete,
                                                         system_name=system_name)
        ## Once submitted, clear the science list so they aren't re-fit later
        if tilejob is not None:
            sciences = []

    elif lasttype == 'flat' and calibjobs['nightlyflat'] is None and len(flats) == 12:
        ## Note the hard-coded assumption that a complete flat set has exactly 12 exposures
        ptable, calibjobs['nightlyflat'], internal_id \
            = flat_joint_fit(ptable, flats, internal_id, dry_run=dry_run, queue=queue,
                             reservation=reservation, strictly_successful=strictly_successful,
                             check_for_outputs=check_for_outputs,
                             resubmit_partial_complete=resubmit_partial_complete,
                             system_name=system_name
                            )

    elif lasttype == 'arc' and calibjobs['psfnight'] is None and len(arcs) == 5:
        ## Note the hard-coded assumption that a complete arc set has exactly 5 exposures
        ptable, calibjobs['psfnight'], internal_id \
            = arc_joint_fit(ptable, arcs, internal_id, dry_run=dry_run, queue=queue,
                            reservation=reservation, strictly_successful=strictly_successful,
                            check_for_outputs=check_for_outputs,
                            resubmit_partial_complete=resubmit_partial_complete,
                            system_name=system_name
                            )
    return ptable, calibjobs, sciences, internal_id
×
1950

1951
def submit_tilenight_and_redshifts(ptable, sciences, calibjobs, internal_id, dry_run=0,
                                  queue='realtime', reservation=None, strictly_successful=False,
                                  check_for_outputs=True, resubmit_partial_complete=True,
                                  system_name=None, use_specter=False, extra_job_args=None):
    """
    Takes all the state-ful data from daily processing and determines whether a tilenight job needs to be submitted.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary of calibration job rows (or None values) on which
            the tilenight job depends.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict, optional): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include
            laststeps for tilenight, z_submit_types for redshifts, etc.

    Returns:
        tuple: A tuple containing:

        * ptable, Table, Processing table of all exposures that have been processed.
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented upward from
          from the input such that it represents the smallest unused ID.
    """
    ## Submit the tilenight job for these science exposures
    ptable, tnight, internal_id = submit_tilenight(ptable, sciences, calibjobs, internal_id,
                                             queue=queue, reservation=reservation,
                                             dry_run=dry_run, strictly_successful=strictly_successful,
                                             resubmit_partial_complete=resubmit_partial_complete,
                                             system_name=system_name, use_specter=use_specter,
                                             extra_job_args=extra_job_args)

    ## Guard against extra_job_args=None (the documented default); the bare
    ## membership test would raise TypeError on None
    z_submit_types = None
    if extra_job_args is not None and 'z_submit_types' in extra_job_args:
        z_submit_types = extra_job_args['z_submit_types']

    ptable, internal_id = submit_redshifts(ptable, sciences, tnight, internal_id,
                                    queue=queue, reservation=reservation,
                                    dry_run=dry_run, strictly_successful=strictly_successful,
                                    check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete,
                                    z_submit_types=z_submit_types,
                                    system_name=system_name)

    ## Once tilenight is submitted, clear the science list so the same
    ## exposures aren't submitted again
    if tnight is not None:
        sciences = []

    return ptable, sciences, internal_id
1✔
2018

2019
def set_calibrator_flag(prows, ptable):
    """
    Set the "CALIBRATOR" column of the processing table to 1 (the integer
    representation of True) for every input row. Used within the joint
    fitting code to flag the exposures that were input to the psfnight or
    nightlyflat for later reference.

    Args:
        prows, list or array of Table.Rows or dicts. The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        ptable, Table. The processing table where each row is a processed job.

    Returns:
        Table: The same processing table as input, with CALIBRATOR set to 1
        for every row whose INTID matches one of the input rows.
    """
    for row in prows:
        ## Match this job's row in the table by its unique internal id
        selection = (ptable['INTID'] == row['INTID'])
        ptable['CALIBRATOR'][selection] = 1
    return ptable
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc