• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

desihub / desispec / 8397945773

23 Mar 2024 12:22AM UTC coverage: 28.132% (+3.0%) from 25.113%
8397945773

Pull #2187

github

akremin
bug fixes in old sub scripts
Pull Request #2187: Introduce desi_proc_night to unify and simplify processing scripts

768 of 1152 new or added lines in 21 files covered. (66.67%)

1071 existing lines in 14 files now uncovered.

13184 of 46864 relevant lines covered (28.13%)

0.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.58
/py/desispec/scripts/proc_night.py
1
"""
2
desispec.scripts.proc_night
3
=============================
4

5
"""
6
from desispec.io import findfile
1✔
7
from desispec.scripts.link_calibnight import derive_include_exclude
1✔
8
from desispec.workflow.calibration_selection import \
1✔
9
    determine_calibrations_to_proc
10
from desispec.workflow.science_selection import determine_science_to_proc, \
1✔
11
    get_tiles_cumulative
12
from desiutil.log import get_logger
1✔
13
import numpy as np
1✔
14
import os
1✔
15
import sys
1✔
16
import time
1✔
17
import re
1✔
18
from socket import gethostname
1✔
19
from astropy.table import Table, vstack
1✔
20

21
## Import some helper functions, you can see their definitions by uncommenting the bash shell command
22
from desispec.scripts.update_exptable import update_exposure_table
1✔
23
from desispec.workflow.tableio import load_tables, write_table
1✔
24
from desispec.workflow.utils import sleep_and_report, \
1✔
25
    verify_variable_with_environment, load_override_file
26
from desispec.workflow.timing import what_night_is_it, during_operating_hours
1✔
27
from desispec.workflow.exptable import get_last_step_options
1✔
28
from desispec.workflow.proctable import default_obstypes_for_proctable, \
1✔
29
    erow_to_prow, default_prow
30
from desispec.workflow.processing import define_and_assign_dependency, \
1✔
31
    create_and_submit, \
32
    submit_tilenight_and_redshifts, \
33
    generate_calibration_dict, \
34
    night_to_starting_iid, make_joint_prow, \
35
    set_calibrator_flag, make_exposure_prow, \
36
update_calibjobs_with_linking, all_calibs_submitted
37
from desispec.workflow.queue import update_from_queue, any_jobs_failed
1✔
38
from desispec.io.util import decode_camword, difference_camwords, \
1✔
39
    create_camword, replace_prefix
40

41

42
def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
1✔
43
               queue=None, reservation=None, system_name=None,
44
               exp_table_pathname=None, proc_table_pathname=None,
45
               override_pathname=None, update_exptable=False,
46
               dry_run_level=0, dry_run=False, no_redshifts=False,
47
               ignore_proc_table_failures = False,
48
               dont_check_job_outputs=False, dont_resubmit_partial_jobs=False,
49
               tiles=None, surveys=None, science_laststeps=None,
50
               all_tiles=False, specstatus_path=None, use_specter=False,
51
               no_cte_flats=False, complete_tiles_thrunight=None,
52
               all_cumulatives=False, daily=False, specprod=None,
53
               path_to_data=None, exp_obstypes=None, camword=None,
54
               badcamword=None, badamps=None, exps_to_ignore=None,
55
               sub_wait_time=0.5, verbose=False, dont_require_cals=False,
56
               psf_linking_without_fflat=False):
57
    """
58
    Process some or all exposures on a night. Can be used to process an entire
59
    night, or used to process data currently available on a given night using
60
    the '--daily' flag.
61

62
    Args:
63
        night (int): The night of data to be processed. Exposure table must exist.
64
        proc_obstypes (list or np.array, optional): A list of exposure OBSTYPE's
65
            that should be processed (and therefore added to the processing table).
66
        z_submit_types (list of str):
67
            The "group" types of redshifts that should be submitted with each
68
            exposure. If not specified, default for daily processing is
69
            ['cumulative', 'pernight-v0']. If false, 'false', or [], then no
70
            redshifts are submitted.
71
        queue (str, optional): The name of the queue to submit the jobs to.
72
            Default is "realtime" in daily mode, otherwise "regular".
73
        reservation (str, optional): The reservation to submit jobs to.
74
            If None, it is not submitted to a reservation.
75
        system_name (str): batch system name, e.g. cori-haswell, cori-knl,
76
            perlmutter-gpu
77
        exp_table_pathname (str): Full path to where the exposure tables are stored,
78
            including file name.
79
        proc_table_pathname (str): Full path to where the processing tables are to be
80
            written, including file name
81
        override_pathname (str): Full path to the override file.
82
        update_exptable (bool): If true then the exposure table is updated.
83
            The default is False.
84
        dry_run_level (int, optional): If nonzero, this is a simulated run.
85
            If dry_run_level=1 the scripts will be written but not submitted.
86
            If dry_run_level=2, the scripts will not be written nor submitted
87
            but the processing_table is still created.
88
            If dry_run_level=3, no output files are written.
89
            Logging will remain the same for testing as though scripts are
90
            being submitted. Default is 0 (false).
91
        dry_run (bool, optional): Whether to run without submitting scripts or
92
            not. If dry_run_level is defined, then it over-rides this flag.
93
            If dry_run_level is not set and dry_run=True, dry_run_level is set to 2
94
            (no scripts generated or run). Default for dry_run is False.
95
        no_redshifts (bool, optional): Whether to submit redshifts or not.
96
            If True, redshifts are not submitted.
97
        ignore_proc_table_failures (bool, optional): True if you want to submit
98
            other jobs even if the loaded processing table has incomplete jobs in
99
            it. Use with caution. Default is False.
100
        dont_check_job_outputs (bool, optional): Default is False. If False,
101
            the code checks for the existence of the expected final data
102
            products for the script being submitted. If all files exist and
103
            this is False, then the script will not be submitted. If some
104
            files exist and this is False, only the subset of the cameras
105
            without the final data products will be generated and submitted.
106
        dont_resubmit_partial_jobs (bool, optional): Default is False. Must be
107
            used with dont_check_job_outputs=False. If this flag is False, jobs
108
            with some prior data are pruned using PROCCAMWORD to only process
109
            the remaining cameras not found to exist.
110
        tiles (array-like, optional): Only submit jobs for these TILEIDs.
111
        surveys (array-like, optional): Only submit science jobs for these
112
            surveys (lowercase)
113
        science_laststeps (array-like, optional): Only submit jobs for exposures
114
            with LASTSTEP in these science_laststeps (lowercase)
115
        all_tiles (bool, optional): Default is False. Set to NOT restrict to
116
            completed tiles as defined by the table pointed to by specstatus_path.
117
        specstatus_path (str, optional): Default is
118
            $DESI_SURVEYOPS/ops/tiles-specstatus.ecsv. Location of the
119
            surveyops specstatus table.
120
        use_specter (bool, optional): Default is False. If True, use specter,
121
            otherwise use gpu_specter by default.
122
        no_cte_flats (bool, optional): Default is False. If False, cte flats
123
            are used if available to correct for cte effects.
124
        complete_tiles_thrunight (int, optional): Default is None. Only tiles
125
            completed on or before the supplied YYYYMMDD are considered
126
            completed and will be processed. All complete tiles are submitted
127
            if None or all_tiles is True.
128
        all_cumulatives (bool, optional): Default is False. Set to run
129
            cumulative redshifts for all tiles even if the tile has observations
130
            on a later night.
131
        specprod: str. The name of the current production. If used, this will
132
            overwrite the SPECPROD environment variable.
133
        daily: bool. Flag that sets other flags for running this script for the
134
            daily pipeline.
135
        path_to_data: str. Path to the raw data.
136
        exp_obstypes: str or comma separated list of strings. The exposure
137
            OBSTYPE's that you want to include in the exposure table.
138
        camword: str. Camword that, if set, alters the set of cameras that will
139
            be set for processing. Examples: a0123456789, a1, a2b3r3,
140
            a2b3r4z3. Note this is only true for new exposures being
141
            added to the exposure_table in 'daily' mode.
142
        badcamword: str. Camword that, if set, will be removed from the camword
143
            defined in camword if given, or the camword inferred from
144
            the data if camword is not given. Note this is only true
145
            for new exposures being added to the exposure_table
146
            in 'daily' mode.
147
        badamps: str. Comma separated list of bad amplifiers that should not
148
            be processed. Should be of the form "{camera}{petal}{amp}",
149
            i.e. "[brz][0-9][ABCD]". Example: 'b7D,z8A'. Note this is
150
            only true for new exposures being added to the
151
            exposure_table in 'daily' mode.
152
        sub_wait_time: float. Wait time in seconds between submission loops.
153
            Default 0.5 seconds.
154
        verbose: bool. True if you want more verbose output, false otherwise.
155
            Currently not propagated to lower code, so it is only used in the
156
            main daily_processing script itself.
157
        dont_require_cals: bool. Default False. If set then the code doesn't
158
            require either a valid set of calibrations or a valid override file
159
            to link to calibrations in order to proceed with science processing.
160
        psf_linking_without_fflat: bool. Default False. If set then the code
161
            will NOT raise an error if asked to link psfnight calibrations
162
            without fiberflatnight calibrations.
163
    """
164
    ## Get logger
165
    log = get_logger()
1✔
166
    log.info(f'----- Processing {night} at {time.asctime()} -----')
1✔
167
    log.info(f"SLURM_JOB_ID={os.getenv('SLURM_JOB_ID')} on {gethostname()}")
1✔
168

169
    ## Inform user of how some parameters will be used
170
    if camword is not None:
1✔
NEW
171
        log.info(f"Note custom {camword=} will only be used for new exposures"
×
172
                 f" being entered into the exposure_table, not all exposures"
173
                 f" to be processed.")
174
    if badcamword is not None:
1✔
NEW
175
        log.info(f"Note custom {badcamword=} will only be used for new exposures"
×
176
                 f" being entered into the exposure_table, not all exposures"
177
                 f" to be processed.")
178
    if badamps is not None:
1✔
NEW
179
        log.info(f"Note custom {badamps=} will only be used for new exposures"
×
180
                 f" being entered into the exposure_table, not all exposures"
181
                 f" to be processed.")
182

183
    ## Reconcile the dry_run and dry_run_level
184
    if dry_run and dry_run_level == 0:
1✔
NEW
185
        dry_run_level = 2
×
186
    elif dry_run_level > 0:
1✔
187
        dry_run = True
1✔
188

189
    ## Set a flag to determine whether to process the last tile in the exposure table
190
    ## or not. This is used in daily mode when processing and exiting mid-night.
191
    still_acquiring = False
1✔
192
    
193
    ## If running in daily mode, change a bunch of defaults
194
    if daily:
1✔
195
        ## What night are we running on?
NEW
196
        true_night = what_night_is_it()
×
NEW
197
        if night is not None:
×
NEW
198
            night = int(night)
×
NEW
199
            if true_night != night:
×
NEW
200
                log.info(f"True night is {true_night}, but running for {night=}")
×
201
        else:
NEW
202
            night = true_night
×
203

NEW
204
        if science_laststeps is None:
×
NEW
205
            science_laststeps = ['all', 'skysub', 'fluxcal']
×
206

NEW
207
        if z_submit_types is None and not no_redshifts:
×
NEW
208
            z_submit_types = ['cumulatives']
×
209

NEW
210
        if during_operating_hours(dry_run=dry_run) and (true_night == night):
×
NEW
211
            still_acquiring = True
×
212

NEW
213
        update_exptable = True    
×
NEW
214
        append_to_proc_table = True
×
NEW
215
        all_cumulatives = True
×
NEW
216
        all_tiles = True
×
NEW
217
        complete_tiles_thrunight = None
×
218
        ## Default for nightly processing is realtime queue
NEW
219
        if queue is None:
×
NEW
220
            queue = 'realtime'
×
221

222
    ## Default for normal processing is regular queue
223
    if queue is None:
1✔
224
        queue = 'regular'
1✔
225
    log.info(f"Submitting to the {queue} queue.")
1✔
226
             
227
    ## Set night
228
    if night is None:
1✔
NEW
229
        err = "Must specify night unless running in daily=True mode"
×
NEW
230
        log.error(err)
×
NEW
231
        raise ValueError(err)
×
232
    else:
233
        log.info(f"Processing {night=}")
1✔
234

235
    ## Recast booleans from double negative
236
    check_for_outputs = (not dont_check_job_outputs)
1✔
237
    resubmit_partial_complete = (not dont_resubmit_partial_jobs)
1✔
238
    require_cals = (not dont_require_cals)
1✔
239
    do_cte_flats = (not no_cte_flats)
1✔
240
    
241
    ## cte flats weren't available before 20211130 so hardcode that in
242
    if do_cte_flats and night < 20211130:
1✔
NEW
243
        log.warning("Asked to do cte flat correction but before 20211130 no "
×
244
                    + "no cte flats are available to do the correction. "
245
                    + "Code will NOT perform cte flat corrections.")
NEW
246
        do_cte_flats = False
×
247

248
    ###################
249
    ## Set filenames ##
250
    ###################
251
    ## Ensure specprod is set in the environment and that it matches user
252
    ## specified value if given
253
    specprod = verify_variable_with_environment(specprod, var_name='specprod',
1✔
254
                                                env_name='SPECPROD')
255

256
    ## Determine where the exposure table will be written
257
    if exp_table_pathname is None:
1✔
258
        exp_table_pathname = findfile('exposure_table', night=night)
1✔
259
    if not os.path.exists(exp_table_pathname) and not update_exptable:
1✔
NEW
260
        raise IOError(f"Exposure table: {exp_table_pathname} not found. Exiting this night.")
×
261

262
    ## Determine where the processing table will be written
263
    if proc_table_pathname is None:
1✔
264
        proc_table_pathname = findfile('processing_table', night=night)
1✔
265
    proc_table_path = os.path.dirname(proc_table_pathname)
1✔
266
    if dry_run_level < 3:
1✔
267
        os.makedirs(proc_table_path, exist_ok=True)
1✔
268

269
    ## Determine where the unprocessed data table will be written
270
    unproc_table_pathname = replace_prefix(proc_table_pathname, 'processing', 'unprocessed')
1✔
271

272
    ## Require cal_override to exist if explicitly specified
273
    if override_pathname is None:
1✔
274
        override_pathname = findfile('override', night=night)
1✔
NEW
275
    elif not os.path.exists(override_pathname):
×
NEW
276
        raise IOError(f"Specified override file: "
×
277
                      f"{override_pathname} not found. Exiting this night.")
278

279
    #######################################
280
    ## Define parameters based on inputs ##
281
    #######################################
282
    ## If science_laststeps not defined, default is only LASTSTEP=='all' exposures
283
    if science_laststeps is None:
1✔
284
        science_laststeps = ['all']
1✔
285
    else:
NEW
286
        laststep_options = get_last_step_options()
×
NEW
287
        for laststep in science_laststeps:
×
NEW
288
            if laststep not in laststep_options:
×
NEW
289
                raise ValueError(f"Couldn't understand laststep={laststep} "
×
290
                                 + f"in science_laststeps={science_laststeps}.")
291
    log.info(f"Processing exposures with the following LASTSTEP's: {science_laststeps}")
1✔
292

293
    ## Define the group types of redshifts you want to generate for each tile
294
    if no_redshifts:
1✔
NEW
295
        log.info(f"no_redshifts set, so ignoring {z_submit_types=}")
×
NEW
296
        z_submit_types = None
×
297

298
    if z_submit_types is None:
1✔
299
        log.info("Not submitting scripts for redshift fitting")
1✔
300
    else:
301
        for ztype in z_submit_types:
1✔
302
            if ztype not in ['cumulative', 'pernight-v0', 'pernight', 'perexp']:
1✔
NEW
303
                raise ValueError(f"Couldn't understand ztype={ztype} "
×
304
                                 + f"in z_submit_types={z_submit_types}.")
305
        log.info(f"Redshift fitting with redshift group types: {z_submit_types}")
1✔
306

307
    ## Identify OBSTYPES to process
308
    if proc_obstypes is None:
1✔
309
        proc_obstypes = default_obstypes_for_proctable()
1✔
310

311
    #############################
312
    ## Start the Actual Script ##
313
    #############################
314
    ## If running in daily mode, or requested, then update the exposure table
315
    ## This reads in and writes out the exposure table to disk
316
    if update_exptable:
1✔
NEW
317
        log.info("Running update_exposure_table.")
×
NEW
318
        update_exposure_table(night=night, specprod=specprod,
×
319
                              exp_table_pathname=exp_table_pathname,
320
                              path_to_data=path_to_data, exp_obstypes=exp_obstypes,
321
                              camword=camword, badcamword=badcamword, badamps=badamps,
322
                              exps_to_ignore=exps_to_ignore,
323
                              dry_run_level=dry_run_level, verbose=verbose)
NEW
324
        log.info("Done with update_exposure_table.\n\n")
×
325
    ## Combine the table names and types for easier passing to io functions
326
    table_pathnames = [exp_table_pathname, proc_table_pathname]
1✔
327
    table_types = ['exptable', 'proctable']
1✔
328

329
    ## Load in the files defined above
330
    etable, ptable = load_tables(tablenames=table_pathnames, tabletypes=table_types)
1✔
331
    full_etable = etable.copy()
1✔
332

333
    ## Cut on OBSTYPES
334
    log.info(f"Processing the following obstypes: {proc_obstypes}")
1✔
335
    good_types = np.isin(np.array(etable['OBSTYPE']).astype(str), proc_obstypes)
1✔
336
    etable = etable[good_types]
1✔
337

338
    ## Update processing table
339
    tableng = len(ptable)
1✔
340
    if tableng > 0:
1✔
NEW
341
        ptable = update_from_queue(ptable, dry_run=dry_run_level)
×
NEW
342
        if dry_run_level < 3:
×
NEW
343
            write_table(ptable, tablename=proc_table_pathname)
×
NEW
344
        if any_jobs_failed(ptable['STATUS']):
×
NEW
345
            if not ignore_proc_table_failures:
×
NEW
346
                err = "Some jobs have an incomplete job status. This script " \
×
347
                      + "will not fix them. You should remedy those first. "
NEW
348
                log.error(err)
×
349
                ## if the failures are in calibrations, then crash since
350
                ## we need them for any new jobs
NEW
351
                if any_jobs_failed(ptable['STATUS'][ptable['CALIBRATOR'] > 0]):
×
NEW
352
                    err += "To proceed anyway use "
×
NEW
353
                    err += "'--ignore-proc-table-failures'. Exiting."
×
354
                    raise AssertionError(err)
355
            else:
NEW
356
                log.warning("Some jobs have an incomplete job status, but "
×
357
                      + "you entered '--ignore-proc-table-failures'. This "
358
                      + "script will not fix them. "
359
                      + "You should have fixed those first. Proceeding...")
NEW
360
        if np.sum(ptable['OBSTYPE']=='science')>0:
×
NEW
361
            ptable_expids = set(np.unique(np.concatenate(
×
362
                                ptable['EXPID'][ptable['OBSTYPE']=='science']
363
                            )))
364
        else:
NEW
365
            ptable_expids = set()
×
NEW
366
        etable_expids = set(etable['EXPID'][etable['OBSTYPE']=='science'])
×
NEW
367
        if len(etable_expids.difference(ptable_expids)) == 0:
×
NEW
368
            log.info("All science EXPID's already present in processing table, "
×
369
                     + "nothing to run. Exiting")
NEW
370
            return ptable
×
NEW
371
        int_id = np.max(ptable['INTID'])+1
×
372
    else:
373
        int_id = night_to_starting_iid(night=night)
1✔
374

375
    ################### Determine What to Process ###################
376
    ## Load calibration_override_file
377
    overrides = load_override_file(filepathname=override_pathname)
1✔
378
    cal_override = {}
1✔
379
    if 'calibration' in overrides:
1✔
380
        cal_override = overrides['calibration']
1✔
381

382
    ## Determine calibrations that will be linked
383
    if 'linkcal' in cal_override:
1✔
384
        files_to_link, files_not_linked = None, None
1✔
385
        if 'include' in  cal_override['linkcal']:
1✔
386
            files_to_link = cal_override['linkcal']['include']
1✔
387
        if 'exclude' in  cal_override['linkcal']:
1✔
NEW
388
            files_not_linked = cal_override['linkcal']['exclude']
×
389
        files_to_link, files_not_linked = derive_include_exclude(files_to_link,
1✔
390
                                                                 files_not_linked)
391
        ## Fiberflatnights need to be generated with psfs from same time, so
392
        ## can't link psfs without also linking fiberflatnight
393
        if 'psfnight' in files_to_link and not 'fiberflatnight' in files_to_link \
1✔
394
                and not psf_linking_without_fflat:
395
            err = "Must link fiberflatnight if linking psfnight"
1✔
396
            log.error(err)
1✔
397
            raise ValueError(err)
1✔
398
    else:
399
        files_to_link = set()
1✔
400

401
    ## Identify what calibrations have been done
402
    calibjobs = generate_calibration_dict(ptable, files_to_link)
1✔
403

404
    ## Determine the appropriate set of calibrations
405
    ## Only run if we haven't already linked or done fiberflatnight's
406
    cal_etable = None
1✔
407
    if not all_calibs_submitted(calibjobs['completed']):
1✔
408
        cal_etable = determine_calibrations_to_proc(etable,
1✔
409
                                                    do_cte_flats=do_cte_flats,
410
                                                    still_acquiring=still_acquiring)
411

412
    ## Determine the appropriate science exposures
413
    sci_etable, tiles_to_proc = determine_science_to_proc(
1✔
414
                                        etable=etable, tiles=tiles,
415
                                        surveys=surveys, laststeps=science_laststeps,
416
                                        processed_tiles=np.unique(ptable['TILEID']),
417
                                        all_tiles=all_tiles,
418
                                        ignore_last_tile=still_acquiring,
419
                                        complete_tiles_thrunight=complete_tiles_thrunight,
420
                                        specstatus_path=specstatus_path)
421

422
    ## For cumulative redshifts, identify tiles for which this is the last
423
    ## night that they were observed
424
    tiles_cumulative = get_tiles_cumulative(sci_etable, z_submit_types,
1✔
425
                                            all_cumulatives, night)
426

427
    ################### Process the data ###################
428
    ## Process Calibrations
429
    ## For now assume that a linkcal job links all files and we therefore
430
    ## don't need to submit anything more.
431
    def create_submit_add_and_save(prow, proctable, check_outputs=check_for_outputs,
1✔
432
                                   extra_job_args=None):
433
        log.info(f"\nProcessing: {prow}\n")
1✔
434
        prow = create_and_submit(prow, dry_run=dry_run_level, queue=queue,
1✔
435
                                 reservation=reservation,
436
                                 strictly_successful=True,
437
                                 check_for_outputs=check_outputs,
438
                                 resubmit_partial_complete=resubmit_partial_complete,
439
                                 system_name=system_name,
440
                                 use_specter=use_specter,
441
                                 extra_job_args=extra_job_args)
442
        ## Add the processing row to the processing table
443
        proctable.add_row(prow)
1✔
444
        if len(proctable) > 0 and dry_run_level < 3:
1✔
445
            write_table(proctable, tablename=proc_table_pathname)
1✔
446
        sleep_and_report(sub_wait_time,
1✔
447
                         message_suffix=f"to slow down the queue submission rate",
448
                         dry_run=dry_run, logfunc=log.info)
449
        return prow, proctable
1✔
450

451
    ## Actually process the calibrations
452
    ## Only run if we haven't already linked or done fiberflatnight's
453
    if not all_calibs_submitted(calibjobs['completed']):
1✔
454
        ptable, calibjobs, int_id = submit_calibrations(cal_etable, ptable,
1✔
455
                                                cal_override, calibjobs,
456
                                                int_id, night, files_to_link,
457
                                                create_submit_add_and_save)
458

459
    ## Require some minimal level of calibrations to process science exposures
460
    if require_cals and not all_calibs_submitted(calibjobs['completed']):
1✔
NEW
461
        err = (f"Required to have at least flat calibrations via override link"
×
462
               + f" or nightlyflat before proceeding.")
NEW
463
        log.error(err)
×
464
        ## If still acquiring new data in daily mode, don't exit with error code
465
        ## But do exit
NEW
466
        log.info(f'Stopping at {time.asctime()}\n')
×
NEW
467
        if still_acquiring:
×
NEW
468
            if len(ptable) > 0:
×
NEW
469
                processed = np.isin(full_etable['EXPID'],
×
470
                                    np.unique(np.concatenate(ptable['EXPID'])))
NEW
471
                unproc_table = full_etable[~processed]
×
472
            else:
NEW
473
                unproc_table = full_etable
×
474

NEW
475
            return ptable, unproc_table
×
476
        else:
NEW
477
            sys.exit(1)
×
478

479
    ## Process Sciences
480
    ## Loop over new tiles and process them
481
    for tile in tiles_to_proc:
1✔
482
        log.info(f'\n\n##################### {tile} #########################')
1✔
483

484
        ## Identify the science exposures for the given tile
485
        tile_etable = sci_etable[sci_etable['TILEID'] == tile]
1✔
486
        
487
        ## Should change submit_tilenight_and_redshifts to take erows
488
        ## but for now will remain backward compatible and use prows
489
        ## Create list of prows from selected etable rows
490
        sciences = []
1✔
491
        for erow in tile_etable:
1✔
492
            prow = erow_to_prow(erow)
1✔
493
            prow['INTID'] = int_id
1✔
494
            int_id += 1
1✔
495
            prow['JOBDESC'] = prow['OBSTYPE']
1✔
496
            prow = define_and_assign_dependency(prow, calibjobs)
1✔
497
            sciences.append(prow)
1✔
498
            
499
        # don't submit cumulative redshifts for lasttile if it isn't in tiles_cumulative
500
        if z_submit_types is None:
1✔
501
            cur_z_submit_types = None
1✔
502
        else:
503
            cur_z_submit_types = z_submit_types.copy()
1✔
504

505
        if ((z_submit_types is not None) and ('cumulative' in z_submit_types)
1✔
506
            and (tile not in tiles_cumulative)):
NEW
507
            cur_z_submit_types.remove('cumulative')
×
508

509
        ## No longer need to return sciences since this is always the
510
        ## full set of exposures, but will keep for now for backward
511
        ## compatibility
512
        extra_job_args = {}
1✔
513
        if 'science' in overrides and 'tilenight' in overrides['science']:
1✔
NEW
514
            extra_job_args = overrides['science']['tilenight']
×
515
        else:
516
            extra_job_args = {}
1✔
517

518
        extra_job_args['z_submit_types'] = cur_z_submit_types
1✔
519
        extra_job_args['laststeps'] = science_laststeps
1✔
520
        ptable, sciences, int_id = submit_tilenight_and_redshifts(
1✔
521
                                    ptable, sciences, calibjobs, int_id,
522
                                    dry_run=dry_run_level, queue=queue,
523
                                    reservation=reservation,
524
                                    strictly_successful=True,
525
                                    check_for_outputs=check_for_outputs,
526
                                    resubmit_partial_complete=resubmit_partial_complete,
527
                                    system_name=system_name,
528
                                    use_specter=use_specter,
529
                                    extra_job_args=extra_job_args)
530

531
        if len(ptable) > 0 and dry_run_level < 3:
1✔
532
            write_table(ptable, tablename=proc_table_pathname)
1✔
533

534
        sleep_and_report(sub_wait_time,
1✔
535
                         message_suffix=f"to slow down the queue submission rate",
536
                         dry_run=dry_run, logfunc=log.info)
537

538
        ## Flush the outputs
539
        sys.stdout.flush()
1✔
540
        sys.stderr.flush()
1✔
541

542
    ################### Wrap things up ###################
543
    unproc_table = None
1✔
544
    if len(ptable) > 0:
1✔
545
        ## All jobs now submitted, update information from job queue and save
546
        ptable = update_from_queue(ptable, dry_run=dry_run_level)
1✔
547
        if dry_run_level < 3:
1✔
548
            write_table(ptable, tablename=proc_table_pathname)
1✔
549
            ## Now that processing is complete, let's identify what we didn't process
550
            if len(ptable) > 0:
1✔
551
                processed = np.isin(full_etable['EXPID'], np.unique(np.concatenate(ptable['EXPID'])))
1✔
552
                unproc_table = full_etable[~processed]
1✔
553
            else:
NEW
554
                unproc_table = full_etable
×
555
            write_table(unproc_table, tablename=unproc_table_pathname)
1✔
NEW
556
    elif dry_run_level < 3 and len(full_etable) > 0:
×
557
        ## Done determining what not to process, so write out unproc file
NEW
558
        unproc_table = full_etable
×
NEW
559
        write_table(unproc_table, tablename=unproc_table_pathname)
×
560

561
    if dry_run_level >= 3:
1✔
562
        log.info(f"{dry_run_level=} so not saving outputs.")
1✔
563
        log.info(f"\n{full_etable=}")
1✔
564
        log.info(f"\nn{ptable=}")
1✔
565
        log.info(f"\n{unproc_table=}")
1✔
566

567
    if still_acquiring:
1✔
NEW
568
        log.info(f"Current submission of exposures "
×
569
                 + f"for {night=} are complete except for last tile at {time.asctime()}.\n\n\n\n")
570
    else:
571
        log.info(f"All done: Completed submission of exposures for night {night} at {time.asctime()}.\n")
1✔
572
        
573
    return ptable, unproc_table
1✔
574

575

576
def submit_calibrations(cal_etable, ptable, cal_override, calibjobs, int_id,
                        curnight, files_to_link, create_submit_add_and_save):
    """
    Submit the calibration jobs for a night and record them in the processing table.

    In order, this submits (when applicable): a ``linkcal`` job, either a
    standalone ``nightlybias`` job or a combined ``ccdcalib`` job, the
    individual arcs followed by the joint ``psfnight`` job, the individual
    flats followed by the joint ``nightlyflat`` job, and finally the cte flats.

    Args:
        cal_etable: table of calibration exposures. The columns read here are
            'OBSTYPE', 'PROGRAM', 'EXPTIME', 'EXPID', 'CAMWORD' and
            'BADCAMWORD'. (Presumably an astropy Table -- confirm with caller.)
        ptable: processing table of jobs known so far. The columns read
            directly here are 'EXPID', 'JOBDESC' and 'OBSTYPE'.
        cal_override: dict-like calibration overrides. If it contains
            'linkcal', that entry is passed to the linkcal job as extra job
            arguments and may include a 'refnight'.
        calibjobs: dict of calibration job rows keyed by job description
            ('linkcal', 'ccdcalib', 'nightlybias', 'psfnight', 'nightlyflat'),
            plus a 'completed' dict of booleans keyed the same way.
            Updated as jobs are submitted.
        int_id: internal ID to assign to the next job created directly in
            this function; also passed to and returned by the prow helpers.
        curnight: night assigned to the 'NIGHT' field of the linkcal job.
        files_to_link: container of calibration file types that are being
            linked rather than derived; checked for 'biasnight',
            'badcolumns' and 'ctecorrnight'.
        create_submit_add_and_save: callable taking ``(prow, ptable, **kwargs)``
            and returning ``(prow, ptable)``; used for every submission.

    Returns:
        tuple: ``(ptable, calibjobs, int_id)`` updated for all jobs submitted.
    """
    log = get_logger()

    ## Exposure IDs that already have an individual job in the processing
    ## table. Default to empty so every path below has a valid integer array.
    processed_cal_expids = np.array([], dtype=int)
    if len(ptable) > 0:
        ## we use this to check for individual jobs rather than combination
        ## jobs, so only check for scalar jobs where JOBDESC == OBSTYPE
        ## ex. dark, zero, arc, and flat
        explists = ptable['EXPID'][ptable['JOBDESC']==ptable['OBSTYPE']]
        ## np.concatenate raises ValueError on an empty sequence, which happens
        ## when the table only holds jobs such as linkcal, nightlybias or
        ## ccdcalib (JOBDESC != OBSTYPE), so only concatenate if rows matched
        if len(explists) > 0:
            processed_cal_expids = np.unique(np.concatenate(explists).astype(int))

    ######## Submit caliblink if requested ########

    if 'linkcal' in cal_override and calibjobs['linkcal'] is None:
        log.info("Linking calibration files listed in override files: "
                 + f"{files_to_link}")
        prow = default_prow()
        prow['INTID'] = int_id
        int_id += 1
        prow['JOBDESC'] = 'linkcal'
        prow['OBSTYPE'] = 'link'
        prow['CALIBRATOR'] = 1
        prow['NIGHT'] = curnight
        if 'refnight' in cal_override['linkcal']:
            refnight = int(cal_override['linkcal']['refnight'])
            prow = define_and_assign_dependency(prow, calibjobs, refnight=refnight)
        ## create dictionary to carry linking information
        linkcalargs = cal_override['linkcal']
        prow, ptable = create_submit_add_and_save(prow, ptable,
                                                  check_outputs=False,
                                                  extra_job_args=linkcalargs)
        calibjobs[prow['JOBDESC']] = prow.copy()
        calibjobs = update_calibjobs_with_linking(calibjobs, files_to_link)

    ## Nothing else to do if there are no calibration exposures
    if len(cal_etable) == 0:
        return ptable, calibjobs, int_id

    ## Otherwise proceed with submitting the calibrations
    ## Define objects to process
    darks, flats, ctes, cte1s = list(), list(), list(), list()
    zeros = cal_etable[cal_etable['OBSTYPE']=='zero']
    arcs = cal_etable[cal_etable['OBSTYPE']=='arc']
    if 'dark' in cal_etable['OBSTYPE']:
        darks = cal_etable[cal_etable['OBSTYPE']=='dark']
    if 'flat' in cal_etable['OBSTYPE']:
        allflats = cal_etable[cal_etable['OBSTYPE']=='flat']
        ## cte flats are flats whose PROGRAM contains 'cte' (case insensitive)
        is_cte = np.array(['cte' in prog.lower() for prog in allflats['PROGRAM']])
        flats = allflats[~is_cte]
        ctes = allflats[is_cte]
        ## cte flats with an exposure time within 0.1 of 1.0
        cte1s = ctes[np.abs(ctes['EXPTIME']-1.)<0.1]

    ## Each step is skipped when the corresponding file is being linked instead
    do_bias = (len(zeros) > 0 and 'biasnight' not in files_to_link
               and not calibjobs['completed']['nightlybias'])
    do_badcol = len(darks) > 0 and 'badcolumns' not in files_to_link
    do_cte = (len(cte1s) > 0 and len(flats) > 0) and 'ctecorrnight' not in files_to_link
    ## If no dark or ctes, do the nightlybias
    if do_bias and not do_badcol and not do_cte and not calibjobs['completed']['ccdcalib']:
        log.info("\nNo dark or cte found. Submitting nightlybias before "
                 "processing exposures.\n")
        prow = erow_to_prow(zeros[0])
        prow['EXPID'] = np.array([])
        prow['INTID'] = int_id
        int_id += 1
        prow['JOBDESC'] = 'nightlybias'
        prow['CALIBRATOR'] = 1
        ## Start from all cameras and keep only those that are available
        ## (CAMWORD minus BADCAMWORD) in every zero with 'calib' in its PROGRAM
        cams = set(decode_camword('a0123456789'))
        for zero in zeros:
            if 'calib' in zero['PROGRAM']:
                proccamword = difference_camwords(zero['CAMWORD'],
                                                  zero['BADCAMWORD'])
                cams = cams.intersection(set(decode_camword(proccamword)))
        prow['PROCCAMWORD'] = create_camword(list(cams))
        prow = define_and_assign_dependency(prow, calibjobs)
        prow, ptable = create_submit_add_and_save(prow, ptable)
        calibjobs[prow['JOBDESC']] = prow.copy()
        calibjobs['completed'][prow['JOBDESC']] = True
        log.info("Performed nightly bias as no dark or cte passed cuts.")
    elif (do_badcol or do_cte) and not calibjobs['completed']['ccdcalib']:
        ######## Submit ccdcalib ########
        ## process dark for bad columns even if we don't have zeros for nightlybias
        ## ccdcalib = nightlybias(zeros) + badcol(dark) + cte correction
        jobdesc = 'ccdcalib'

        if calibjobs[jobdesc] is None:
            ccdcalib_erows = []
            if do_badcol:
                ## first exposure is a 300s dark
                ccdcalib_erows.append(darks[0])
            if do_cte:
                cte_expids = []
                ## second exposure is a 1s cte flat
                ccdcalib_erows.append(cte1s[0])
                cte_expids.append(cte1s[0]['EXPID'])
                ## third exposure is a 120s flat
                ccdcalib_erows.append(flats[-1])
                cte_expids.append(flats[-1]['EXPID'])
                cte_expids = np.array(cte_expids)
            else:
                cte_expids = None

            ## Build the job from the first selected exposure, then widen
            ## EXPID to cover every exposure used by the job
            prow, int_id = make_exposure_prow(ccdcalib_erows[0], int_id,
                                              calibjobs, jobdesc=jobdesc)
            if len(ccdcalib_erows) > 1:
                prow['EXPID'] = np.array([erow['EXPID'] for erow in ccdcalib_erows])

            prow['CALIBRATOR'] = 1

            extra_job_args = {'nightlybias': do_bias,
                              'nightlycte': do_cte,
                              'cte_expids': cte_expids}
            prow, ptable = create_submit_add_and_save(prow, ptable,
                                                      extra_job_args=extra_job_args)
            calibjobs[prow['JOBDESC']] = prow.copy()
            calibjobs['completed'][prow['JOBDESC']] = True
            log.info(f"Submitted ccdcalib job with {do_bias=}, "
                     + f"{do_badcol=}, {do_cte=}")

    ######## Submit arcs and psfnight ########
    if len(arcs)>0 and not calibjobs['completed']['psfnight']:
        arc_prows = []
        for arc_erow in arcs:
            ## Skip arcs that already have an individual job
            if arc_erow['EXPID'] in processed_cal_expids:
                continue
            prow, int_id = make_exposure_prow(arc_erow, int_id, calibjobs)
            prow, ptable = create_submit_add_and_save(prow, ptable)
            arc_prows.append(prow)

        ## NOTE(review): only arcs submitted in this call are passed to the
        ## joint job; previously processed arcs are skipped above and
        ## arc_prows may even be empty -- confirm make_joint_prow handles this
        joint_prow, int_id = make_joint_prow(arc_prows, descriptor='psfnight',
                                             internal_id=int_id)
        ptable = set_calibrator_flag(arc_prows, ptable)
        joint_prow, ptable = create_submit_add_and_save(joint_prow, ptable)
        calibjobs[joint_prow['JOBDESC']] = joint_prow.copy()
        calibjobs['completed'][joint_prow['JOBDESC']] = True


    ######## Submit flats and nightlyflat ########
    ## If nightlyflat defined we don't need to process more normal flats
    if len(flats) > 0 and not calibjobs['completed']['nightlyflat']:
        flat_prows = []
        for flat_erow in flats:
            ## Skip flats that already have an individual job
            if flat_erow['EXPID'] in processed_cal_expids:
                continue

            jobdesc = 'flat'
            prow, int_id = make_exposure_prow(flat_erow, int_id, calibjobs,
                                              jobdesc=jobdesc)
            prow, ptable = create_submit_add_and_save(prow, ptable)
            flat_prows.append(prow)

        ## NOTE(review): as for the arcs, only flats submitted in this call
        ## are passed to the joint job -- confirm this is intended
        joint_prow, int_id = make_joint_prow(flat_prows, descriptor='nightlyflat',
                                             internal_id=int_id)
        ptable = set_calibrator_flag(flat_prows, ptable)
        joint_prow, ptable = create_submit_add_and_save(joint_prow, ptable)
        calibjobs[joint_prow['JOBDESC']] = joint_prow.copy()
        calibjobs['completed'][joint_prow['JOBDESC']] = True

    ######## Submit cte flats ########
    jobdesc = 'flat'
    for cte_erow in ctes:
        ## Skip cte flats that already have an individual job
        if cte_erow['EXPID'] in processed_cal_expids:
            continue
        prow, int_id = make_exposure_prow(cte_erow, int_id, calibjobs,
                                          jobdesc=jobdesc)
        prow, ptable = create_submit_add_and_save(prow, ptable)

    return ptable, calibjobs, int_id
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc