desihub / desispec / build 8440689206 (push, github, web-flow)
26 Mar 2024 05:57PM UTC coverage: 28.101% (+3.1%) from 25.01%

Merge pull request #2187 from desihub/pipelinerefactor
Introduce desi_proc_night to unify and simplify processing scripts

769 of 1188 new or added lines in 20 files covered (64.73%)
10 existing lines in 6 files now uncovered
13231 of 47084 relevant lines covered (28.1%)
0.28 hits per line
Source File: /py/desispec/scripts/daily_processing.py (0.0% of lines covered)

"""
desispec.scripts.daily_processing
=================================

"""
import numpy as np
import os
import sys
import time
from astropy.table import Table
import glob

## Import some helper functions from the desispec workflow and io modules
from desispec.workflow.tableio import load_tables, write_tables, write_table
from desispec.workflow.utils import verify_variable_with_environment, pathjoin, listpath, \
                                    get_printable_banner, sleep_and_report
from desispec.workflow.timing import during_operating_hours, what_night_is_it, wait_for_cals
from desispec.workflow.exptable import default_obstypes_for_exptable, get_exposure_table_column_defs, \
    get_exposure_table_path, get_exposure_table_name, summarize_exposure
from desispec.workflow.proctable import default_obstypes_for_proctable, \
                                        get_processing_table_path, \
                                        get_processing_table_name, \
                                        erow_to_prow, default_prow
from desispec.workflow.processing import parse_previous_tables, flat_joint_fit, arc_joint_fit, get_type_and_tile, \
                                        science_joint_fit, define_and_assign_dependency, create_and_submit, \
                                        update_and_recurvsively_submit, checkfor_and_submit_joint_job, \
                                        submit_tilenight_and_redshifts
from desispec.workflow.queue import update_from_queue, any_jobs_not_complete
from desispec.io.util import difference_camwords, parse_badamps, validate_badamps

def daily_processing_manager(specprod=None, exp_table_path=None, proc_table_path=None, path_to_data=None,
                             expobstypes=None, procobstypes=None, z_submit_types=None, camword=None, badcamword=None,
                             badamps=None, override_night=None, tab_filetype='csv', queue='realtime',
                             exps_to_ignore=None, data_cadence_time=300, queue_cadence_time=1800,
                             exp_cadence_time=2,
                             dry_run_level=0, dry_run=False, no_redshifts=False, continue_looping_debug=False, dont_check_job_outputs=False,
                             dont_resubmit_partial_jobs=False, verbose=False, use_specter=False, use_tilenight=False):
    """
    Generates processing tables for the nights requested. Requires exposure tables to exist on disk.

    Args:
        specprod: str. The name of the current production. If used, this will overwrite the SPECPROD environment variable.
        exp_table_path: str. Full path to where the exposure tables are stored, WITHOUT the monthly directory included.
        proc_table_path: str. Full path to where the processing tables are to be written.
        path_to_data: str. Path to the raw data.
        expobstypes: str or comma separated list of strings. The exposure OBSTYPEs that you want to include in the exposure table.
        procobstypes: str or comma separated list of strings. The exposure OBSTYPEs that you want to include in the processing table.
        z_submit_types: list of str, or comma separated string. The "group" types of redshifts that should be
                                       submitted with each exposure. If not specified, the default for daily processing is
                                       ['cumulative', 'pernight-v0']. If false, 'false', or [], then no redshifts are submitted.
        camword: str. Camword that, if set, alters the set of cameras that will be processed.
                      Examples: a0123456789, a1, a2b3r3, a2b3r4z3.
        badcamword: str. Camword that, if set, will be removed from the camword defined in camword if given, or the camword
                         inferred from the data if camword is not given.
        badamps: str. Comma separated list of bad amplifiers that should not be processed. Should be of the
                      form "{camera}{petal}{amp}", i.e. "[brz][0-9][ABCD]". Example: 'b7D,z8A'
        override_night: str or int. 8 digit night, e.g. 20200314, of data to run on. If None, it runs on the current night.
        tab_filetype: str. The file extension (without the '.') of the exposure and processing tables.
        queue: str. The name of the queue to submit the jobs to. Default is "realtime".
        exps_to_ignore: list. A list of exposure id's that should not be processed. Each should be an integer.
        data_cadence_time: int. Wait time in seconds between loops in looking for new data. Default is 300 seconds.
        queue_cadence_time: int. Wait time in seconds between loops in checking queue statuses and resubmitting failures. Default is 1800s.
        exp_cadence_time: int. Wait time in seconds between loops over each science exposure. Default 2.
        dry_run_level, int. If nonzero, this is a simulated run. If dry_run_level=1 the scripts will be written or submitted. If
                      dry_run_level=2, the scripts will not be written or submitted. Logging will remain the same
                      for testing as though scripts are being submitted. Default is 0 (false).
        dry_run, bool. Whether to run without submitting scripts. If dry_run_level is defined, then it overrides
                       this flag. If dry_run_level is not set and dry_run=True, dry_run_level is set to 2 (no scripts
                       generated or run). Default for dry_run is False.
        no_redshifts, bool. Whether to submit redshifts or not. If True, redshifts are not submitted.
        continue_looping_debug: bool. FOR DEBUG purposes only. Will continue looping in search of new data until the process
                                 is terminated. Default is False.
        dont_check_job_outputs, bool. Default is False. If False, the code checks for the existence of the expected final
                                 data products for the script being submitted. If all files exist and this is False,
                                 then the script will not be submitted. If some files exist and this is False, only the
                                 subset of the cameras without the final data products will be generated and submitted.
        dont_resubmit_partial_jobs, bool. Default is False. Must be used with dont_check_job_outputs=False. If this flag is
                                          False, jobs with some prior data are pruned using PROCCAMWORD to only process the
                                          remaining cameras not found to exist.
        verbose: bool. True if you want more verbose output, false otherwise. Currently not propagated to lower code,
                       so it is only used in the main daily_processing script itself.
        use_specter, bool, optional. Default is False. If True, use specter, otherwise use gpu_specter.
        use_tilenight (bool, optional): Default is False. If True, use desi_proc_tilenight for prestdstar, stdstar,
                    and poststdstar steps for science exposures.

    Returns: Nothing

    Notes:
        Generates both exposure table and processing tables 'on the fly' and saves them at various checkpoints. These
        should be capable of being reloaded in case of interruption or accidental termination of the manager's process.
    """
    ## If not being done during operating hours, and we're not simulating data or running a catchup run, exit.
    if not during_operating_hours(dry_run=dry_run) and override_night is None:
        print("Not during operating hours, and not asked to perform a dry run or run on historic data. Exiting.")
        sys.exit(0)

    ## What night are we running on?
    true_night = what_night_is_it()
    if override_night is not None:
        night = int(override_night)
        print(f"True night is {true_night}, but running for night={night}")
    else:
        night = true_night

    ## Wait for calibrations to completely arrive before proceeding,
    ## since we need to measure CTE from flats first
    found_cals = wait_for_cals(night)
    if not found_cals:
        print(f"ERROR: didn't find calibration data for {night}")
        sys.exit(1)

    if continue_looping_debug:
        print("continue_looping_debug is set. Will continue looking for new data and needs to be terminated by the user.")

    ## Recast booleans from double negative
    check_for_outputs = (not dont_check_job_outputs)
    resubmit_partial_complete = (not dont_resubmit_partial_jobs)

    ## Define the obstypes to process
    if procobstypes is None:
        procobstypes = default_obstypes_for_proctable()
    elif isinstance(procobstypes, str):
        procobstypes = procobstypes.split(',')

    ## Define the obstypes to save information for in the exposure table
    if expobstypes is None:
        expobstypes = default_obstypes_for_exptable()
    elif isinstance(expobstypes, str):
        expobstypes = expobstypes.split(',')

    ## Define the group types of redshifts you want to generate for each tile
    if no_redshifts:
        z_submit_types = None
    else:
        if z_submit_types is None:
            pass
        elif isinstance(z_submit_types, str):
            if z_submit_types.lower() == 'false':
                z_submit_types = None
            elif z_submit_types.lower() == 'none':
                z_submit_types = None
            else:
                z_submit_types = [ztype.strip().lower() for ztype in z_submit_types.split(',')]
                for ztype in z_submit_types:
                    if ztype not in ['cumulative', 'pernight-v0', 'pernight', 'perexp']:
                        raise ValueError(f"Couldn't understand ztype={ztype} in z_submit_types={z_submit_types}.")
        else:
            raise ValueError(f"Couldn't understand z_submit_types={z_submit_types}, type={type(z_submit_types)}.")

    if z_submit_types is None:
        print("Not submitting scripts for redshift fitting")
    else:
        print(f"Redshift fitting with redshift group types: {z_submit_types}")

    ## Reconcile the dry_run and dry_run_level
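    ## dry_run_level summary (per the docstring above): 0 = real run; 1 = simulated run in
    ## which scripts are still written or submitted; 2 = no scripts written or submitted;
    ## values >= 3 additionally skip writing the exposure/processing tables to disk below.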
    if dry_run and dry_run_level == 0:
        dry_run_level = 2
    elif dry_run_level > 0:
        dry_run = True

    ## expobstypes must contain all the types used in processing
    for typ in procobstypes:
        if typ not in expobstypes:
            expobstypes.append(typ)

    ## Warn people if changing camword
    finalcamword = 'a0123456789'
    if camword is not None and badcamword is None:
        badcamword = difference_camwords(finalcamword,camword)
        finalcamword = camword
    elif camword is not None and badcamword is not None:
        finalcamword = difference_camwords(camword, badcamword)
        badcamword = difference_camwords('a0123456789', finalcamword)
    elif badcamword is not None:
        finalcamword = difference_camwords(finalcamword,badcamword)
    else:
        badcamword = ''
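    ## For example (hypothetical inputs): camword='a0123456789' with badcamword='a5'
    ## would leave finalcamword='a012346789', i.e. all cameras on every petal except petal 5.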

    if badcamword != '':
        ## Inform the user what will be done with it.
        print(f"Modifying camword of data to be processed with badcamword: {badcamword}. "+\
              f"Camword to be processed: {finalcamword}")

    ## Make sure badamps is formatted properly
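    ## e.g. badamps='b7D,z8A' (the "{camera}{petal}{amp}" format from the docstring) flags
    ## amplifier D of camera b7 and amplifier A of camera z8 as bad.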
    if badamps is None:
        badamps = ''
    else:
        badamps = validate_badamps(badamps)

    ## Define the set of exposures to ignore
    if exps_to_ignore is None:
        exps_to_ignore = set()
    else:
        exps_to_ignore = np.sort(np.array(exps_to_ignore).astype(int))
        print(f"\nReceived exposures to ignore: {exps_to_ignore}")
        exps_to_ignore = set(exps_to_ignore)

    ## Get context specific variable values
    colnames, coltypes, coldefaults = get_exposure_table_column_defs(return_default_values=True)

    ## Define where to find the data
    path_to_data = verify_variable_with_environment(var=path_to_data,var_name='path_to_data', env_name='DESI_SPECTRO_DATA')
    specprod = verify_variable_with_environment(var=specprod,var_name='specprod',env_name='SPECPROD')

    ## Define the naming scheme for the raw data
    ## Manifests (describing end of cals, etc.) don't have a data file, so search for those separately
    data_glob = os.path.join(path_to_data, str(night), '*', 'desi-*.fit*')
    manifest_glob = os.path.join(path_to_data, str(night), '*', 'manifest_*.json')

    ## Determine where the exposure table will be written
    if exp_table_path is None:
        exp_table_path = get_exposure_table_path(night=night, usespecprod=True)
    os.makedirs(exp_table_path, exist_ok=True)
    name = get_exposure_table_name(night=night, extension=tab_filetype)
    exp_table_pathname = pathjoin(exp_table_path, name)

    ## Determine where the processing table will be written
    if proc_table_path is None:
        proc_table_path = get_processing_table_path()
    os.makedirs(proc_table_path, exist_ok=True)
    name = get_processing_table_name(prodmod=night, extension=tab_filetype)
    proc_table_pathname = pathjoin(proc_table_path, name)

    ## Determine where the unprocessed data table will be written
    unproc_table_pathname = pathjoin(proc_table_path,name.replace('processing', 'unprocessed'))

    ## Combine the table names and types for easier passing to io functions
    table_pathnames = [exp_table_pathname, proc_table_pathname, unproc_table_pathname]
    table_types = ['exptable','proctable','unproctable']

    ## Load in the files defined above
    etable, ptable, unproc_table = load_tables(tablenames=table_pathnames, \
                                               tabletypes=table_types)

    ## Get relevant data from the tables
    all_exps = set(etable['EXPID'])
    arcs, flats, sciences, calibjobs, curtype, lasttype, \
    curtile, lasttile, internal_id = parse_previous_tables(etable, ptable, night)
    do_bias = ('bias' in procobstypes or 'dark' in procobstypes)

    ## While running on the proper night and during night hours,
    ## or doing a dry_run or override_night, keep looping
    while ( (night == what_night_is_it()) and during_operating_hours(dry_run=dry_run) ) or ( override_night is not None ):
        ## Get a list of new exposures that have been found
        print(f"\n\n\nPreviously known exposures: {all_exps}")
        data_exps = set(sorted([int(os.path.basename(os.path.dirname(fil))) for fil in glob.glob(data_glob)]))
        manifest_exps = set(sorted([int(os.path.basename(os.path.dirname(fil))) for fil in glob.glob(manifest_glob)]))
        located_exps = data_exps.union(manifest_exps)

        new_exps = located_exps.difference(all_exps)
        all_exps = located_exps # i.e. new_exps.union(all_exps)
        print(f"\nNew exposures: {new_exps}\n\n")

        ## If there aren't any new exps and there won't be more because we're running on an old night or simulating things, exit
        if (not continue_looping_debug) and ( override_night is not None ) and ( len(list(new_exps))==0 ):
            print("Terminating the search for new exposures because no new exposures are present and you have" + \
                  " override_night set without continue_looping_debug")
            break

        ## Loop over new exposures and process them as relevant to that type
        for exp in sorted(list(new_exps)):
            if verbose:
                print(get_printable_banner(str(exp)))
            else:
                print(f'\n\n##################### {exp} #########################')

            ## Open relevant raw data files to understand what we're dealing with
            erow = summarize_exposure(path_to_data, night, exp, expobstypes, colnames, coldefaults, verbosely=False)

            ## If there was an issue, continue. If it's a string summarizing the end of some sequence, use that info.
            ## If the exposure is associated with data, process that data.
            if erow is None:
                continue
            elif type(erow) is str:
                writeout = False
                if exp in exps_to_ignore:
                    print(f"Located {erow} in exposure {exp}, but the exposure was listed in the expids to ignore. Ignoring this.")
                elif erow == 'endofarcs' and calibjobs['psfnight'] is None and 'arc' in procobstypes:
                    print("\nLocated end of arc calibration sequence flag. Processing psfnight.\n")
                    ptable, calibjobs['psfnight'], internal_id = arc_joint_fit(ptable, arcs, internal_id, dry_run=dry_run_level, queue=queue)
                    writeout = True
                elif erow == 'endofflats' and calibjobs['nightlyflat'] is None and 'flat' in procobstypes:
                    print("\nLocated end of long flat calibration sequence flag. Processing nightlyflat.\n")
                    ptable, calibjobs['nightlyflat'], internal_id = flat_joint_fit(ptable, flats, internal_id, dry_run=dry_run_level, queue=queue)
                    writeout = True
                elif 'short' in erow and calibjobs['nightlyflat'] is None:
                    print("\nLocated end of short flat calibration flag. Removing flats from list for nightlyflat processing.\n")
                    flats = []
                if writeout and dry_run_level < 3:
                    write_tables([ptable], tablenames=[proc_table_pathname])
                    sleep_and_report(2, message_suffix=f"after joint fit", dry_run=dry_run)
                del writeout
                continue
            else:
                ## Else it's a real row so start processing it
                pass

            erow['BADCAMWORD'] = badcamword
            erow['BADAMPS'] = badamps
            unproc = False
            if exp in exps_to_ignore:
                print(f"\n{exp} given as exposure id to ignore. Not processing.")
                erow['LASTSTEP'] = 'ignore'
                # erow['EXPFLAG'] = np.append(erow['EXPFLAG'], )
                unproc = True
            elif erow['LASTSTEP'] == 'ignore':
                print(f"\n{exp} identified by the pipeline as something to ignore. Not processing.")
                unproc = True
            elif erow['OBSTYPE'] not in procobstypes:
                print(f"\n{erow['OBSTYPE']} not in obstypes to process: {procobstypes}. Not processing.")
                unproc = True
            elif str(erow['OBSTYPE']).lower() == 'arc' and float(erow['EXPTIME']) > 8.0:
                print("\nArc exposure with EXPTIME greater than 8s. Not processing.")
                unproc = True
            elif str(erow['OBSTYPE']).lower() == 'dark' and np.abs(float(erow['EXPTIME'])-300.) > 1:
                print("\nDark exposure with EXPTIME not consistent with 300s. Not processing.")
                unproc = True
            elif str(erow['OBSTYPE']).lower() == 'dark' and calibjobs['ccdcalib'] is not None:
                print("\nDark exposure found, but already processed dark with" +
                      f" expID {calibjobs['ccdcalib']['EXPID']}. Skipping this one.")
                unproc = True

            print(f"\nFound: {erow}")
            etable.add_row(erow)
            if unproc:
                unproc_table.add_row(erow)
                sleep_and_report(2, message_suffix=f"after exposure", dry_run=dry_run)
                if dry_run_level < 3:
                    write_tables([etable, unproc_table], tablenames=[exp_table_pathname, unproc_table_pathname])
                continue

            curtype,curtile = get_type_and_tile(erow)

            if lasttype is None and curtype != 'dark' and do_bias:
                print("\nNo dark found at the beginning of the night. "
                      + "Submitting nightlybias before processing exposures.\n")
                prow = default_prow()
                prow['INTID'] = internal_id
                prow['OBSTYPE'] = 'zero'
                internal_id += 1
                prow['JOBDESC'] = 'nightlybias'
                prow['NIGHT'] = night
                prow['CALIBRATOR'] = 1
                prow['PROCCAMWORD'] = finalcamword
                prow = create_and_submit(prow, dry_run=dry_run_level,
                                         queue=queue,
                                         strictly_successful=True,
                                         check_for_outputs=check_for_outputs,
                                         resubmit_partial_complete=resubmit_partial_complete)
                calibjobs['nightlybias'] = prow.copy()
                ## Add the processing row to the processing table
                ptable.add_row(prow)
                ## Write out the processing table
                if dry_run_level < 3:
                    write_tables([ptable], tablenames=[proc_table_pathname])
                    sleep_and_report(2, message_suffix=f"after nightlybias",
                                     dry_run=dry_run)

            # if this is a new tile/obstype, proceed with submitting all of the jobs for the previous tile
            if lasttype is not None and ((curtype != lasttype) or (curtile != lasttile)):
                print("\nData for previous tile or obstype is complete. Running joint fits. "
                      + f"{curtype=}, {lasttype=}, {curtile=}, {lasttile=}\n")
                old_iid = internal_id
                # If done with science exposures for a tile and use_tilenight==True, use
                # submit_tilenight_and_redshifts, otherwise use checkfor_and_submit_joint_job
                if use_tilenight and lasttype == 'science' and len(sciences)>0:
                    extra_job_args = {}
                    extra_job_args['z_submit_types'] = z_submit_types
                    extra_job_args['laststeps'] = ['all','fluxcalib','skysub']
                    ptable, sciences, internal_id \
                        = submit_tilenight_and_redshifts(ptable, sciences, calibjobs, internal_id,
                                                        dry_run=dry_run_level,
                                                        queue=queue,
                                                        strictly_successful=True,
                                                        check_for_outputs=check_for_outputs,
                                                        resubmit_partial_complete=resubmit_partial_complete,
                                                        use_specter=use_specter, extra_job_args=extra_job_args)
                else:
                    ptable, calibjobs, sciences, internal_id \
                        = checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs,
                                                        lasttype, internal_id,
                                                        dry_run=dry_run_level,
                                                        queue=queue,
                                                        strictly_successful=True,
                                                        check_for_outputs=check_for_outputs,
                                                        resubmit_partial_complete=resubmit_partial_complete,
                                                        z_submit_types=z_submit_types)

                ## if internal_id changed that means we submitted a joint job
                ## so let's write that out and pause
                if (internal_id > old_iid) and (dry_run_level < 3):
                    write_tables([ptable], tablenames=[proc_table_pathname])
                    sleep_and_report(2, message_suffix=f"after joint fit", dry_run=dry_run)
                del old_iid

            prow = erow_to_prow(erow)
            prow['INTID'] = internal_id
            internal_id += 1
            if prow['OBSTYPE'] == 'dark':
                prow['JOBDESC'] = 'ccdcalib'
            else:
                prow['JOBDESC'] = prow['OBSTYPE']
            prow = define_and_assign_dependency(prow, calibjobs)
            if (not use_tilenight) or erow['OBSTYPE'] != 'science':
                print(f"\nProcessing: {prow}\n")
                prow = create_and_submit(prow, dry_run=dry_run_level, queue=queue,
                                     strictly_successful=True, check_for_outputs=check_for_outputs,
                                     resubmit_partial_complete=resubmit_partial_complete,use_specter=use_specter)

                ## If processed a dark, assign that to the dark job
                if curtype == 'dark':
                    prow['CALIBRATOR'] = 1
                    calibjobs['ccdcalib'] = prow.copy()

                ## Add the processing row to the processing table
                ptable.add_row(prow)

            ## Note: Assumption here on number of flats
            if curtype == 'flat' and calibjobs['nightlyflat'] is None \
                    and int(erow['SEQTOT']) < 5 \
                    and np.abs(float(erow['EXPTIME'])-120.) < 1.:
                flats.append(prow)
            elif curtype == 'arc' and calibjobs['psfnight'] is None:
                arcs.append(prow)
            elif curtype == 'science' and (use_tilenight or prow['LASTSTEP'] != 'skysub'):
                sciences.append(prow)

            lasttile = curtile
            lasttype = curtype

            ## Flush the outputs
            sys.stdout.flush()
            sys.stderr.flush()

            if dry_run_level < 3:
                write_tables([etable, ptable], tablenames=[exp_table_pathname, proc_table_pathname])
            sleep_and_report(exp_cadence_time, message_suffix=f"after exposure", dry_run=dry_run)

        print("\nReached the end of current iteration of new exposures.")
        if override_night is not None and (not continue_looping_debug):
            print("\nOverride_night set, not waiting for new data before exiting.\n")
        else:
            sleep_and_report(data_cadence_time, message_suffix=f"before looking for more new data",
                            dry_run=(dry_run and ()))

        if len(ptable) > 0:
            ptable = update_from_queue(ptable, dry_run=dry_run_level)
            # ptable, nsubmits = update_and_recurvsively_submit(ptable,
            #                                                   ptab_name=proc_table_pathname, dry_run=dry_run_level)

            ## Exposure table doesn't change in the interim, so no need to re-write it to disk
            if dry_run_level < 3:
                write_table(ptable, tablename=proc_table_pathname)
            if override_night is None or continue_looping_debug:
                sleep_and_report(10, message_suffix=f"after updating queue information", dry_run=dry_run)

    ## Flush the outputs
    sys.stdout.flush()
    sys.stderr.flush()
    ## No more data coming in, so do bottleneck steps if any apply
    if use_tilenight and len(sciences)>0:
        extra_job_args = {}
        extra_job_args['z_submit_types'] = z_submit_types
        extra_job_args['laststeps'] = ['all','fluxcalib','skysub']
        ptable, sciences, internal_id \
            = submit_tilenight_and_redshifts(ptable, sciences, calibjobs, internal_id,
                                            dry_run=dry_run_level,
                                            queue=queue,
                                            strictly_successful=True,
                                            check_for_outputs=check_for_outputs,
                                            resubmit_partial_complete=resubmit_partial_complete,
                                            use_specter=use_specter,
                                            extra_job_args=extra_job_args)
    else:
        ptable, calibjobs, sciences, internal_id \
            = checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs,
                                            lasttype, internal_id,
                                            dry_run=dry_run_level, queue=queue,
                                            strictly_successful=True,
                                            check_for_outputs=check_for_outputs,
                                            resubmit_partial_complete=resubmit_partial_complete,
                                            z_submit_types=z_submit_types)
    ## All jobs now submitted, update information from job queue and save
    ptable = update_from_queue(ptable, dry_run=dry_run_level)
    if dry_run_level < 3:
        write_table(ptable, tablename=proc_table_pathname)

    print(f"Completed submission of exposures for night {night}.")

    # #######################################
    # ########## Queue Cleanup ##############
    # #######################################
    # print("Now resolving job failures.")
    #
    # ## Flush the outputs
    # sys.stdout.flush()
    # sys.stderr.flush()
    # ## Now we resubmit failed jobs and their dependencies until all jobs have un-submittable end state
    # ## e.g. they either succeeded or failed with a code-related issue
    # ii,nsubmits = 0, 0
    # while ii < 4 and any_jobs_not_complete(ptable['STATUS']):
    #     print(f"Starting iteration {ii} of queue updating and resubmissions of failures.")
    #     ptable, nsubmits = update_and_recurvsively_submit(ptable, submits=nsubmits,
    #                                                       ptab_name=proc_table_pathname, dry_run=dry_run_level)
    #     if dry_run_level < 3:
    #          write_table(ptable, tablename=proc_table_pathname)
    #     if any_jobs_not_complete(ptable['STATUS']):
    #         sleep_and_report(queue_cadence_time, message_suffix=f"after resubmitting job to queue",
    #                          dry_run=(dry_run and (override_night is not None) and not (continue_looping_debug)))
    #
    #     ptable = update_from_queue(ptable, dry_run=dry_run_level)
    #     if dry_run_level < 3:
    #          write_table(ptable, tablename=proc_table_pathname)
    #     ## Flush the outputs
    #     sys.stdout.flush()
    #     sys.stderr.flush()
    #     ii += 1
    #
    # print("No job failures left.")
    print("Exiting")
    ## Flush the outputs
    sys.stdout.flush()
    sys.stderr.flush()
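
The listing above defines daily_processing_manager but never invokes it. As a minimal sketch of a direct call, assuming a configured DESI environment (DESI_SPECTRO_DATA and SPECPROD set) and using hypothetical night and hardware values, several of which are taken from the docstring examples:

from desispec.scripts.daily_processing import daily_processing_manager

## Simulated reprocessing of a past night: dry_run_level=2 means no scripts are
## written or submitted, and badcamword/badamps flag hardware to exclude.
daily_processing_manager(override_night=20200314,        # hypothetical night (docstring example)
                         camword='a0123456789',          # all cameras on all petals
                         badcamword='a5',                # hypothetical: drop petal 5 entirely
                         badamps='b7D,z8A',              # docstring example value
                         z_submit_types='cumulative,perexp',
                         queue='realtime',
                         dry_run_level=2)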