
desihub / desispec · build 6884201030 (github)
15 Nov 2023 11:34PM UTC · coverage: 24.35% (+0.1% from 24.217%)

Pull Request #2141: Spectra to specutils
weaverba137 · add placeholder test for coadded spectrum

74 of 109 new or added lines in 1 file covered (67.89%).
24 existing lines in 18 files now uncovered.
10927 of 44875 relevant lines covered (24.35%).
0.24 hits per line.

Source File: /py/desispec/scripts/daily_processing.py (0.0% covered)

"""
desispec.scripts.daily_processing
=================================

"""
import numpy as np
import os
import sys
import time
from astropy.table import Table
import glob

## Import some helper functions; you can see their definitions by uncommenting the bash shell command
from desispec.workflow.tableio import load_tables, write_tables, write_table
from desispec.workflow.utils import verify_variable_with_environment, pathjoin, listpath, \
                                    get_printable_banner, sleep_and_report
from desispec.workflow.timing import during_operating_hours, what_night_is_it
from desispec.workflow.exptable import default_obstypes_for_exptable, get_exposure_table_column_defs, \
    get_exposure_table_path, get_exposure_table_name, summarize_exposure
from desispec.workflow.proctable import default_obstypes_for_proctable, \
                                        get_processing_table_path, \
                                        get_processing_table_name, \
                                        erow_to_prow, default_prow
from desispec.workflow.procfuncs import parse_previous_tables, flat_joint_fit, arc_joint_fit, get_type_and_tile, \
                                        science_joint_fit, define_and_assign_dependency, create_and_submit, \
                                        update_and_recurvsively_submit, checkfor_and_submit_joint_job, \
                                        submit_tilenight_and_redshifts
from desispec.workflow.queue import update_from_queue, any_jobs_not_complete
from desispec.io.util import difference_camwords, parse_badamps, validate_badamps

def daily_processing_manager(specprod=None, exp_table_path=None, proc_table_path=None, path_to_data=None,
                             expobstypes=None, procobstypes=None, z_submit_types=None, camword=None, badcamword=None,
                             badamps=None, override_night=None, tab_filetype='csv', queue='realtime',
                             exps_to_ignore=None, data_cadence_time=300, queue_cadence_time=1800,
                             exp_cadence_time=2,
                             dry_run_level=0, dry_run=False, no_redshifts=False, continue_looping_debug=False, dont_check_job_outputs=False,
                             dont_resubmit_partial_jobs=False, verbose=False, use_specter=False, use_tilenight=False):
    """
    Generates processing tables for the nights requested. Requires exposure tables to exist on disk.

    Args:
        specprod: str. The name of the current production. If used, this will overwrite the SPECPROD environment variable.
        exp_table_path: str. Full path to where the exposure tables are stored, WITHOUT the monthly directory included.
        proc_table_path: str. Full path to where the processing tables are to be written.
        path_to_data: str. Path to the raw data.
        expobstypes: str or comma separated list of strings. The exposure OBSTYPEs that you want to include in the exposure table.
        procobstypes: str or comma separated list of strings. The exposure OBSTYPEs that you want to include in the processing table.
        z_submit_types: list of str or comma separated list of strings. The "group" types of redshifts that should be
                                       submitted with each exposure. If not specified, the default for daily processing is
                                       ['cumulative', 'pernight-v0']. If False, 'false', or [], then no redshifts are submitted.
        camword: str. Camword that, if set, alters the set of cameras that will be set for processing.
                      Examples: a0123456789, a1, a2b3r3, a2b3r4z3.
        badcamword: str. Camword that, if set, will be removed from the camword defined in camword if given, or the camword
                         inferred from the data if camword is not given.
        badamps: str. Comma separated list of bad amplifiers that should not be processed. Should be of the
                      form "{camera}{petal}{amp}", i.e. "[brz][0-9][ABCD]". Example: 'b7D,z8A'
        override_night: str or int. 8 digit night, e.g. 20200314, of data to run on. If None, it runs on the current night.
        tab_filetype: str. The file extension (without the '.') of the exposure and processing tables.
        queue: str. The name of the queue to submit the jobs to. Default is "realtime".
        exps_to_ignore: list. A list of exposure ids that should not be processed. Each should be an integer.
        data_cadence_time: int. Wait time in seconds between loops in looking for new data. Default is 300 seconds.
        queue_cadence_time: int. Wait time in seconds between loops in checking queue statuses and resubmitting failures. Default is 1800s.
        exp_cadence_time: int. Wait time in seconds between loops over each science exposure. Default is 2.
        dry_run_level: int. If nonzero, this is a simulated run. If dry_run_level=1, the scripts will be written or submitted. If
                      dry_run_level=2, the scripts will not be written or submitted. Logging will remain the same
                      for testing as though scripts are being submitted. Default is 0 (false).
        dry_run: bool. Whether to run without submitting scripts. If dry_run_level is defined, then it overrides
                       this flag. If dry_run_level is not set and dry_run=True, dry_run_level is set to 2 (no scripts
                       generated or run). Default for dry_run is False.
        no_redshifts: bool. Whether to submit redshifts or not. If True, redshifts are not submitted.
        continue_looping_debug: bool. FOR DEBUG purposes only. Will continue looping in search of new data until the process
                                 is terminated. Default is False.
        dont_check_job_outputs: bool. Default is False. If False, the code checks for the existence of the expected final
                                 data products for the script being submitted. If all files exist and this is False,
                                 then the script will not be submitted. If some files exist and this is False, only the
                                 subset of the cameras without the final data products will be generated and submitted.
        dont_resubmit_partial_jobs: bool. Default is False. Must be used with dont_check_job_outputs=False. If this flag is
                                          False, jobs with some prior data are pruned using PROCCAMWORD to only process the
                                          remaining cameras not found to exist.
        verbose: bool. True if you want more verbose output, False otherwise. Currently not propagated to lower code,
                       so it is only used in the main daily_processing script itself.
        use_specter: bool, optional. Default is False. If True, use specter, otherwise use gpu_specter by default.
        use_tilenight: bool, optional. Default is False. If True, use desi_proc_tilenight for prestdstar, stdstar,
                    and poststdstar steps for science exposures.

    Returns: Nothing

    Notes:
        Generates both exposure table and processing tables 'on the fly' and saves them at various checkpoints. These
        should be capable of being reloaded in case of interruption or accidental termination of the manager's process.
    """
    ## If not being done during operating hours, and we're not simulating data or running a catchup run, exit.
    if not during_operating_hours(dry_run=dry_run) and override_night is None:
        print("Not during operating hours, and not asked to perform a dry run or run on historic data. Exiting.")
        sys.exit(0)

    ## What night are we running on?
    true_night = what_night_is_it()
    if override_night is not None:
        night = int(override_night)
        print(f"True night is {true_night}, but running for night={night}")
    else:
        night = true_night

    if continue_looping_debug:
        print("continue_looping_debug is set. Will continue looking for new data and needs to be terminated by the user.")

    ## Recast booleans from double negative
    check_for_outputs = (not dont_check_job_outputs)
    resubmit_partial_complete = (not dont_resubmit_partial_jobs)

    ## Define the obstypes to process
    if procobstypes is None:
        procobstypes = default_obstypes_for_proctable()
    elif isinstance(procobstypes, str):
        procobstypes = procobstypes.split(',')

    ## Define the obstypes to save information for in the exposure table
    if expobstypes is None:
        expobstypes = default_obstypes_for_exptable()
    elif isinstance(expobstypes, str):
        expobstypes = expobstypes.split(',')

    ## Define the group types of redshifts you want to generate for each tile
    if no_redshifts:
        z_submit_types = None
    else:
        if z_submit_types is None:
            pass
        elif isinstance(z_submit_types, str):
            if z_submit_types.lower() == 'false':
                z_submit_types = None
            elif z_submit_types.lower() == 'none':
                z_submit_types = None
            else:
                z_submit_types = [ztype.strip().lower() for ztype in z_submit_types.split(',')]
                for ztype in z_submit_types:
                    if ztype not in ['cumulative', 'pernight-v0', 'pernight', 'perexp']:
                        raise ValueError(f"Couldn't understand ztype={ztype} in z_submit_types={z_submit_types}.")
        else:
            raise ValueError(f"Couldn't understand z_submit_types={z_submit_types}, type={type(z_submit_types)}.")

    if z_submit_types is None:
        print("Not submitting scripts for redshift fitting")
    else:
        print(f"Redshift fitting with redshift group types: {z_submit_types}")
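
    ## For example (illustrative): z_submit_types='cumulative, perexp' is parsed
    ## above into ['cumulative', 'perexp'], while 'false' or 'none' (any case)
    ## disables redshift submission by setting z_submit_types to None; anything
    ## outside {'cumulative', 'pernight-v0', 'pernight', 'perexp'} raises ValueError.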

    ## Reconcile the dry_run and dry_run_level
    if dry_run and dry_run_level == 0:
        dry_run_level = 2
    elif dry_run_level > 0:
        dry_run = True
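
    ## For example: dry_run=True with dry_run_level=0 is promoted to
    ## dry_run_level=2 (no scripts written or submitted, per the docstring),
    ## while any explicit dry_run_level > 0 forces dry_run=True so the rest of
    ## the function treats the run as simulated.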

    ## expobstypes must contain all the types used in processing
    for typ in procobstypes:
        if typ not in expobstypes:
            expobstypes.append(typ)

    ## Warn people if changing camword
    finalcamword = 'a0123456789'
    if camword is not None and badcamword is None:
        badcamword = difference_camwords(finalcamword, camword)
        finalcamword = camword
    elif camword is not None and badcamword is not None:
        finalcamword = difference_camwords(camword, badcamword)
        badcamword = difference_camwords('a0123456789', finalcamword)
    elif badcamword is not None:
        finalcamword = difference_camwords(finalcamword, badcamword)
    else:
        badcamword = ''

    if badcamword != '':
        ## Inform the user what will be done with it.
        print(f"Modifying camword of data to be processed with badcamword: {badcamword}. "+\
              f"Camword to be processed: {finalcamword}")
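
    ## Illustrative example (hypothetical values): with camword=None and
    ## badcamword='a8', the logic above leaves badcamword as given and sets
    ## finalcamword = difference_camwords('a0123456789', 'a8'), i.e. petal 8 is
    ## removed from the default camword before processing.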

    ## Make sure badamps is formatted properly
    if badamps is None:
        badamps = ''
    else:
        badamps = validate_badamps(badamps)
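
    ## For instance, badamps='b7D,z8A' (the "{camera}{petal}{amp}" format noted
    ## in the docstring) is the kind of value validate_badamps is expected to
    ## accept; the example value is illustrative only.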

    ## Define the set of exposures to ignore
    if exps_to_ignore is None:
        exps_to_ignore = set()
    else:
        exps_to_ignore = np.sort(np.array(exps_to_ignore).astype(int))
        print(f"\nReceived exposures to ignore: {exps_to_ignore}")
        exps_to_ignore = set(exps_to_ignore)

    ## Get context specific variable values
    colnames, coltypes, coldefaults = get_exposure_table_column_defs(return_default_values=True)

    ## Define where to find the data
    path_to_data = verify_variable_with_environment(var=path_to_data, var_name='path_to_data', env_name='DESI_SPECTRO_DATA')
    specprod = verify_variable_with_environment(var=specprod, var_name='specprod', env_name='SPECPROD')

    ## Define the naming scheme for the raw data
    ## Manifests (describing end of cals, etc.) don't have a data file, so search for those separately
    data_glob = os.path.join(path_to_data, str(night), '*', 'desi-*.fit*')
    manifest_glob = os.path.join(path_to_data, str(night), '*', 'manifest_*.json')
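
    ## Illustrative layout implied by the globs above:
    ##     {path_to_data}/{night}/{expid}/desi-*.fit*      (raw data)
    ##     {path_to_data}/{night}/{expid}/manifest_*.json  (sequence manifests)
    ## Each exposure is assumed to sit in its own subdirectory named by its
    ## exposure id; the loop below recovers EXPID via int() on that directory name.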

    ## Determine where the exposure table will be written
    if exp_table_path is None:
        exp_table_path = get_exposure_table_path(night=night, usespecprod=True)
    os.makedirs(exp_table_path, exist_ok=True)
    name = get_exposure_table_name(night=night, extension=tab_filetype)
    exp_table_pathname = pathjoin(exp_table_path, name)

    ## Determine where the processing table will be written
    if proc_table_path is None:
        proc_table_path = get_processing_table_path()
    os.makedirs(proc_table_path, exist_ok=True)
    name = get_processing_table_name(prodmod=night, extension=tab_filetype)
    proc_table_pathname = pathjoin(proc_table_path, name)

    ## Determine where the unprocessed data table will be written
    unproc_table_pathname = pathjoin(proc_table_path, name.replace('processing', 'unprocessed'))

    ## Combine the table names and types for easier passing to io functions
    table_pathnames = [exp_table_pathname, proc_table_pathname, unproc_table_pathname]
    table_types = ['exptable', 'proctable', 'unproctable']

    ## Load in the files defined above
    etable, ptable, unproc_table = load_tables(tablenames=table_pathnames, \
                                               tabletypes=table_types)

    ## Get relevant data from the tables
    all_exps = set(etable['EXPID'])
    arcs, flats, sciences, calibjobs, curtype, lasttype, \
    curtile, lasttile, internal_id = parse_previous_tables(etable, ptable, night)
    do_bias = ('bias' in procobstypes or 'dark' in procobstypes)
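
    ## Note (inferred from the usage below, not from parse_previous_tables itself):
    ## calibjobs acts as a mapping from calibration job names to processing-table
    ## rows; the loop reads and updates calibjobs['psfnight'], ['nightlyflat'],
    ## ['ccdcalib'], and ['nightlybias'].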

    ## While running on the proper night and during night hours,
    ## or doing a dry_run or override_night, keep looping
    while ( (night == what_night_is_it()) and during_operating_hours(dry_run=dry_run) ) or ( override_night is not None ):
        ## Get a list of new exposures that have been found
        print(f"\n\n\nPreviously known exposures: {all_exps}")
        data_exps = set(sorted([int(os.path.basename(os.path.dirname(fil))) for fil in glob.glob(data_glob)]))
        manifest_exps = set(sorted([int(os.path.basename(os.path.dirname(fil))) for fil in glob.glob(manifest_glob)]))
        located_exps = data_exps.union(manifest_exps)

        new_exps = located_exps.difference(all_exps)
        all_exps = located_exps  # i.e. new_exps.union(all_exps)
        print(f"\nNew exposures: {new_exps}\n\n")
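
        ## Worked example with made-up exposure ids: if all_exps = {101, 102} and
        ## the globs now locate {101, 102, 103}, then new_exps = {103} and
        ## all_exps is reset to the full located set {101, 102, 103}.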

        ## If there aren't any new exps and there won't be more because we're running on an old night or simulating things, exit
        if (not continue_looping_debug) and ( override_night is not None ) and ( len(list(new_exps))==0 ):
            print("Terminating the search for new exposures because no new exposures are present and you have" + \
                  " override_night set without continue_looping_debug")
            break

        ## Loop over new exposures and process them as relevant to that type
        for exp in sorted(list(new_exps)):
            if verbose:
                print(get_printable_banner(str(exp)))
            else:
                print(f'\n\n##################### {exp} #########################')

            ## Open relevant raw data files to understand what we're dealing with
            erow = summarize_exposure(path_to_data, night, exp, expobstypes, colnames, coldefaults, verbosely=False)

            ## If there was an issue, continue. If it's a string summarizing the end of some sequence, use that info.
            ## If the exposure is associated with data, process that data.
            if erow is None:
                continue
            elif type(erow) is str:
                writeout = False
                if exp in exps_to_ignore:
                    print(f"Located {erow} in exposure {exp}, but the exposure was listed in the expids to ignore. Ignoring this.")
                elif erow == 'endofarcs' and calibjobs['psfnight'] is None and 'arc' in procobstypes:
                    print("\nLocated end of arc calibration sequence flag. Processing psfnight.\n")
                    ptable, calibjobs['psfnight'], internal_id = arc_joint_fit(ptable, arcs, internal_id, dry_run=dry_run_level, queue=queue)
                    writeout = True
                elif erow == 'endofflats' and calibjobs['nightlyflat'] is None and 'flat' in procobstypes:
                    print("\nLocated end of long flat calibration sequence flag. Processing nightlyflat.\n")
                    ptable, calibjobs['nightlyflat'], internal_id = flat_joint_fit(ptable, flats, internal_id, dry_run=dry_run_level, queue=queue)
                    writeout = True
                elif 'short' in erow and calibjobs['nightlyflat'] is None:
                    print("\nLocated end of short flat calibration flag. Removing flats from list for nightlyflat processing.\n")
                    flats = []
                if writeout and dry_run_level < 3:
                    write_tables([ptable], tablenames=[proc_table_pathname])
                    sleep_and_report(2, message_suffix=f"after joint fit", dry_run=dry_run)
                del writeout
                continue
            else:
                ## Else it's a real row so start processing it
                pass

            erow['BADCAMWORD'] = badcamword
            erow['BADAMPS'] = badamps
            unproc = False
            if exp in exps_to_ignore:
                print(f"\n{exp} given as exposure id to ignore. Not processing.")
                erow['LASTSTEP'] = 'ignore'
                # erow['EXPFLAG'] = np.append(erow['EXPFLAG'], )
                unproc = True
            elif erow['LASTSTEP'] == 'ignore':
                print(f"\n{exp} identified by the pipeline as something to ignore. Not processing.")
                unproc = True
            elif erow['OBSTYPE'] not in procobstypes:
                print(f"\n{erow['OBSTYPE']} not in obstypes to process: {procobstypes}. Not processing.")
                unproc = True
            elif str(erow['OBSTYPE']).lower() == 'arc' and float(erow['EXPTIME']) > 8.0:
                print("\nArc exposure with EXPTIME greater than 8s. Not processing.")
                unproc = True
            elif str(erow['OBSTYPE']).lower() == 'dark' and np.abs(float(erow['EXPTIME'])-300.) > 1:
                print("\nDark exposure with EXPTIME not consistent with 300s. Not processing.")
                unproc = True
            elif str(erow['OBSTYPE']).lower() == 'dark' and calibjobs['ccdcalib'] is not None:
                print("\nDark exposure found, but already processed dark with" +
                      f" expID {calibjobs['ccdcalib']['EXPID']}. Skipping this one.")
                unproc = True

            print(f"\nFound: {erow}")
            etable.add_row(erow)
            if unproc:
                unproc_table.add_row(erow)
                sleep_and_report(2, message_suffix=f"after exposure", dry_run=dry_run)
                if dry_run_level < 3:
                    write_tables([etable, unproc_table], tablenames=[exp_table_pathname, unproc_table_pathname])
                continue

            curtype, curtile = get_type_and_tile(erow)

            if lasttype is None and curtype != 'dark' and do_bias:
                print("\nNo dark found at the beginning of the night. "
                      + "Submitting nightlybias before processing exposures.\n")
                prow = default_prow()
                prow['INTID'] = internal_id
                prow['OBSTYPE'] = 'zero'
                internal_id += 1
                prow['JOBDESC'] = 'nightlybias'
                prow['NIGHT'] = night
                prow['CALIBRATOR'] = 1
                prow['PROCCAMWORD'] = finalcamword
                prow = create_and_submit(prow, dry_run=dry_run_level,
                                         queue=queue,
                                         strictly_successful=True,
                                         check_for_outputs=check_for_outputs,
                                         resubmit_partial_complete=resubmit_partial_complete)
                calibjobs['nightlybias'] = prow.copy()
                ## Add the processing row to the processing table
                ptable.add_row(prow)
                ## Write out the processing table
                if dry_run_level < 3:
                    write_tables([ptable], tablenames=[proc_table_pathname])
                    sleep_and_report(2, message_suffix=f"after nightlybias",
                                     dry_run=dry_run)

            # if this is a new tile/obstype, proceed with submitting all of the jobs for the previous tile
            if lasttype is not None and ((curtype != lasttype) or (curtile != lasttile)):
                print("\nData for previous tile or obstype is complete. Running joint fits. "
                      + f"{curtype=}, {lasttype=}, {curtile=}, {lasttile=}\n")
                old_iid = internal_id
                # If done with science exposures for a tile and use_tilenight==True, use
                # submit_tilenight_and_redshifts, otherwise use checkfor_and_submit_joint_job
                if use_tilenight and lasttype == 'science' and len(sciences)>0:
                    ptable, sciences, internal_id \
                        = submit_tilenight_and_redshifts(ptable, sciences, calibjobs, lasttype, internal_id,
                                                        dry_run=dry_run_level,
                                                        queue=queue,
                                                        strictly_successful=True,
                                                        check_for_outputs=check_for_outputs,
                                                        resubmit_partial_complete=resubmit_partial_complete,
                                                        z_submit_types=z_submit_types,
                                                        use_specter=use_specter,
                                                        laststeps=['all', 'fluxcalib', 'skysub'])
                else:
                    ptable, calibjobs, sciences, internal_id \
                        = checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs,
                                                        lasttype, internal_id,
                                                        dry_run=dry_run_level,
                                                        queue=queue,
                                                        strictly_successful=True,
                                                        check_for_outputs=check_for_outputs,
                                                        resubmit_partial_complete=resubmit_partial_complete,
                                                        z_submit_types=z_submit_types)

                ## if internal_id changed that means we submitted a joint job
                ## so let's write that out and pause
                if (internal_id > old_iid) and (dry_run_level < 3):
                    write_tables([ptable], tablenames=[proc_table_pathname])
                    sleep_and_report(2, message_suffix=f"after joint fit", dry_run=dry_run)
                del old_iid

            prow = erow_to_prow(erow)
            prow['INTID'] = internal_id
            internal_id += 1
            if prow['OBSTYPE'] == 'dark':
                prow['JOBDESC'] = 'ccdcalib'
            else:
                prow['JOBDESC'] = prow['OBSTYPE']
            prow = define_and_assign_dependency(prow, calibjobs)
            if (not use_tilenight) or erow['OBSTYPE'] != 'science':
                print(f"\nProcessing: {prow}\n")
                prow = create_and_submit(prow, dry_run=dry_run_level, queue=queue,
                                     strictly_successful=True, check_for_outputs=check_for_outputs,
                                     resubmit_partial_complete=resubmit_partial_complete, use_specter=use_specter)

                ## If processed a dark, assign that to the dark job
                if curtype == 'dark':
                    prow['CALIBRATOR'] = 1
                    calibjobs['ccdcalib'] = prow.copy()

                ## Add the processing row to the processing table
                ptable.add_row(prow)

            ## Note: Assumption here on number of flats
            if curtype == 'flat' and calibjobs['nightlyflat'] is None \
                    and int(erow['SEQTOT']) < 5 \
                    and np.abs(float(erow['EXPTIME'])-120.) < 1.:
                flats.append(prow)
            elif curtype == 'arc' and calibjobs['psfnight'] is None:
                arcs.append(prow)
            elif curtype == 'science' and (use_tilenight or prow['LASTSTEP'] != 'skysub'):
                sciences.append(prow)

            lasttile = curtile
            lasttype = curtype

            ## Flush the outputs
            sys.stdout.flush()
            sys.stderr.flush()

            if dry_run_level < 3:
                write_tables([etable, ptable], tablenames=[exp_table_pathname, proc_table_pathname])
            sleep_and_report(exp_cadence_time, message_suffix=f"after exposure", dry_run=dry_run)

        print("\nReached the end of current iteration of new exposures.")
        if override_night is not None and (not continue_looping_debug):
            print("\nOverride_night set, not waiting for new data before exiting.\n")
        else:
            sleep_and_report(data_cadence_time, message_suffix=f"before looking for more new data",
                            dry_run=(dry_run and ()))

        if len(ptable) > 0:
            ptable = update_from_queue(ptable, dry_run=dry_run_level)
            # ptable, nsubmits = update_and_recurvsively_submit(ptable,
            #                                                   ptab_name=proc_table_pathname, dry_run=dry_run_level)

            ## Exposure table doesn't change in the interim, so no need to re-write it to disk
            if dry_run_level < 3:
                write_table(ptable, tablename=proc_table_pathname)
            if override_night is None or continue_looping_debug:
                sleep_and_report(10, message_suffix=f"after updating queue information", dry_run=dry_run)

    ## Flush the outputs
    sys.stdout.flush()
    sys.stderr.flush()
    ## No more data coming in, so do bottleneck steps if any apply
    if use_tilenight and len(sciences)>0:
        ptable, sciences, internal_id \
            = submit_tilenight_and_redshifts(ptable, sciences, calibjobs, lasttype, internal_id,
                                            dry_run=dry_run_level,
                                            queue=queue,
                                            strictly_successful=True,
                                            check_for_outputs=check_for_outputs,
                                            resubmit_partial_complete=resubmit_partial_complete,
                                            z_submit_types=z_submit_types,
                                            use_specter=use_specter,
                                            laststeps=['all', 'fluxcalib', 'skysub'])
    else:
        ptable, calibjobs, sciences, internal_id \
            = checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs,
                                            lasttype, internal_id,
                                            dry_run=dry_run_level, queue=queue,
                                            strictly_successful=True,
                                            check_for_outputs=check_for_outputs,
                                            resubmit_partial_complete=resubmit_partial_complete,
                                            z_submit_types=z_submit_types)
    ## All jobs now submitted, update information from job queue and save
    ptable = update_from_queue(ptable, dry_run=dry_run_level)
    if dry_run_level < 3:
        write_table(ptable, tablename=proc_table_pathname)

    print(f"Completed submission of exposures for night {night}.")

    # #######################################
    # ########## Queue Cleanup ##############
    # #######################################
    # print("Now resolving job failures.")
    #
    # ## Flush the outputs
    # sys.stdout.flush()
    # sys.stderr.flush()
    # ## Now we resubmit failed jobs and their dependencies until all jobs have an un-submittable end state,
    # ## e.g. they either succeeded or failed with a code-related issue
    # ii,nsubmits = 0, 0
    # while ii < 4 and any_jobs_not_complete(ptable['STATUS']):
    #     print(f"Starting iteration {ii} of queue updating and resubmissions of failures.")
    #     ptable, nsubmits = update_and_recurvsively_submit(ptable, submits=nsubmits,
    #                                                       ptab_name=proc_table_pathname, dry_run=dry_run_level)
    #     if dry_run_level < 3:
    #          write_table(ptable, tablename=proc_table_pathname)
    #     if any_jobs_not_complete(ptable['STATUS']):
    #         sleep_and_report(queue_cadence_time, message_suffix=f"after resubmitting job to queue",
    #                          dry_run=(dry_run and (override_night is not None) and not (continue_looping_debug)))
    #
    #     ptable = update_from_queue(ptable, dry_run=dry_run_level)
    #     if dry_run_level < 3:
    #          write_table(ptable, tablename=proc_table_pathname)
    #     ## Flush the outputs
    #     sys.stdout.flush()
    #     sys.stderr.flush()
    #     ii += 1
    #
    # print("No job failures left.")
    print("Exiting")
    ## Flush the outputs
    sys.stdout.flush()
    sys.stderr.flush()