
desihub / desispec, build 8334186034

18 Mar 2024 10:00PM UTC coverage: 28.093% (+3.0%), up from 25.113%

Pull Request #2187 (github, akremin): Introduce desi_proc_night to unify and simplify processing scripts
Commit message: only complain about existing jobs if they failed

753 of 1123 new or added lines in 21 files covered (67.05%).
1066 existing lines in 11 files now uncovered.
13160 of 46844 relevant lines covered (28.09%).
0.28 hits per line.

Source File: /py/desispec/scripts/proc_night.py (75.59% covered)
"""
desispec.scripts.proc_night
=============================
"""
from desispec.io import findfile
from desispec.scripts.link_calibnight import derive_include_exclude
from desispec.workflow.calibration_selection import \
    determine_calibrations_to_proc
from desispec.workflow.science_selection import determine_science_to_proc, \
    get_tiles_cumulative
from desiutil.log import get_logger
import numpy as np
import os
import sys
import time
import re
from astropy.table import Table, vstack

## Import some helper functions; you can see their definitions by uncommenting the bash shell command
from desispec.scripts.update_exptable import update_exposure_table
from desispec.workflow.tableio import load_tables, write_table
from desispec.workflow.utils import sleep_and_report, \
    verify_variable_with_environment, load_override_file
from desispec.workflow.timing import what_night_is_it, during_operating_hours
from desispec.workflow.exptable import get_last_step_options
from desispec.workflow.proctable import default_obstypes_for_proctable, \
    erow_to_prow, default_prow
from desispec.workflow.processing import define_and_assign_dependency, \
    create_and_submit, \
    submit_tilenight_and_redshifts, \
    generate_calibration_dict, \
    night_to_starting_iid, make_joint_prow, \
    set_calibrator_flag, make_exposure_prow, \
    update_calibjobs_with_linking, all_calibs_submitted
from desispec.workflow.queue import update_from_queue, any_jobs_failed
from desispec.io.util import decode_camword, difference_camwords, \
    create_camword, replace_prefix


def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
               queue=None, reservation=None, system_name=None,
               exp_table_pathname=None, proc_table_pathname=None,
               override_pathname=None, update_exptable=False,
               dry_run_level=0, dry_run=False, no_redshifts=False,
               ignore_proc_table_failures=False,
               dont_check_job_outputs=False, dont_resubmit_partial_jobs=False,
               tiles=None, surveys=None, science_laststeps=None,
               all_tiles=False, specstatus_path=None, use_specter=False,
               no_cte_flats=False, complete_tiles_thrunight=None,
               all_cumulatives=False, daily=False, specprod=None,
               path_to_data=None, exp_obstypes=None, camword=None,
               badcamword=None, badamps=None, exps_to_ignore=None,
               sub_wait_time=0.5, verbose=False, dont_require_cals=False,
               psf_linking_without_fflat=False):
    """
    Process some or all exposures on a night. Can be used to process an entire
    night, or to process the data currently available on a given night using
    the '--daily' flag.

    Args:
        night (int): The night of data to be processed. Exposure table must exist.
        proc_obstypes (list or np.array, optional): A list of exposure OBSTYPE's
            that should be processed (and therefore added to the processing table).
        z_submit_types (list of str):
            The "group" types of redshifts that should be submitted with each
            exposure. If not specified, the default for daily processing is
            ['cumulative', 'pernight-v0']. If false, 'false', or [], then no
            redshifts are submitted.
        queue (str, optional): The name of the queue to submit the jobs to.
            Default is "realtime" for daily processing and "regular" otherwise.
        reservation (str, optional): The reservation to submit jobs to.
            If None, it is not submitted to a reservation.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl,
            perlmutter-gpu
        exp_table_pathname (str): Full path to where the exposure tables are stored,
            including file name.
        proc_table_pathname (str): Full path to where the processing table will be
            written, including file name.
        override_pathname (str): Full path to the override file.
        update_exptable (bool): If true then the exposure table is updated.
            The default is False.
        dry_run_level (int, optional): If nonzero, this is a simulated run.
            If dry_run_level=1 the scripts will be written but not submitted.
            If dry_run_level=2, the scripts will be neither written nor submitted,
            but the processing_table is still created.
            If dry_run_level=3, no output files are written.
            Logging will remain the same for testing as though scripts are
            being submitted. Default is 0 (false).
        dry_run (bool, optional): Whether to run without submitting scripts.
            If dry_run_level is defined, it overrides this flag. If
            dry_run_level is not set and dry_run=True, dry_run_level is set to 2
            (no scripts generated or run). Default for dry_run is False.
        no_redshifts (bool, optional): Whether to submit redshifts or not.
            If True, redshifts are not submitted.
        ignore_proc_table_failures (bool, optional): True if you want to submit
            other jobs even if the loaded processing table has incomplete jobs in
            it. Use with caution. Default is False.
        dont_check_job_outputs (bool, optional): Default is False. If False,
            the code checks for the existence of the expected final data
            products for the script being submitted. If all files exist and
            this is False, then the script will not be submitted. If some
            files exist and this is False, only the subset of the cameras
            without the final data products will be generated and submitted.
        dont_resubmit_partial_jobs (bool, optional): Default is False. Must be
            used with dont_check_job_outputs=False. If this flag is False, jobs
            with some prior data are pruned using PROCCAMWORD to only process
            the remaining cameras not found to exist.
        tiles (array-like, optional): Only submit jobs for these TILEIDs.
        surveys (array-like, optional): Only submit science jobs for these
            surveys (lowercase).
        science_laststeps (array-like, optional): Only submit jobs for exposures
            with LASTSTEP in these science_laststeps (lowercase).
        all_tiles (bool, optional): Default is False. Set to NOT restrict to
            completed tiles as defined by the table pointed to by specstatus_path.
        specstatus_path (str, optional): Default is
            $DESI_SURVEYOPS/ops/tiles-specstatus.ecsv. Location of the
            surveyops specstatus table.
        use_specter (bool, optional): Default is False. If True, use specter,
            otherwise use gpu_specter by default.
        no_cte_flats (bool, optional): Default is False. If False, cte flats
            are used if available to correct for cte effects.
        complete_tiles_thrunight (int, optional): Default is None. Only tiles
            completed on or before the supplied YYYYMMDD are considered
            completed and will be processed. All complete tiles are submitted
            if None or all_tiles is True.
        all_cumulatives (bool, optional): Default is False. Set to run
            cumulative redshifts for all tiles even if the tile has observations
            on a later night.
        specprod: str. The name of the current production. If used, this will
            overwrite the SPECPROD environment variable.
        daily: bool. Flag that sets other flags for running this script for the
            daily pipeline.
        path_to_data: str. Path to the raw data.
        exp_obstypes: str or comma separated list of strings. The exposure
            OBSTYPE's that you want to include in the exposure table.
        camword: str. Camword that, if set, alters the set of cameras that will
            be set for processing. Examples: a0123456789, a1, a2b3r3,
            a2b3r4z3. Note this is only true for new exposures being
            added to the exposure_table in 'daily' mode.
        badcamword: str. Camword that, if set, will be removed from the camword
            defined in camword if given, or the camword inferred from
            the data if camword is not given. Note this is only true
            for new exposures being added to the exposure_table
            in 'daily' mode.
        badamps: str. Comma separated list of bad amplifiers that should not
            be processed. Should be of the form "{camera}{petal}{amp}",
            i.e. "[brz][0-9][ABCD]". Example: 'b7D,z8A'. Note this is
            only true for new exposures being added to the
            exposure_table in 'daily' mode.
        sub_wait_time: float. Wait time in seconds between submission loops.
            Default 0.5 seconds.
        verbose: bool. True if you want more verbose output, false otherwise.
            Currently not propagated to lower code, so it is only used in the
            main daily_processing script itself.
        dont_require_cals: bool. Default False. If set then the code doesn't
            require either a valid set of calibrations or a valid override file
            to link to calibrations in order to proceed with science processing.
        psf_linking_without_fflat: bool. Default False. If set then the code
            will NOT raise an error if asked to link psfnight calibrations
            without fiberflatnight calibrations.
    """
    ## Get logger
    log = get_logger()

    ## Inform user of how some parameters will be used
    if camword is not None:
        log.info(f"Note custom {camword=} will only be used for new exposures"
                 f" being entered into the exposure_table, not all exposures"
                 f" to be processed.")
    if badcamword is not None:
        log.info(f"Note custom {badcamword=} will only be used for new exposures"
                 f" being entered into the exposure_table, not all exposures"
                 f" to be processed.")
    if badamps is not None:
        log.info(f"Note custom {badamps=} will only be used for new exposures"
                 f" being entered into the exposure_table, not all exposures"
                 f" to be processed.")

    ## Reconcile the dry_run and dry_run_level
    if dry_run and dry_run_level == 0:
        dry_run_level = 2
    elif dry_run_level > 0:
        dry_run = True

    ## Set a flag to determine whether to process the last tile in the exposure table
    ## or not. This is used in daily mode when processing and exiting mid-night.
    still_acquiring = False

    ## If running in daily mode, change a bunch of defaults
    if daily:
        ## What night are we running on?
        true_night = what_night_is_it()
        if night is not None:
            night = int(night)
            if true_night != night:
                log.info(f"True night is {true_night}, but running for {night=}")
        else:
            night = true_night

        if science_laststeps is None:
            science_laststeps = ['all', 'skysub', 'fluxcal']

        if z_submit_types is None and not no_redshifts:
            z_submit_types = ['cumulative']

        if during_operating_hours(dry_run=dry_run) and (true_night == night):
            still_acquiring = True

        update_exptable = True
        append_to_proc_table = True
        all_cumulatives = True
        all_tiles = True
        complete_tiles_thrunight = None
        ## Default for nightly processing is realtime queue
        if queue is None:
            queue = 'realtime'

    ## Default for normal processing is regular queue
    if queue is None:
        queue = 'regular'
    log.info(f"Submitting to the {queue} queue.")

225
    if night is None:
1✔
NEW
226
        err = "Must specify night unless running in daily=True mode"
×
NEW
227
        log.error(err)
×
NEW
228
        raise ValueError(err)
×
229
    else:
230
        log.info(f"Processing {night=}")
1✔
231

232
    ## Recast booleans from double negative
233
    check_for_outputs = (not dont_check_job_outputs)
1✔
234
    resubmit_partial_complete = (not dont_resubmit_partial_jobs)
1✔
235
    require_cals = (not dont_require_cals)
1✔
236
    do_cte_flats = (not no_cte_flats)
1✔
237
    
238
    ## cte flats weren't available before 20211130 so hardcode that in
239
    if do_cte_flats and night < 20211130:
1✔
NEW
240
        log.warning("Asked to do cte flat correction but before 20211130 no "
×
241
                    + "no cte flats are available to do the correction. "
242
                    + "Code will NOT perform cte flat corrections.")
NEW
243
        do_cte_flats = False
×
244

245
    ###################
246
    ## Set filenames ##
247
    ###################
248
    ## Ensure specprod is set in the environment and that it matches user
249
    ## specified value if given
250
    specprod = verify_variable_with_environment(specprod, var_name='specprod',
1✔
251
                                                env_name='SPECPROD')
252

253
    ## Determine where the exposure table will be written
254
    if exp_table_pathname is None:
1✔
255
        exp_table_pathname = findfile('exposure_table', night=night)
1✔
256
    if not os.path.exists(exp_table_pathname) and not update_exptable:
1✔
NEW
257
        raise IOError(f"Exposure table: {exp_table_pathname} not found. Exiting this night.")
×
258

259
    ## Determine where the processing table will be written
260
    if proc_table_pathname is None:
1✔
261
        proc_table_pathname = findfile('processing_table', night=night)
1✔
262
    proc_table_path = os.path.dirname(proc_table_pathname)
1✔
263
    if dry_run_level < 3:
1✔
264
        os.makedirs(proc_table_path, exist_ok=True)
1✔
265

266
    ## Determine where the unprocessed data table will be written
267
    unproc_table_pathname = replace_prefix(proc_table_pathname, 'processing', 'unprocessed')
1✔
268

269
    ## Require cal_override to exist if explcitly specified
270
    if override_pathname is None:
1✔
271
        override_pathname = findfile('override', night=night)
1✔
NEW
272
    elif not os.path.exists(override_pathname):
×
NEW
273
        raise IOError(f"Specified override file: "
×
274
                      f"{override_pathname} not found. Exiting this night.")
275

276
    #######################################
277
    ## Define parameters based on inputs ##
278
    #######################################
279
    ## If science_laststeps not defined, default is only LASTSTEP=='all' exposures
280
    if science_laststeps is None:
1✔
281
        science_laststeps = ['all']
1✔
282
    else:
NEW
283
        laststep_options = get_last_step_options()
×
NEW
284
        for laststep in science_laststeps:
×
NEW
285
            if laststep not in laststep_options:
×
NEW
286
                raise ValueError(f"Couldn't understand laststep={laststep} "
×
287
                                 + f"in science_laststeps={science_laststeps}.")
288
    log.info(f"Processing exposures with the following LASTSTEP's: {science_laststeps}")
1✔
289

290
    ## Define the group types of redshifts you want to generate for each tile
291
    if no_redshifts:
1✔
NEW
292
        log.info(f"no_redshifts set, so ignoring {z_submit_types=}")
×
NEW
293
        z_submit_types = None
×
294

295
    if z_submit_types is None:
1✔
296
        log.info("Not submitting scripts for redshift fitting")
1✔
297
    else:
298
        for ztype in z_submit_types:
1✔
299
            if ztype not in ['cumulative', 'pernight-v0', 'pernight', 'perexp']:
1✔
NEW
300
                raise ValueError(f"Couldn't understand ztype={ztype} "
×
301
                                 + f"in z_submit_types={z_submit_types}.")
302
        log.info(f"Redshift fitting with redshift group types: {z_submit_types}")
1✔
303

304
    ## Identify OBSTYPES to process
305
    if proc_obstypes is None:
1✔
306
        proc_obstypes = default_obstypes_for_proctable()
1✔
307

308
    #############################
309
    ## Start the Actual Script ##
310
    #############################
311
    ## If running in daily mode, or requested, then update the exposure table
312
    ## This reads in and writes out the exposure table to disk
313
    if update_exptable:
1✔
NEW
314
        log.info("Running update_exposure_table.")
×
NEW
315
        update_exposure_table(night=night, specprod=specprod,
×
316
                              exp_table_pathname=exp_table_pathname,
317
                              path_to_data=path_to_data, exp_obstypes=exp_obstypes,
318
                              camword=camword, badcamword=badcamword, badamps=badamps,
319
                              exps_to_ignore=exps_to_ignore,
320
                              dry_run_level=dry_run_level, verbose=verbose)
NEW
321
        log.info("Done with update_exposure_table.\n\n")
×
322
    ## Combine the table names and types for easier passing to io functions
323
    table_pathnames = [exp_table_pathname, proc_table_pathname]
1✔
324
    table_types = ['exptable', 'proctable']
1✔
325

326
    ## Load in the files defined above
327
    etable, ptable = load_tables(tablenames=table_pathnames, tabletypes=table_types)
1✔
328
    full_etable = etable.copy()
1✔
329

330
    ## Cut on OBSTYPES
331
    log.info(f"Processing the following obstypes: {proc_obstypes}")
1✔
332
    good_types = np.isin(np.array(etable['OBSTYPE']).astype(str), proc_obstypes)
1✔
333
    etable = etable[good_types]
1✔
334

335
    ## Update processing table
336
    tableng = len(ptable)
1✔
337
    if tableng > 0:
1✔
NEW
338
        ptable = update_from_queue(ptable, dry_run=dry_run_level)
×
NEW
339
        if dry_run_level < 3:
×
NEW
340
            write_table(ptable, tablename=proc_table_pathname)
×
NEW
341
        if any_jobs_failed(ptable['STATUS']):
×
NEW
342
            if not ignore_proc_table_failures:
×
NEW
343
                err = "Some jobs have an incomplete job status. This script " \
×
344
                      + "will not fix them. You should remedy those first. "
NEW
345
                log.error(err)
×
346
                ## if the failures are in calibrations, then crash since
347
                ## we need them for any new jobs
NEW
348
                if any_jobs_failed(ptable['STATUS'][ptable['CALIBRATOR'] > 0]):
×
NEW
349
                    err += "To proceed anyway use "
×
NEW
350
                    err += "'--ignore-proc-table-failures'. Exiting."
×
351
                    raise AssertionError(err)
352
            else:
NEW
353
                log.warning("Some jobs have an incomplete job status, but "
×
354
                      + "you entered '--ignore-proc-table-failures'. This "
355
                      + "script will not fix them. "
356
                      + "You should have fixed those first. Proceeding...")
NEW
357
        if np.sum(ptable['OBSTYPE']=='science')>0:
×
NEW
358
            ptable_expids = set(np.unique(np.concatenate(
×
359
                                ptable['EXPID'][ptable['OBSTYPE']=='science']
360
                            )))
361
        else:
NEW
362
            ptable_expids = set()
×
NEW
363
        etable_expids = set(etable['EXPID'][etable['OBSTYPE']=='science'])
×
NEW
364
        if len(etable_expids.difference(ptable_expids)) == 0:
×
NEW
365
            log.info("All science EXPID's already present in processing table, "
×
366
                     + "nothing to run. Exiting")
NEW
367
            return ptable
×
NEW
368
        int_id = np.max(ptable['INTID'])+1
×
369
    else:
370
        int_id = night_to_starting_iid(night=night)
1✔
371

372
    ################### Determine What to Process ###################
    ## Load calibration_override_file
    overrides = load_override_file(filepathname=override_pathname)
    cal_override = {}
    if 'calibration' in overrides:
        cal_override = overrides['calibration']

    ## Determine calibrations that will be linked
    if 'linkcal' in cal_override:
        files_to_link, files_not_linked = None, None
        if 'include' in cal_override['linkcal']:
            files_to_link = cal_override['linkcal']['include']
        if 'exclude' in cal_override['linkcal']:
            files_not_linked = cal_override['linkcal']['exclude']
        files_to_link, files_not_linked = derive_include_exclude(files_to_link,
                                                                 files_not_linked)
        ## Fiberflatnights need to be generated with psfs from same time, so
        ## can't link psfs without also linking fiberflatnight
        if 'psfnight' in files_to_link and not 'fiberflatnight' in files_to_link \
                and not psf_linking_without_fflat:
            err = "Must link fiberflatnight if linking psfnight"
            log.error(err)
            raise ValueError(err)
    else:
        files_to_link = set()

    ## Identify what calibrations have been done
    calibjobs = generate_calibration_dict(ptable, files_to_link)

    ## Determine the appropriate set of calibrations
    ## Only run if we haven't already linked or done fiberflatnight's
    cal_etable = None
    if not all_calibs_submitted(calibjobs['completed']):
        cal_etable = determine_calibrations_to_proc(etable,
                                                    do_cte_flats=do_cte_flats,
                                                    still_acquiring=still_acquiring)

    ## Determine the appropriate science exposures
    sci_etable, tiles_to_proc = determine_science_to_proc(
                                        etable=etable, tiles=tiles,
                                        surveys=surveys, laststeps=science_laststeps,
                                        processed_tiles=np.unique(ptable['TILEID']),
                                        all_tiles=all_tiles,
                                        ignore_last_tile=still_acquiring,
                                        complete_tiles_thrunight=complete_tiles_thrunight,
                                        specstatus_path=specstatus_path)

    ## For cumulative redshifts, identify tiles for which this is the last
    ## night that they were observed
    tiles_cumulative = get_tiles_cumulative(sci_etable, z_submit_types,
                                            all_cumulatives, night)

    ################### Process the data ###################
    ## Process Calibrations
    ## For now assume that a linkcal job links all files and we therefore
    ## don't need to submit anything more.
    def create_submit_add_and_save(prow, proctable, check_outputs=check_for_outputs,
                                   extra_job_args=None):
        log.info(f"\nProcessing: {prow}\n")
        prow = create_and_submit(prow, dry_run=dry_run_level, queue=queue,
                                 reservation=reservation,
                                 strictly_successful=True,
                                 check_for_outputs=check_outputs,
                                 resubmit_partial_complete=resubmit_partial_complete,
                                 system_name=system_name,
                                 use_specter=use_specter,
                                 extra_job_args=extra_job_args)
        ## Add the processing row to the processing table
        proctable.add_row(prow)
        if len(proctable) > 0 and dry_run_level < 3:
            write_table(proctable, tablename=proc_table_pathname)
        sleep_and_report(sub_wait_time,
                         message_suffix=f"to slow down the queue submission rate",
                         dry_run=dry_run, logfunc=log.info)
        return prow, proctable

    ## Actually process the calibrations
    ## Only run if we haven't already linked or done fiberflatnight's
    if not all_calibs_submitted(calibjobs['completed']):
        ptable, calibjobs, int_id = submit_calibrations(cal_etable, ptable,
                                                cal_override, calibjobs,
                                                int_id, night, files_to_link,
                                                create_submit_add_and_save)

    ## Require some minimal level of calibrations to process science exposures
    if require_cals and not all_calibs_submitted(calibjobs['completed']):
        err = (f"Required to have at least flat calibrations via override link"
               + f" or nightlyflat")
        log.error(err)
        ## If still acquiring new data in daily mode, don't exit with error code
        ## But do exit
        if still_acquiring:
            if len(ptable) > 0:
                processed = np.isin(full_etable['EXPID'],
                                    np.unique(np.concatenate(ptable['EXPID'])))
                unproc_table = full_etable[~processed]
            else:
                unproc_table = full_etable
            return ptable, unproc_table
        else:
            sys.exit(1)

    ## Process Sciences
    ## Loop over new tiles and process them
    for tile in tiles_to_proc:
        log.info(f'\n\n##################### {tile} #########################')

        ## Identify the science exposures for the given tile
        tile_etable = sci_etable[sci_etable['TILEID'] == tile]

        ## Should change submit_tilenight_and_redshifts to take erows
        ## but for now will remain backward compatible and use prows
        ## Create list of prows from selected etable rows
        sciences = []
        for erow in tile_etable:
            prow = erow_to_prow(erow)
            prow['INTID'] = int_id
            int_id += 1
            prow['JOBDESC'] = prow['OBSTYPE']
            prow = define_and_assign_dependency(prow, calibjobs)
            sciences.append(prow)

        # don't submit cumulative redshifts for the last tile if it isn't in tiles_cumulative
        if z_submit_types is None:
            cur_z_submit_types = None
        else:
            cur_z_submit_types = z_submit_types.copy()

        if ((z_submit_types is not None) and ('cumulative' in z_submit_types)
            and (tile not in tiles_cumulative)):
            cur_z_submit_types.remove('cumulative')

        ## No longer need to return sciences since this is always the
        ## full set of exposures, but will keep for now for backward
        ## compatibility
        extra_job_args = {}
        if 'science' in overrides and 'tilenight' in overrides['science']:
            extra_job_args = overrides['science']['tilenight']
        else:
            extra_job_args = {}

        extra_job_args['z_submit_types'] = cur_z_submit_types
        extra_job_args['laststeps'] = science_laststeps
        ptable, sciences, int_id = submit_tilenight_and_redshifts(
                                    ptable, sciences, calibjobs, int_id,
                                    dry_run=dry_run_level, queue=queue,
                                    reservation=reservation,
                                    strictly_successful=True,
                                    check_for_outputs=check_for_outputs,
                                    resubmit_partial_complete=resubmit_partial_complete,
                                    system_name=system_name,
                                    use_specter=use_specter,
                                    extra_job_args=extra_job_args)

        if len(ptable) > 0 and dry_run_level < 3:
            write_table(ptable, tablename=proc_table_pathname)

        sleep_and_report(sub_wait_time,
                         message_suffix=f"to slow down the queue submission rate",
                         dry_run=dry_run, logfunc=log.info)

        ## Flush the outputs
        sys.stdout.flush()
        sys.stderr.flush()

    ################### Wrap things up ###################
    unproc_table = None
    if len(ptable) > 0:
        ## All jobs now submitted, update information from job queue and save
        ptable = update_from_queue(ptable, dry_run=dry_run_level)
        if dry_run_level < 3:
            write_table(ptable, tablename=proc_table_pathname)
            ## Now that processing is complete, let's identify what we didn't process
            if len(ptable) > 0:
                processed = np.isin(full_etable['EXPID'], np.unique(np.concatenate(ptable['EXPID'])))
                unproc_table = full_etable[~processed]
            else:
                unproc_table = full_etable
            write_table(unproc_table, tablename=unproc_table_pathname)
    elif dry_run_level < 3 and len(full_etable) > 0:
        ## Done determining what not to process, so write out unproc file
        unproc_table = full_etable
        write_table(unproc_table, tablename=unproc_table_pathname)

    if dry_run_level >= 3:
        log.info(f"{dry_run_level=} so not saving outputs.")
        log.info(f"\n{full_etable=}")
        log.info(f"\n{ptable=}")
        log.info(f"\n{unproc_table=}")

    if still_acquiring:
        log.info(f"Current submission of exposures "
                 + f"for {night=} is complete except for the last tile.\n\n\n\n")
    else:
        log.info(f"All done: Completed submission of exposures for night {night}.\n")

    return ptable, unproc_table


def submit_calibrations(cal_etable, ptable, cal_override, calibjobs, int_id,
                        curnight, files_to_link, create_submit_add_and_save):
    log = get_logger()
    if len(ptable) > 0:
        ## we use this to check for individual jobs rather than combination
        ## jobs, so only check for scalar jobs where JOBDESC == OBSTYPE
        ## ex. dark, zero, arc, and flat
        explists = ptable['EXPID'][ptable['JOBDESC']==ptable['OBSTYPE']]
        processed_cal_expids = np.unique(np.concatenate(explists).astype(int))
    else:
        processed_cal_expids = np.array([]).astype(int)

    ######## Submit caliblink if requested ########

    if 'linkcal' in cal_override and calibjobs['linkcal'] is None:
        log.info("Linking calibration files listed in override files: "
                 + f"{files_to_link}")
        prow = default_prow()
        prow['INTID'] = int_id
        int_id += 1
        prow['JOBDESC'] = 'linkcal'
        prow['OBSTYPE'] = 'link'
        prow['CALIBRATOR'] = 1
        prow['NIGHT'] = curnight
        if 'refnight' in cal_override['linkcal']:
            refnight = int(cal_override['linkcal']['refnight'])
            prow = define_and_assign_dependency(prow, calibjobs, refnight=refnight)
        ## create dictionary to carry linking information
        linkcalargs = cal_override['linkcal']
        prow, ptable = create_submit_add_and_save(prow, ptable,
                                                  check_outputs=False,
                                                  extra_job_args=linkcalargs)
        calibjobs[prow['JOBDESC']] = prow.copy()
        calibjobs = update_calibjobs_with_linking(calibjobs, files_to_link)

    if len(cal_etable) == 0:
        return ptable, calibjobs, int_id

    ## Otherwise proceed with submitting the calibrations
    ## Define objects to process
    darks, flats, ctes, cte1s = list(), list(), list(), list()
    zeros = cal_etable[cal_etable['OBSTYPE']=='zero']
    arcs = cal_etable[cal_etable['OBSTYPE']=='arc']
    if 'dark' in cal_etable['OBSTYPE']:
        darks = cal_etable[cal_etable['OBSTYPE']=='dark']
    if 'flat' in cal_etable['OBSTYPE']:
        allflats = cal_etable[cal_etable['OBSTYPE']=='flat']
        is_cte = np.array(['cte' in prog.lower() for prog in allflats['PROGRAM']])
        flats = allflats[~is_cte]
        ctes = allflats[is_cte]
        cte1s = ctes[np.abs(ctes['EXPTIME']-1.)<0.1]

    do_bias = (len(zeros) > 0 and 'biasnight' not in files_to_link
               and not calibjobs['completed']['nightlybias'])
    do_badcol = len(darks) > 0 and 'badcolumns' not in files_to_link
    do_cte = (len(cte1s) > 0 and len(flats) > 0) and 'ctecorrnight' not in files_to_link
    ## If no dark or ctes, do the nightlybias
    if do_bias and not do_badcol and not do_cte and not calibjobs['completed']['ccdcalib']:
        log.info("\nNo dark or cte found. Submitting nightlybias before "
                 "processing exposures.\n")
        prow = erow_to_prow(zeros[0])
        prow['EXPID'] = np.array([])
        prow['INTID'] = int_id
        int_id += 1
        prow['JOBDESC'] = 'nightlybias'
        prow['CALIBRATOR'] = 1
        cams = set(decode_camword('a0123456789'))
        for zero in zeros:
            if 'calib' in zero['PROGRAM']:
                proccamword = difference_camwords(zero['CAMWORD'],
                                                  zero['BADCAMWORD'])
                cams = cams.intersection(set(decode_camword(proccamword)))
        prow['PROCCAMWORD'] = create_camword(list(cams))
        prow = define_and_assign_dependency(prow, calibjobs)
        prow, ptable = create_submit_add_and_save(prow, ptable)
        calibjobs[prow['JOBDESC']] = prow.copy()
        calibjobs['completed'][prow['JOBDESC']] = True
        log.info("Performed nightly bias as no dark or cte passed cuts.")
    elif (do_badcol or do_cte) and not calibjobs['completed']['ccdcalib']:
        ######## Submit ccdcalib ########
        ## process dark for bad columns even if we don't have zeros for nightlybias
        ## ccdcalib = nightlybias(zeros) + badcol(dark) + cte correction
        jobdesc = 'ccdcalib'

        if calibjobs[jobdesc] is None:
            ccdcalib_erows = []
            if do_badcol:
                ## first exposure is a 300s dark
                ccdcalib_erows.append(darks[0])
            if do_cte:
                cte_expids = []
                ## second exposure is a 1s cte flat
                ccdcalib_erows.append(cte1s[0])
                cte_expids.append(cte1s[0]['EXPID'])
                ## third exposure is a 120s flat
                ccdcalib_erows.append(flats[-1])
                cte_expids.append(flats[-1]['EXPID'])
                cte_expids = np.array(cte_expids)
            else:
                cte_expids = None

            prow, int_id = make_exposure_prow(ccdcalib_erows[0], int_id,
                                              calibjobs, jobdesc=jobdesc)
            if len(ccdcalib_erows) > 1:
                prow['EXPID'] = np.array([erow['EXPID'] for erow in ccdcalib_erows])

            prow['CALIBRATOR'] = 1

            extra_job_args = {'nightlybias': do_bias,
                              'nightlycte': do_cte,
                              'cte_expids': cte_expids}
            prow, ptable = create_submit_add_and_save(prow, ptable,
                                                      extra_job_args=extra_job_args)
            calibjobs[prow['JOBDESC']] = prow.copy()
            calibjobs['completed'][prow['JOBDESC']] = True
            log.info(f"Submitted ccdcalib job with {do_bias=}, "
                     + f"{do_badcol=}, {do_cte=}")

    ######## Submit arcs and psfnight ########
    if len(arcs)>0 and not calibjobs['completed']['psfnight']:
        arc_prows = []
        for arc_erow in arcs:
            if arc_erow['EXPID'] in processed_cal_expids:
                continue
            prow, int_id = make_exposure_prow(arc_erow, int_id, calibjobs)
            prow, ptable = create_submit_add_and_save(prow, ptable)
            arc_prows.append(prow)

        joint_prow, int_id = make_joint_prow(arc_prows, descriptor='psfnight',
                                             internal_id=int_id)
        ptable = set_calibrator_flag(arc_prows, ptable)
        joint_prow, ptable = create_submit_add_and_save(joint_prow, ptable)
        calibjobs[joint_prow['JOBDESC']] = joint_prow.copy()
        calibjobs['completed'][joint_prow['JOBDESC']] = True


    ######## Submit flats and nightlyflat ########
    ## If nightlyflat defined we don't need to process more normal flats
    if len(flats) > 0 and not calibjobs['completed']['nightlyflat']:
        flat_prows = []
        for flat_erow in flats:
            if flat_erow['EXPID'] in processed_cal_expids:
                continue

            jobdesc = 'flat'
            prow, int_id = make_exposure_prow(flat_erow, int_id, calibjobs,
                                              jobdesc=jobdesc)
            prow, ptable = create_submit_add_and_save(prow, ptable)
            flat_prows.append(prow)

        joint_prow, int_id = make_joint_prow(flat_prows, descriptor='nightlyflat',
                                             internal_id=int_id)
        ptable = set_calibrator_flag(flat_prows, ptable)
        joint_prow, ptable = create_submit_add_and_save(joint_prow, ptable)
        calibjobs[joint_prow['JOBDESC']] = joint_prow.copy()
        calibjobs['completed'][joint_prow['JOBDESC']] = True

    ######## Submit cte flats ########
    jobdesc = 'flat'
    for cte_erow in ctes:
        if cte_erow['EXPID'] in processed_cal_expids:
            continue
        prow, int_id = make_exposure_prow(cte_erow, int_id, calibjobs,
                                          jobdesc=jobdesc)
        prow, ptable = create_submit_add_and_save(prow, ptable)

    return ptable, calibjobs, int_id
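As a rough sketch of how this module might be exercised outside of production, the snippet below runs proc_night in a fully simulated mode and inspects what it returns. The night value is a placeholder, and the call assumes the exposure table for that night already exists, as required by the docstring above.

    from desispec.scripts.proc_night import proc_night

    # dry_run_level=3 simulates submission without writing any output files;
    # the night below is a placeholder, not a night taken from this report.
    ptable, unproc_table = proc_night(night=20240315, dry_run_level=3,
                                      no_redshifts=True)

    # The returns are the processing table and the table of unprocessed
    # exposures (which may be None in a dry run).
    print(f"{len(ptable)} processing rows would have been submitted.")
    if unproc_table is not None:
        print(f"{len(unproc_table)} exposures were left unprocessed.")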