desihub / desispec / 11924426179

20 Nov 2024 12:39AM UTC coverage: 30.072% (-0.09%) from 30.16%
Pull Request #2411 (github / segasai, "reformat"): save the trace shift offsets in the psf file

1 of 31 new or added lines in 3 files covered (3.23%)
1675 existing lines in 20 files now uncovered
14637 of 48673 relevant lines covered (30.07%)
0.3 hits per line

Source file: /py/desispec/scripts/proc_night.py (72.63% covered)
"""
desispec.scripts.proc_night
=============================

"""
from desispec.io import findfile
from desispec.scripts.link_calibnight import derive_include_exclude
from desispec.workflow.calibration_selection import \
    determine_calibrations_to_proc
from desispec.workflow.science_selection import determine_science_to_proc, \
    get_tiles_cumulative
from desiutil.log import get_logger
import numpy as np
import os
import sys
import time
import re
from socket import gethostname
from astropy.table import Table, vstack

## Import some helper functions, you can see their definitions by uncommenting the bash shell command
from desispec.scripts.update_exptable import update_exposure_table
from desispec.workflow.tableio import load_tables, write_table
from desispec.workflow.utils import sleep_and_report, \
    verify_variable_with_environment, load_override_file
from desispec.workflow.timing import what_night_is_it, during_operating_hours
from desispec.workflow.exptable import get_last_step_options, \
    read_minimal_science_exptab_cols
from desispec.workflow.proctable import default_obstypes_for_proctable, \
    erow_to_prow, default_prow, read_minimal_tilenight_proctab_cols
from desispec.workflow.processing import define_and_assign_dependency, \
    create_and_submit, \
    submit_tilenight_and_redshifts, \
    generate_calibration_dict, \
    night_to_starting_iid, make_joint_prow, \
    set_calibrator_flag, make_exposure_prow, \
    all_calibs_submitted, \
    update_and_recursively_submit, update_accounted_for_with_linking
from desispec.workflow.queue import update_from_queue, any_jobs_failed, \
    get_resubmission_states
from desispec.io.util import decode_camword, difference_camwords, \
    create_camword, replace_prefix, erow_to_goodcamword, camword_union


def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
               queue=None, reservation=None, system_name=None,
               exp_table_pathname=None, proc_table_pathname=None,
               override_pathname=None, update_exptable=False,
               dry_run_level=0, dry_run=False, no_redshifts=False,
               ignore_proc_table_failures=False,
               dont_check_job_outputs=False, dont_resubmit_partial_jobs=False,
               tiles=None, surveys=None, science_laststeps=None,
               all_tiles=False, specstatus_path=None, use_specter=False,
               no_cte_flats=False, complete_tiles_thrunight=None,
               all_cumulatives=False, daily=False, specprod=None,
               path_to_data=None, exp_obstypes=None, camword=None,
               badcamword=None, badamps=None, exps_to_ignore=None,
               sub_wait_time=0.1, verbose=False, dont_require_cals=False,
               psf_linking_without_fflat=False, no_resub_failed=False,
               still_acquiring=False):
    """
    Process some or all exposures on a night. Can be used to process an entire
    night, or to process the data currently available on a given night using
    the '--daily' flag.

    Args:
        night (int): The night of data to be processed. Exposure table must exist.
        proc_obstypes (list or np.array, optional): A list of exposure OBSTYPE's
            that should be processed (and therefore added to the processing table).
        z_submit_types (list of str):
            The "group" types of redshifts that should be submitted with each
            exposure. If not specified, the default for daily processing is
            ['cumulative']. If false, 'false', or [], then no redshifts are
            submitted.
        queue (str, optional): The name of the queue to submit the jobs to.
            Default is "realtime" in daily mode, otherwise "regular".
        reservation (str, optional): The reservation to submit jobs to.
            If None, it is not submitted to a reservation.
        system_name (str): Batch system name, e.g. cori-haswell, cori-knl,
            perlmutter-gpu.
        exp_table_pathname (str): Full path, including file name, to where the
            exposure tables are stored.
        proc_table_pathname (str): Full path, including file name, to where the
            processing tables are to be written.
        override_pathname (str): Full path to the override file.
        update_exptable (bool): If True then the exposure table is updated.
            The default is False.
        dry_run_level (int, optional): If nonzero, this is a simulated run.
            If dry_run_level=1, the scripts will be written but not submitted.
            If dry_run_level=2, the scripts will not be written nor submitted,
            but the processing table is still created.
            If dry_run_level=3, no output files are written.
            Logging will remain the same for testing as though scripts are
            being submitted. Default is 0 (false).
        dry_run (bool, optional): Whether to run without submitting scripts.
            If dry_run_level is defined, it overrides this flag. If
            dry_run_level is not set and dry_run=True, dry_run_level is set to 2
            (no scripts generated or run). Default for dry_run is False.
        no_redshifts (bool, optional): Whether to submit redshifts or not.
            If True, redshifts are not submitted.
        ignore_proc_table_failures (bool, optional): True if you want to submit
            other jobs even if the loaded processing table has incomplete jobs
            in it. Use with caution. Default is False.
        dont_check_job_outputs (bool, optional): Default is False. If False,
            the code checks for the existence of the expected final data
            products for the script being submitted. If all files exist and
            this is False, then the script will not be submitted. If some
            files exist and this is False, only the subset of the cameras
            without the final data products will be generated and submitted.
        dont_resubmit_partial_jobs (bool, optional): Default is False. Must be
            used with dont_check_job_outputs=False. If this flag is False, jobs
            with some prior data are pruned using PROCCAMWORD to only process
            the remaining cameras not found to exist.
        tiles (array-like, optional): Only submit jobs for these TILEIDs.
        surveys (array-like, optional): Only submit science jobs for these
            surveys (lowercase).
        science_laststeps (array-like, optional): Only submit jobs for exposures
            with LASTSTEP in these science_laststeps (lowercase).
        all_tiles (bool, optional): Default is False. Set to NOT restrict to
            completed tiles as defined by the table pointed to by specstatus_path.
        specstatus_path (str, optional): Default is
            $DESI_SURVEYOPS/ops/tiles-specstatus.ecsv. Location of the
            surveyops specstatus table.
        use_specter (bool, optional): Default is False. If True, use specter,
            otherwise use gpu_specter by default.
        no_cte_flats (bool, optional): Default is False. If False, cte flats
            are used if available to correct for cte effects.
        complete_tiles_thrunight (int, optional): Default is None. Only tiles
            completed on or before the supplied YYYYMMDD are considered
            completed and will be processed. All complete tiles are submitted
            if None or all_tiles is True.
        all_cumulatives (bool, optional): Default is False. Set to run
            cumulative redshifts for all tiles even if the tile has observations
            on a later night.
        specprod: str. The name of the current production. If used, this will
            overwrite the SPECPROD environment variable.
        daily: bool. Flag that sets other flags for running this script for the
            daily pipeline.
        path_to_data: str. Path to the raw data.
        exp_obstypes: str or comma separated list of strings. The exposure
            OBSTYPE's that you want to include in the exposure table.
        camword: str. Camword that, if set, alters the set of cameras that will
            be set for processing. Examples: a0123456789, a1, a2b3r3,
            a2b3r4z3. Note this is only true for new exposures being
            added to the exposure_table in 'daily' mode.
        badcamword: str. Camword that, if set, will be removed from the camword
            defined in camword if given, or the camword inferred from
            the data if camword is not given. Note this is only true
            for new exposures being added to the exposure_table
            in 'daily' mode.
        badamps: str. Comma separated list of bad amplifiers that should not
            be processed. Should be of the form "{camera}{petal}{amp}",
            i.e. "[brz][0-9][ABCD]". Example: 'b7D,z8A'. Note this is
            only true for new exposures being added to the
            exposure_table in 'daily' mode.
        sub_wait_time: float. Wait time in seconds between submission loops.
            Default 0.1 seconds.
        verbose: bool. True if you want more verbose output, false otherwise.
            Currently not propagated to lower code, so it is only used in the
            main daily_processing script itself.
        dont_require_cals: bool. Default False. If set then the code doesn't
            require either a valid set of calibrations or a valid override file
            to link to calibrations in order to proceed with science processing.
        psf_linking_without_fflat: bool. Default False. If set then the code
            will NOT raise an error if asked to link psfnight calibrations
            without fiberflatnight calibrations.
        no_resub_failed: bool. Set to True if you do NOT want to resubmit
            jobs with Slurm status 'FAILED' by default. Default is False.
        still_acquiring: bool. If True, assume more data might be coming, e.g.
            wait for additional exposures of the latest tile. If False, auto-derive
            True/False based upon night and current time. Primarily for testing.
    """
    ## Get logger
    log = get_logger()
    log.info(f'----- Processing {night} at {time.asctime()} -----')
    log.info(f"SLURM_JOB_ID={os.getenv('SLURM_JOB_ID')} on {gethostname()}")

    ## Inform user of how some parameters will be used
    if camword is not None:
        log.info(f"Note custom {camword=} will only be used for new exposures"
                 f" being entered into the exposure_table, not all exposures"
                 f" to be processed.")
    if badcamword is not None:
        log.info(f"Note custom {badcamword=} will only be used for new exposures"
                 f" being entered into the exposure_table, not all exposures"
                 f" to be processed.")
    if badamps is not None:
        log.info(f"Note custom {badamps=} will only be used for new exposures"
                 f" being entered into the exposure_table, not all exposures"
                 f" to be processed.")

    ## Reconcile the dry_run and dry_run_level
    if dry_run and dry_run_level == 0:
        dry_run_level = 2
    elif dry_run_level > 0:
        dry_run = True

    ## If running in daily mode, change a bunch of defaults
    if daily:
        ## What night are we running on?
        true_night = what_night_is_it()
        if night is not None:
            night = int(night)
            if true_night != night:
                log.info(f"True night is {true_night}, but running daily for {night=}")
        else:
            night = true_night

        if science_laststeps is None:
            science_laststeps = ['all', 'skysub', 'fluxcal']

        if z_submit_types is None and not no_redshifts:
            z_submit_types = ['cumulative']

        ## still_acquiring is a flag to determine whether to process the last tile in the exposure table
        ## or not. This is used in daily mode when processing and exiting mid-night.
        ## override still_acquiring==False if in daily mode during observing hours
        if during_operating_hours(dry_run=dry_run) and (true_night == night):
            if still_acquiring is False:
                log.info(f'Daily mode during observing hours on current night, so assuming that more data might arrive and setting still_acquiring=True')
            still_acquiring = True

        update_exptable = True
        append_to_proc_table = True
        all_cumulatives = True
        all_tiles = True
        complete_tiles_thrunight = None
        ## Default for nightly processing is the realtime queue
        if queue is None:
            queue = 'realtime'

    ## Default for normal processing is the regular queue
    if queue is None:
        queue = 'regular'
    log.info(f"Submitting to the {queue} queue.")

    ## Set night
    if night is None:
        err = "Must specify night unless running in daily=True mode"
        log.error(err)
        raise ValueError(err)
    else:
        log.info(f"Processing {night=}")

    ## Recast booleans from double negative
    check_for_outputs = (not dont_check_job_outputs)
    resubmit_partial_complete = (not dont_resubmit_partial_jobs)
    require_cals = (not dont_require_cals)
    do_cte_flats = (not no_cte_flats)

    ## cte flats weren't available before 20211130, so hardcode that in
    if do_cte_flats and night < 20211130:
        log.info("Asked to do cte flat correction but before 20211130 "
                    + "no cte flats are available to do the correction. "
                    + "Code will NOT perform cte flat corrections.")
        do_cte_flats = False

    ###################
    ## Set filenames ##
    ###################
    ## Ensure specprod is set in the environment and that it matches the user
    ## specified value if given
    specprod = verify_variable_with_environment(specprod, var_name='specprod',
                                                env_name='SPECPROD')

    ## Determine where the exposure table will be written
    if exp_table_pathname is None:
        exp_table_pathname = findfile('exposure_table', night=night)
    if not os.path.exists(exp_table_pathname) and not update_exptable:
        raise IOError(f"Exposure table: {exp_table_pathname} not found. Exiting this night.")

    ## Determine where the processing table will be written
    if proc_table_pathname is None:
        proc_table_pathname = findfile('processing_table', night=night)
    proc_table_path = os.path.dirname(proc_table_pathname)
    if dry_run_level < 3:
        os.makedirs(proc_table_path, exist_ok=True)

    ## Determine where the unprocessed data table will be written
    unproc_table_pathname = replace_prefix(proc_table_pathname, 'processing', 'unprocessed')

    ## Require cal_override to exist if explicitly specified
    if override_pathname is None:
        override_pathname = findfile('override', night=night)
    elif not os.path.exists(override_pathname):
        raise IOError(f"Specified override file: "
                      f"{override_pathname} not found. Exiting this night.")

    #######################################
    ## Define parameters based on inputs ##
    #######################################
    ## If science_laststeps not defined, the default is only LASTSTEP=='all' exposures
    if science_laststeps is None:
        science_laststeps = ['all']
    else:
        laststep_options = get_last_step_options()
        for laststep in science_laststeps:
            if laststep not in laststep_options:
                raise ValueError(f"Couldn't understand laststep={laststep} "
                                 + f"in science_laststeps={science_laststeps}.")
    log.info(f"Processing exposures with the following LASTSTEP's: {science_laststeps}")

    ## Define the group types of redshifts you want to generate for each tile
    if no_redshifts:
        log.info(f"no_redshifts set, so ignoring {z_submit_types=}")
        z_submit_types = None

    if z_submit_types is None:
        log.info("Not submitting scripts for redshift fitting")
    else:
        for ztype in z_submit_types:
            if ztype not in ['cumulative', 'pernight-v0', 'pernight', 'perexp']:
                raise ValueError(f"Couldn't understand ztype={ztype} "
                                 + f"in z_submit_types={z_submit_types}.")
        log.info(f"Redshift fitting with redshift group types: {z_submit_types}")

    ## Identify OBSTYPES to process
    if proc_obstypes is None:
        proc_obstypes = default_obstypes_for_proctable()

    #############################
    ## Start the Actual Script ##
    #############################
    ## If running in daily mode, or requested, then update the exposure table.
    ## This reads in and writes out the exposure table to disk.
    if update_exptable:
        log.info("Running update_exposure_table.")
        update_exposure_table(night=night, specprod=specprod,
                              exp_table_pathname=exp_table_pathname,
                              path_to_data=path_to_data, exp_obstypes=exp_obstypes,
                              camword=camword, badcamword=badcamword, badamps=badamps,
                              exps_to_ignore=exps_to_ignore,
                              dry_run_level=dry_run_level, verbose=verbose)
        log.info("Done with update_exposure_table.\n\n")
    ## Combine the table names and types for easier passing to io functions
    table_pathnames = [exp_table_pathname, proc_table_pathname]
    table_types = ['exptable', 'proctable']

    ## Load in the files defined above
    etable, ptable = load_tables(tablenames=table_pathnames, tabletypes=table_types)
    full_etable = etable.copy()

    ## For I/O efficiency, pre-populate the exposure table and processing table caches
    ## for all nights if doing cross-night redshifts so that future per-night "reads"
    ## will use the cache.
    if z_submit_types is not None and 'cumulative' in z_submit_types:
        ## this shouldn't need to change since we've already updated the exptab
        read_minimal_science_exptab_cols()
        ## this would become out of date for the current night except
        ## write_table will keep it up to date
        read_minimal_tilenight_proctab_cols()

    ## Cut on OBSTYPES
    log.info(f"Processing the following obstypes: {proc_obstypes}")
    good_types = np.isin(np.array(etable['OBSTYPE']).astype(str), proc_obstypes)
    etable = etable[good_types]

    ## Update processing table
    tableng = len(ptable)
    if tableng > 0:
        ptable = update_from_queue(ptable, dry_run=dry_run_level)
        if dry_run_level < 3:
            write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')
        if any_jobs_failed(ptable['STATUS']):
            ## Try up to two times to resubmit failures, afterwards give up
            ## unless explicitly told to proceed with the failures.
            ## Note after 2 resubmissions, the code won't resubmit anymore even
            ## if given ignore_proc_table_failures
            if np.max([len(qids) for qids in ptable['ALL_QIDS']]) < 3:
                log.info("Job failures were detected. Resubmitting those jobs "
                         + "before continuing with new submissions.")

                ptable, nsubmits = update_and_recursively_submit(ptable,
                                                                 no_resub_failed=no_resub_failed,
                                                                 ptab_name=proc_table_pathname,
                                                                 dry_run=dry_run,
                                                                 reservation=reservation)
            elif not ignore_proc_table_failures:
                err = "Some jobs have an incomplete job status. This script " \
                      + "will not fix them. You should remedy those first. "
                log.error(err)
                ## if the failures are in calibrations, then crash since
                ## we need them for any new jobs
                if any_jobs_failed(ptable['STATUS'][ptable['CALIBRATOR'] > 0]):
                    err += "To proceed anyway use "
                    err += "'--ignore-proc-table-failures'. Exiting."
                    raise AssertionError(err)
            else:
                log.warning("Some jobs have an incomplete job status, but "
                      + "you entered '--ignore-proc-table-failures'. This "
                      + "script will not fix them. "
                      + "You should have fixed those first. Proceeding...")
        if np.sum(ptable['OBSTYPE']=='science') > 0:
            ptable_expids = set(np.concatenate(
                                ptable['EXPID'][ptable['OBSTYPE']=='science']
                            ))
        else:
            ptable_expids = set()
        etable_expids = set(etable['EXPID'][etable['OBSTYPE']=='science'])
        if len(etable_expids) == 0:
            log.info(f"No science exposures yet. Exiting at {time.asctime()}.")
            return ptable, None
        elif len(etable_expids.difference(ptable_expids)) == 0:
            log.info("All science EXPID's already present in processing table, "
                     + f"nothing to run. Exiting at {time.asctime()}.")
            return ptable, None

        int_id = np.max(ptable['INTID'])+1
    else:
        int_id = night_to_starting_iid(night=night)

    ################### Determine What to Process ###################
    ## Load calibration_override_file
    overrides = load_override_file(filepathname=override_pathname)
    cal_override = {}
    if 'calibration' in overrides:
        cal_override = overrides['calibration']

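    ## For reference (editor's note, inferred only from the keys accessed below,
    ## not an authoritative schema): the override file may provide entries such as
    ##     calibration:
    ##         linkcal: {refnight, include, exclude, camword}
    ##         nightlyflat: <extra job arguments>
    ##     science:
    ##         tilenight: <extra job arguments>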
    ## Determine calibrations that will be linked
    if 'linkcal' in cal_override:
        files_to_link, files_not_linked = None, None
        if 'include' in cal_override['linkcal']:
            files_to_link = cal_override['linkcal']['include']
        if 'exclude' in cal_override['linkcal']:
            files_not_linked = cal_override['linkcal']['exclude']
        files_to_link, files_not_linked = derive_include_exclude(files_to_link,
                                                                 files_not_linked)
        ## Fiberflatnights need to be generated with psfs from same time, so
        ## can't link psfs without also linking fiberflatnight
        if 'psfnight' in files_to_link and not 'fiberflatnight' in files_to_link \
                and not psf_linking_without_fflat:
            err = "Must link fiberflatnight if linking psfnight"
            log.error(err)
            raise ValueError(err)
    else:
        files_to_link = set()

    ## Identify what calibrations have been done
    calibjobs = generate_calibration_dict(ptable, files_to_link)

    ## Determine the appropriate set of calibrations
    ## Only run if we haven't already linked or done fiberflatnight's
    cal_etable = etable[[]]
    if not all_calibs_submitted(calibjobs['accounted_for'], do_cte_flats):
        cal_etable = determine_calibrations_to_proc(etable,
                                                    do_cte_flats=do_cte_flats,
                                                    still_acquiring=still_acquiring)

    ## Determine the appropriate science exposures
    sci_etable, tiles_to_proc = determine_science_to_proc(
                                        etable=etable, tiles=tiles,
                                        surveys=surveys, laststeps=science_laststeps,
                                        processed_tiles=np.unique(ptable['TILEID']),
                                        all_tiles=all_tiles,
                                        ignore_last_tile=still_acquiring,
                                        complete_tiles_thrunight=complete_tiles_thrunight,
                                        specstatus_path=specstatus_path)

    ## if camword isn't defined in the file, derive the cameras needed
    ## to process the current night's etable
    if 'linkcal' in cal_override and 'camword' not in cal_override['linkcal']:
        log.info(f"Linkcal in override file doesn't define camword. Setting "
                 + f"as the union of goodcamwords for all good exposures.")
        goodcamwords = [erow_to_goodcamword(erow, suppress_logging=True)
                           for erow in vstack([cal_etable, sci_etable])]
        cal_override['linkcal']['camword'] = camword_union(goodcamwords)

    ## For cumulative redshifts, identify tiles for which this is the last
    ## night that they were observed
    tiles_cumulative = get_tiles_cumulative(sci_etable, z_submit_types,
                                            all_cumulatives, night)

    ################### Process the data ###################
    ## Process Calibrations
    ## For now assume that a linkcal job links all files and we therefore
    ## don't need to submit anything more.
    def create_submit_add_and_save(prow, proctable, check_outputs=check_for_outputs,
1✔
478
                                   extra_job_args=None):
479
        log.info(f"\nProcessing: {prow}\n")
1✔
480
        prow = create_and_submit(prow, dry_run=dry_run_level, queue=queue,
1✔
481
                                 reservation=reservation,
482
                                 strictly_successful=True,
483
                                 check_for_outputs=check_outputs,
484
                                 resubmit_partial_complete=resubmit_partial_complete,
485
                                 system_name=system_name,
486
                                 use_specter=use_specter,
487
                                 extra_job_args=extra_job_args)
488
        ## Add the processing row to the processing table
489
        proctable.add_row(prow)
1✔
490
        if len(proctable) > 0 and dry_run_level < 3:
1✔
491
            write_table(proctable, tablename=proc_table_pathname, tabletype='proctable')
1✔
492
        sleep_and_report(sub_wait_time,
1✔
493
                         message_suffix=f"to slow down the queue submission rate",
494
                         dry_run=dry_run, logfunc=log.info)
495
        return prow, proctable
1✔
496

497
    ## Actually process the calibrations
498
    ## Only run if we haven't already linked or done fiberflatnight's
499
    if not all_calibs_submitted(calibjobs['accounted_for'], do_cte_flats):
1✔
500
        ptable, calibjobs, int_id = submit_calibrations(cal_etable, ptable,
1✔
501
                                                cal_override, calibjobs,
502
                                                int_id, night, files_to_link,
503
                                                create_submit_add_and_save)
504

505
    ## Require some minimal level of calibrations to process science exposures
506
    if require_cals and not all_calibs_submitted(calibjobs['accounted_for'], do_cte_flats):
1✔
UNCOV
507
        err = (f"Exiting because not all calibration files accounted for "
×
508
               + f"with links or submissions and require_cals is True.")
UNCOV
509
        log.error(err)
×
510
        ## If still acquiring new data in daily mode, don't exit with error code
511
        ## But do exit
UNCOV
512
        log.info(f'Stopping at {time.asctime()}\n')
×
UNCOV
513
        if still_acquiring:
×
UNCOV
514
            if len(ptable) > 0:
×
UNCOV
515
                processed = np.isin(full_etable['EXPID'],
×
516
                                    np.unique(np.concatenate(ptable['EXPID'])))
UNCOV
517
                unproc_table = full_etable[~processed]
×
518
            else:
519
                unproc_table = full_etable
×
520

521
            return ptable, unproc_table
×
522
        else:
UNCOV
523
            sys.exit(1)
×
524

525
    ## Process Sciences
526
    ## Loop over new tiles and process them
527
    for tile in tiles_to_proc:
1✔
528
        log.info(f'\n\n################# Submitting {tile} #####################')
1✔
529

530
        ## Identify the science exposures for the given tile
531
        tile_etable = sci_etable[sci_etable['TILEID'] == tile]
1✔
532
        
533
        ## Should change submit_tilenight_and_redshifts to take erows
534
        ## but for now will remain backward compatible and use prows
535
        ## Create list of prows from selected etable rows
536
        sciences = []
1✔
537
        for erow in tile_etable:
1✔
538
            prow = erow_to_prow(erow)
1✔
539
            prow['INTID'] = int_id
1✔
540
            int_id += 1
1✔
541
            prow['JOBDESC'] = prow['OBSTYPE']
1✔
542
            prow = define_and_assign_dependency(prow, calibjobs)
1✔
543
            sciences.append(prow)
1✔
544
            
545
        # don't submit cumulative redshifts for lasttile if it isn't in tiles_cumulative
546
        if z_submit_types is None:
1✔
547
            cur_z_submit_types = None
1✔
548
        else:
549
            cur_z_submit_types = z_submit_types.copy()
1✔
550

551
        if ((z_submit_types is not None) and ('cumulative' in z_submit_types)
1✔
552
            and (tile not in tiles_cumulative)):
UNCOV
553
            cur_z_submit_types.remove('cumulative')
×
554

555
        ## No longer need to return sciences since this is always the
556
        ## full set of exposures, but will keep for now for backward
557
        ## compatibility
558
        extra_job_args = {}
1✔
559
        if 'science' in overrides and 'tilenight' in overrides['science']:
1✔
UNCOV
560
            extra_job_args = overrides['science']['tilenight']
×
561
        else:
562
            extra_job_args = {}
1✔
563

564
        extra_job_args['z_submit_types'] = cur_z_submit_types
1✔
565
        extra_job_args['laststeps'] = science_laststeps
1✔
566
        ptable, sciences, int_id = submit_tilenight_and_redshifts(
1✔
567
                                    ptable, sciences, calibjobs, int_id,
568
                                    dry_run=dry_run_level, queue=queue,
569
                                    reservation=reservation,
570
                                    strictly_successful=True,
571
                                    check_for_outputs=check_for_outputs,
572
                                    resubmit_partial_complete=resubmit_partial_complete,
573
                                    system_name=system_name,
574
                                    use_specter=use_specter,
575
                                    extra_job_args=extra_job_args)
576

577
        if len(ptable) > 0 and dry_run_level < 3:
1✔
578
            write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')
1✔
579

580
        sleep_and_report(sub_wait_time,
1✔
581
                         message_suffix=f"to slow down the queue submission rate",
582
                         dry_run=dry_run, logfunc=log.info)
583

584
        ## Flush the outputs
585
        sys.stdout.flush()
1✔
586
        sys.stderr.flush()
1✔
587

588
    ################### Wrap things up ###################
589
    unproc_table = None
1✔
590
    if len(ptable) > 0:
1✔
591
        ## All jobs now submitted, update information from job queue and save
592
        ## But only if actually submitting or fully simulating, don't simulate
593
        ## outputs that will be written to disk (levels 1 and 2)
594
        if dry_run_level < 1 or dry_run_level > 2:
1✔
595
            ptable = update_from_queue(ptable, dry_run=dry_run_level)
1✔
596
        if dry_run_level < 3:
1✔
597
            write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')
1✔
598
            ## Now that processing is complete, lets identify what we didn't process
599
            if len(ptable) > 0:
1✔
600
                processed = np.isin(full_etable['EXPID'], np.unique(np.concatenate(ptable['EXPID'])))
1✔
601
                unproc_table = full_etable[~processed]
1✔
602
            else:
UNCOV
603
                unproc_table = full_etable
×
604
            write_table(unproc_table, tablename=unproc_table_pathname)
1✔
UNCOV
605
    elif dry_run_level < 3 and len(full_etable) > 0:
×
606
        ## Done determining what not to process, so write out unproc file
UNCOV
607
        unproc_table = full_etable
×
UNCOV
608
        write_table(unproc_table, tablename=unproc_table_pathname)
×
609

610
    if dry_run_level >= 3:
1✔
611
        log.info(f"{dry_run_level=} so not saving outputs.")
1✔
612
        log.info(f"\n{full_etable=}")
1✔
613
        log.info(f"\nn{ptable=}")
1✔
614
        log.info(f"\n{unproc_table=}")
1✔
615

616
    if still_acquiring:
1✔
UNCOV
617
        log.info(f"Current submission of exposures "
×
618
                 + f"for {night=} are complete except for last tile at {time.asctime()}.\n\n\n\n")
619
    else:
620
        log.info(f"All done: Completed submission of exposures for night {night} at {time.asctime()}.\n")
1✔
621
        
622
    return ptable, unproc_table
1✔
623

624

625
def submit_calibrations(cal_etable, ptable, cal_override, calibjobs, int_id,
1✔
626
                        curnight, files_to_link, create_submit_add_and_save):
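    ## Submit the calibration jobs for the night described by cal_etable
    ## (linkcal, ccdcalib or nightlybias, arcs + psfnight, flats + nightlyflat,
    ## and cte flats), using the create_submit_add_and_save helper passed in
    ## from proc_night. Returns the updated ptable, the calibjobs dictionary,
    ## and the next internal ID.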
    log = get_logger()
    if len(ptable) > 0:
        ## we use this to check for individual jobs rather than combination
        ## jobs, so only check for scalar jobs where JOBDESC == OBSTYPE
        ## ex. dark, zero, arc, and flat
        explists = ptable['EXPID'][ptable['JOBDESC']==ptable['OBSTYPE']]
        if len(explists) == 0:
            processed_cal_expids = np.array([]).astype(int)
        elif len(explists) == 1:
            processed_cal_expids = np.unique(explists[0]).astype(int)
        else:
            processed_cal_expids = np.unique(np.concatenate(explists).astype(int))
    else:
        processed_cal_expids = np.array([]).astype(int)

    ######## Submit caliblink if requested ########

    if 'linkcal' in cal_override and calibjobs['linkcal'] is None:
        log.info("Linking calibration files listed in override files: "
                 + f"{files_to_link}")
        prow = default_prow()
        prow['INTID'] = int_id
        int_id += 1
        prow['JOBDESC'] = 'linkcal'
        prow['OBSTYPE'] = 'link'
        prow['CALIBRATOR'] = 1
        prow['NIGHT'] = curnight
        if 'refnight' in cal_override['linkcal']:
            refnight = int(cal_override['linkcal']['refnight'])
            prow = define_and_assign_dependency(prow, calibjobs, refnight=refnight)
        if 'camword' in cal_override['linkcal']:
            prow['PROCCAMWORD'] = cal_override['linkcal']['camword']

        ## create dictionary to carry linking information
        linkcalargs = cal_override['linkcal']
        prow, ptable = create_submit_add_and_save(prow, ptable,
                                                  check_outputs=False,
                                                  extra_job_args=linkcalargs)
        calibjobs[prow['JOBDESC']] = prow.copy()
        calibjobs['accounted_for'] = \
            update_accounted_for_with_linking(calibjobs['accounted_for'],
                                              files_to_link)

    if len(cal_etable) == 0:
        return ptable, calibjobs, int_id

    ## Otherwise proceed with submitting the calibrations
    ## Define objects to process
    darks, flats, ctes, cte1s = list(), list(), list(), list()
    zeros = cal_etable[cal_etable['OBSTYPE']=='zero']
    arcs = cal_etable[cal_etable['OBSTYPE']=='arc']
    if 'dark' in cal_etable['OBSTYPE']:
        darks = cal_etable[cal_etable['OBSTYPE']=='dark']
    if 'flat' in cal_etable['OBSTYPE']:
        allflats = cal_etable[cal_etable['OBSTYPE']=='flat']
        is_cte = np.array(['cte' in prog.lower() for prog in allflats['PROGRAM']])
        flats = allflats[~is_cte]
        ctes = allflats[is_cte]

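    ## Decide which nightly calibration products still need to be generated,
    ## based on the exposures available tonight and on what is already
    ## accounted for (previously submitted or linked from another night).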
    have_flats_for_cte = len(ctes) > 0 and len(flats) > 0
    do_bias = len(zeros) > 0 and not calibjobs['accounted_for']['biasnight']
    do_badcol = len(darks) > 0 and not calibjobs['accounted_for']['badcolumns']
    do_cte = have_flats_for_cte and not calibjobs['accounted_for']['ctecorrnight']

    ## if do badcol or cte, then submit a ccdcalib job, otherwise submit a
    ## nightlybias job
    if do_badcol or do_cte:
        ######## Submit ccdcalib ########
        ## process dark for bad columns even if we don't have zeros for nightlybias
        ## ccdcalib = nightlybias(zeros) + badcol(dark) + cte correction
        jobdesc = 'ccdcalib'

        if calibjobs[jobdesc] is None:
            ## Define which erow to use to create the processing table row
            all_expids = []
            if do_badcol:
                ## first exposure is a 300s dark
                job_erow = darks[0]
                all_expids.append(job_erow['EXPID'])
            else:
                job_erow = ctes[-1]
            ## if doing cte correction, create expid list of last 120s flat
            ## and all ctes provided by the calibration selection function
            if do_cte:
                cte_expids = np.array([flats[-1]['EXPID'], *ctes['EXPID']])
                all_expids.extend(cte_expids)
            else:
                cte_expids = None

            prow, int_id = make_exposure_prow(job_erow, int_id,
                                              calibjobs, jobdesc=jobdesc)
            if len(all_expids) > 1:
                prow['EXPID'] = np.array(all_expids)

            prow['CALIBRATOR'] = 1

            extra_job_args = {'nightlybias': do_bias,
                              'nightlycte': do_cte,
                              'cte_expids': cte_expids}
            prow, ptable = create_submit_add_and_save(prow, ptable,
                                                      extra_job_args=extra_job_args)
            calibjobs[prow['JOBDESC']] = prow.copy()
            log.info(f"Submitted ccdcalib job with {do_bias=}, "
                     + f"{do_badcol=}, {do_cte=}")
    elif do_bias:
        log.info("\nNo dark or cte found. Submitting nightlybias before "
                 "processing exposures.\n")
        prow = erow_to_prow(zeros[0])
        prow['EXPID'] = np.array([])
        prow['INTID'] = int_id
        int_id += 1
        prow['JOBDESC'] = 'nightlybias'
        prow['CALIBRATOR'] = 1
1✔
741
        for zero in zeros:
1✔
742
            if 'calib' in zero['PROGRAM']:
1✔
743
                proccamword = difference_camwords(zero['CAMWORD'],
1✔
744
                                                  zero['BADCAMWORD'])
745
                cams = cams.intersection(set(decode_camword(proccamword)))
1✔
746
        prow['PROCCAMWORD'] = create_camword(list(cams))
1✔
747
        prow = define_and_assign_dependency(prow, calibjobs)
1✔
748
        prow, ptable = create_submit_add_and_save(prow, ptable)
1✔
749
        calibjobs[prow['JOBDESC']] = prow.copy()
1✔
750
        log.info("Performed nightly bias as no dark or cte passed cuts.")
1✔
751

752
    if do_bias:
1✔
753
        calibjobs['accounted_for']['biasnight'] = True
1✔
754
    if do_badcol:
1✔
755
        calibjobs['accounted_for']['badcolumns'] = True
1✔
756
    if do_cte:
1✔
757
        calibjobs['accounted_for']['ctecorrnight'] = True
1✔
758

759
    ######## Submit arcs and psfnight ########
760
    if len(arcs)>0 and not calibjobs['accounted_for']['psfnight']:
1✔
761
        arc_prows = []
1✔
762
        for arc_erow in arcs:
1✔
763
            if arc_erow['EXPID'] in processed_cal_expids:
1✔
UNCOV
764
                matches = np.where([arc_erow['EXPID'] in itterprow['EXPID']
×
765
                                    for itterprow in ptable])[0]
UNCOV
766
                if len(matches) == 1:
×
UNCOV
767
                    prow = ptable[matches[0]]
×
UNCOV
768
                    log.info("Found existing arc prow in ptable, " 
×
769
                             + f"including it for psfnight job: {list(prow)}")
UNCOV
770
                    arc_prows.append(prow)
×
UNCOV
771
                continue
×
772
            prow, int_id = make_exposure_prow(arc_erow, int_id, calibjobs)
1✔
773
            prow, ptable = create_submit_add_and_save(prow, ptable)
1✔
774
            arc_prows.append(prow)
1✔
775

776
        joint_prow, int_id = make_joint_prow(arc_prows, descriptor='psfnight',
1✔
777
                                             internal_id=int_id)
778
        ptable = set_calibrator_flag(arc_prows, ptable)
1✔
779
        joint_prow, ptable = create_submit_add_and_save(joint_prow, ptable)
1✔
780
        calibjobs[joint_prow['JOBDESC']] = joint_prow.copy()
1✔
781
        calibjobs['accounted_for']['psfnight'] = True
1✔
782

783

784
    ######## Submit flats and nightlyflat ########
785
    ## If nightlyflat defined we don't need to process more normal flats
786
    if len(flats) > 0 and not calibjobs['accounted_for']['fiberflatnight']:
1✔
787
        flat_prows = []
1✔
788
        for flat_erow in flats:
1✔
789
            if flat_erow['EXPID'] in processed_cal_expids:
1✔
UNCOV
790
                matches = np.where([flat_erow['EXPID'] in itterprow['EXPID']
×
791
                                    for itterprow in ptable])[0]
UNCOV
792
                if len(matches) == 1:
×
UNCOV
793
                    prow = ptable[matches[0]]
×
UNCOV
794
                    log.info("Found existing flat prow in ptable, " 
×
795
                             + f"including it for nightlyflat job: {list(prow)}")
UNCOV
796
                    flat_prows.append(prow)
×
UNCOV
797
                continue
×
798

799
            jobdesc = 'flat'
1✔
800
            prow, int_id = make_exposure_prow(flat_erow, int_id, calibjobs,
1✔
801
                                              jobdesc=jobdesc)
802
            prow, ptable = create_submit_add_and_save(prow, ptable)
1✔
803
            flat_prows.append(prow)
1✔
804

805
        joint_prow, int_id = make_joint_prow(flat_prows, descriptor='nightlyflat',
1✔
806
                                             internal_id=int_id)
807
        ptable = set_calibrator_flag(flat_prows, ptable)
1✔
808
        if 'nightlyflat' in cal_override:
1✔
809
            extra_args = cal_override['nightlyflat']
1✔
810
        else:
811
            extra_args = None
1✔
812
        joint_prow, ptable = create_submit_add_and_save(joint_prow, ptable,
1✔
813
                                                        extra_job_args=extra_args)
814
        calibjobs[joint_prow['JOBDESC']] = joint_prow.copy()
1✔
815
        calibjobs['accounted_for']['fiberflatnight'] = True
1✔
816
        
817
    ######## Submit cte flats ########
818
    jobdesc = 'flat'
1✔
819
    for cte_erow in ctes:
1✔
820
        if cte_erow['EXPID'] in processed_cal_expids:
1✔
UNCOV
821
            continue
×
822
        prow, int_id = make_exposure_prow(cte_erow, int_id, calibjobs,
1✔
823
                                      jobdesc=jobdesc)
824
        prow, ptable = create_submit_add_and_save(prow, ptable)
1✔
825
            
826
    return ptable, calibjobs, int_id
1✔