• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

desihub / desispec / 4888068324

pending completion
4888068324

Pull #2037

github-actions

GitHub
Merge b9577a3f9 into e3cee2f1c
Pull Request #2037: Workaround for sbatch bug on Perlmutter

11 of 11 new or added lines in 2 files covered. (100.0%)

10653 of 43760 relevant lines covered (24.34%)

0.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/py/desispec/scripts/daily_processing.py
1
"""
2
desispec.scripts.daily_processing
3
=================================
4

5
"""
6
import numpy as np
×
7
import os
×
8
import sys
×
9
import time
×
10
from astropy.table import Table
×
11
import glob
×
12

13
## Import some helper functions, you can see their definitions by uncomenting the bash shell command
14
from desispec.workflow.tableio import load_tables, write_tables, write_table
×
15
from desispec.workflow.utils import verify_variable_with_environment, pathjoin, listpath, \
×
16
                                    get_printable_banner, sleep_and_report
17
from desispec.workflow.timing import during_operating_hours, what_night_is_it
×
18
from desispec.workflow.exptable import default_obstypes_for_exptable, get_exposure_table_column_defs, \
×
19
    get_exposure_table_path, get_exposure_table_name, summarize_exposure
20
from desispec.workflow.proctable import default_obstypes_for_proctable, \
×
21
                                        get_processing_table_path, \
22
                                        get_processing_table_name, \
23
                                        erow_to_prow, default_prow
24
from desispec.workflow.procfuncs import parse_previous_tables, flat_joint_fit, arc_joint_fit, get_type_and_tile, \
×
25
                                        science_joint_fit, define_and_assign_dependency, create_and_submit, \
26
                                        update_and_recurvsively_submit, checkfor_and_submit_joint_job, \
27
                                        submit_tilenight_and_redshifts
28
from desispec.workflow.queue import update_from_queue, any_jobs_not_complete
×
29
from desispec.io.util import difference_camwords, parse_badamps, validate_badamps
×
30

31
def daily_processing_manager(specprod=None, exp_table_path=None, proc_table_path=None, path_to_data=None,
                             expobstypes=None, procobstypes=None, z_submit_types=None, camword=None, badcamword=None,
                             badamps=None, override_night=None, tab_filetype='csv', queue='realtime',
                             exps_to_ignore=None, data_cadence_time=300, queue_cadence_time=1800,
                             dry_run_level=0, dry_run=False, no_redshifts=False, continue_looping_debug=False, dont_check_job_outputs=False,
                             dont_resubmit_partial_jobs=False, verbose=False, use_specter=False, use_tilenight=False):
    """
    Generates processing tables for the nights requested. Requires exposure tables to exist on disk.

    Args:
        specprod: str. The name of the current production. If used, this will overwrite the SPECPROD environment variable.
        exp_table_path: str. Full path to where the exposure tables are stored, WITHOUT the monthly directory included.
        proc_table_path: str. Full path to where the processing tables are to be written.
        path_to_data: str. Path to the raw data.
        expobstypes: str or comma separated list of strings. The exposure OBSTYPE's that you want to include in the exposure table.
        procobstypes: str or comma separated list of strings. The exposure OBSTYPE's that you want to include in the processing table.
        z_submit_types: list of str's or comma separated list of string. The "group" types of redshifts that should be
                                       submitted with each exposure. If not specified, default for daily processing is
                                       ['cumulative', 'pernight-v0']. If false, 'false', or [], then no redshifts are submitted.
        camword: str. Camword that, if set, alters the set of cameras that will be set for processing.
                      Examples: a0123456789, a1, a2b3r3, a2b3r4z3.
        badcamword: str. Camword that, if set, will be removed from the camword defined in camword if given, or the camword
                         inferred from the data if camword is not given.
        badamps: str. Comma separated list of bad amplifiers that should not be processed. Should be of the
                      form "{camera}{petal}{amp}", i.e. "[brz][0-9][ABCD]". Example: 'b7D,z8A'
        override_night: str or int. 8 digit night, e.g. 20200314, of data to run on. If None, it runs on the current night.
        tab_filetype: str. The file extension (without the '.') of the exposure and processing tables.
        queue: str. The name of the queue to submit the jobs to. Default is "realtime".
        exps_to_ignore: list. A list of exposure id's that should not be processed. Each should be an integer.
        data_cadence_time: int. Wait time in seconds between loops in looking for new data. Default is 300 seconds.
        queue_cadence_time: int. Wait time in seconds between loops in checking queue statuses and resubmitting failures. Default is 1800s.
        dry_run_level, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
                      dry_run=2, the scripts will not be written or submitted. Logging will remain the same
                      for testing as though scripts are being submitted. Default is 0 (false).
        dry_run, bool. Whether to run without submitting scripts or not. If dry_run_level is defined, then it over-rides
                       this flag. dry_run_level not set and dry_run=True, dry_run_level is set to 2 (no scripts
                       generated or run). Default for dry_run is False.
        no_redshifts, bool. Whether to submit redshifts or not. If True, redshifts are not submitted.
        continue_looping_debug: bool. FOR DEBUG purposes only. Will continue looping in search of new data until the process
                                 is terminated. Default is False.
        dont_check_job_outputs, bool. Default is False. If False, the code checks for the existence of the expected final
                                 data products for the script being submitted. If all files exist and this is False,
                                 then the script will not be submitted. If some files exist and this is False, only the
                                 subset of the cameras without the final data products will be generated and submitted.
        dont_resubmit_partial_jobs, bool. Default is False. Must be used with dont_check_job_outputs=False. If this flag is
                                          False, jobs with some prior data are pruned using PROCCAMWORD to only process the
                                          remaining cameras not found to exist.
        verbose: bool. True if you want more verbose output, false otherwise. Current not propagated to lower code,
                       so it is only used in the main daily_processing script itself.
        use_specter, bool, optional. Default is False. If True, use specter, otherwise use gpu_specter by default.
        use_tilenight (bool, optional): Default is False. If True, use desi_proc_tilenight for prestdstar, stdstar,
                    and poststdstar steps for science exposures.

    Returns: Nothing

    Notes:
        Generates both exposure table and processing tables 'on the fly' and saves them at various checkpoints. These
        should be capable of being reloaded in case of interruption or accidental termination of the manager's process.
    """
    ## If not being done during operating hours, and we're not simulating data or running a catchup run, exit.
    if not during_operating_hours(dry_run=dry_run) and override_night is None:
        print("Not during operating hours, and not asked to perform a dry run or run on historic data. Exiting.")
        sys.exit(0)

    ## What night are we running on?
    true_night = what_night_is_it()
    if override_night is not None:
        night = int(override_night)
        print(f"True night is {true_night}, but running for night={night}")
    else:
        night = true_night

    if continue_looping_debug:
        print("continue_looping_debug is set. Will continue looking for new data and needs to be terminated by the user.")

    ## Recast booleans from double negative
    check_for_outputs = (not dont_check_job_outputs)
    resubmit_partial_complete = (not dont_resubmit_partial_jobs)

    ## Define the obstypes to process
    if procobstypes is None:
        procobstypes = default_obstypes_for_proctable()
    elif isinstance(procobstypes, str):
        procobstypes = procobstypes.split(',')

    ## Define the obstypes to save information for in the exposure table
    if expobstypes is None:
        expobstypes = default_obstypes_for_exptable()
    elif isinstance(expobstypes, str):
        expobstypes = expobstypes.split(',')

    ## Define the group types of redshifts you want to generate for each tile
    if no_redshifts:
        z_submit_types = None
    else:
        if z_submit_types is None:
            pass
        elif isinstance(z_submit_types, str):
            if z_submit_types.lower() == 'false':
                z_submit_types = None
            elif z_submit_types.lower() == 'none':
                z_submit_types = None
            else:
                z_submit_types = [ztype.strip().lower() for ztype in z_submit_types.split(',')]
                for ztype in z_submit_types:
                    if ztype not in ['cumulative', 'pernight-v0', 'pernight', 'perexp']:
                        raise ValueError(f"Couldn't understand ztype={ztype} in z_submit_types={z_submit_types}.")
        else:
            raise ValueError(f"Couldn't understand z_submit_types={z_submit_types}, type={type(z_submit_types)}.")

    if z_submit_types is None:
        print("Not submitting scripts for redshift fitting")
    else:
        print(f"Redshift fitting with redshift group types: {z_submit_types}")

    ## Reconcile the dry_run and dry_run_level
    if dry_run and dry_run_level == 0:
        dry_run_level = 2
    elif dry_run_level > 0:
        dry_run = True

    ## expobstypes must contain all the types used in processing
    for typ in procobstypes:
        if typ not in expobstypes:
            expobstypes.append(typ)

    ## Warn people if changing camword
    finalcamword = 'a0123456789'
    if camword is not None and badcamword is None:
        badcamword = difference_camwords(finalcamword,camword)
        finalcamword = camword
    elif camword is not None and badcamword is not None:
        finalcamword = difference_camwords(camword, badcamword)
        badcamword = difference_camwords('a0123456789', finalcamword)
    elif badcamword is not None:
        finalcamword = difference_camwords(finalcamword,badcamword)
    else:
        badcamword = ''

    if badcamword != '':
        ## Inform the user what will be done with it.
        print(f"Modifying camword of data to be processed with badcamword: {badcamword}. "+\
              f"Camword to be processed: {finalcamword}")

    ## Make sure badamps is formatted properly
    if badamps is None:
        badamps = ''
    else:
        badamps = validate_badamps(badamps)

    ## Define the set of exposures to ignore
    if exps_to_ignore is None:
        exps_to_ignore = set()
    else:
        exps_to_ignore = np.sort(np.array(exps_to_ignore).astype(int))
        print(f"\nReceived exposures to ignore: {exps_to_ignore}")
        exps_to_ignore = set(exps_to_ignore)

    ## Get context specific variable values
    colnames, coltypes, coldefaults = get_exposure_table_column_defs(return_default_values=True)

    ## Define where to find the data
    path_to_data = verify_variable_with_environment(var=path_to_data,var_name='path_to_data', env_name='DESI_SPECTRO_DATA')
    specprod = verify_variable_with_environment(var=specprod,var_name='specprod',env_name='SPECPROD')

    ## Define the naming scheme for the raw data
    ## Manifests (describing end of cals, etc.) don't have a data file, so search for those separately
    data_glob = os.path.join(path_to_data, str(night), '*', 'desi-*.fit*')
    manifest_glob = os.path.join(path_to_data, str(night), '*', 'manifest_*.json')

    ## Determine where the exposure table will be written
    if exp_table_path is None:
        exp_table_path = get_exposure_table_path(night=night, usespecprod=True)
    os.makedirs(exp_table_path, exist_ok=True)
    name = get_exposure_table_name(night=night, extension=tab_filetype)
    exp_table_pathname = pathjoin(exp_table_path, name)

    ## Determine where the processing table will be written
    if proc_table_path is None:
        proc_table_path = get_processing_table_path()
    os.makedirs(proc_table_path, exist_ok=True)
    name = get_processing_table_name(prodmod=night, extension=tab_filetype)
    proc_table_pathname = pathjoin(proc_table_path, name)

    ## Determine where the unprocessed data table will be written
    unproc_table_pathname = pathjoin(proc_table_path,name.replace('processing', 'unprocessed'))

    ## Combine the table names and types for easier passing to io functions
    table_pathnames = [exp_table_pathname, proc_table_pathname, unproc_table_pathname]
    table_types = ['exptable','proctable','unproctable']

    ## Load in the files defined above
    etable, ptable, unproc_table = load_tables(tablenames=table_pathnames, \
                                               tabletypes=table_types)

    ## Get relevant data from the tables
    all_exps = set(etable['EXPID'])
    arcs, flats, sciences, calibjobs, curtype, lasttype, \
    curtile, lasttile, internal_id = parse_previous_tables(etable, ptable, night)
    do_bias = ('bias' in procobstypes or 'dark' in procobstypes)

    ## While running on the proper night and during night hours,
    ## or doing a dry_run or override_night, keep looping
    while ( (night == what_night_is_it()) and during_operating_hours(dry_run=dry_run) ) or ( override_night is not None ):
        ## Get a list of new exposures that have been found
        print(f"\n\n\nPreviously known exposures: {all_exps}")
        data_exps = set(sorted([int(os.path.basename(os.path.dirname(fil))) for fil in glob.glob(data_glob)]))
        manifest_exps = set(sorted([int(os.path.basename(os.path.dirname(fil))) for fil in glob.glob(manifest_glob)]))
        located_exps = data_exps.union(manifest_exps)

        new_exps = located_exps.difference(all_exps)
        all_exps = located_exps # i.e. new_exps.union(all_exps)
        print(f"\nNew exposures: {new_exps}\n\n")

        ## If there aren't any new exps and there won't be more because we're running on an old night or simulating things, exit
        if (not continue_looping_debug) and ( override_night is not None ) and ( len(list(new_exps))==0 ):
            print("Terminating the search for new exposures because no new exposures are present and you have" + \
                  " override_night set without continue_looping_debug")
            break

        ## Loop over new exposures and process them as relevant to that type
        for exp in sorted(list(new_exps)):
            if verbose:
                print(get_printable_banner(str(exp)))
            else:
                print(f'\n\n##################### {exp} #########################')

            ## Open relevant raw data files to understand what we're dealing with
            erow = summarize_exposure(path_to_data, night, exp, expobstypes, colnames, coldefaults, verbosely=False)

            ## If there was an issue, continue. If it's a string summarizing the end of some sequence, use that info.
            ## If the exposure is assosciated with data, process that data.
            if erow is None:
                continue
            elif type(erow) is str:
                writeout = False
                if exp in exps_to_ignore:
                    print(f"Located {erow} in exposure {exp}, but the exposure was listed in the expids to ignore. Ignoring this.")
                elif erow == 'endofarcs' and calibjobs['psfnight'] is None and 'arc' in procobstypes:
                    print("\nLocated end of arc calibration sequence flag. Processing psfnight.\n")
                    ptable, calibjobs['psfnight'], internal_id = arc_joint_fit(ptable, arcs, internal_id, dry_run=dry_run_level, queue=queue)
                    writeout = True
                elif erow == 'endofflats' and calibjobs['nightlyflat'] is None and 'flat' in procobstypes:
                    print("\nLocated end of long flat calibration sequence flag. Processing nightlyflat.\n")
                    ptable, calibjobs['nightlyflat'], internal_id = flat_joint_fit(ptable, flats, internal_id, dry_run=dry_run_level, queue=queue)
                    writeout = True
                elif 'short' in erow and calibjobs['nightlyflat'] is None:
                    print("\nLocated end of short flat calibration flag. Removing flats from list for nightlyflat processing.\n")
                    flats = []
                if writeout and dry_run_level < 3:
                    write_tables([ptable], tablenames=[proc_table_pathname])
                    sleep_and_report(2, message_suffix=f"after joint fit", dry_run=dry_run)
                del writeout
                continue
            else:
                ## Else it's a real row so start processing it
                pass

            erow['BADCAMWORD'] = badcamword
            erow['BADAMPS'] = badamps
            unproc = False
            if exp in exps_to_ignore:
                print(f"\n{exp} given as exposure id to ignore. Not processing.")
                erow['LASTSTEP'] = 'ignore'
                # erow['EXPFLAG'] = np.append(erow['EXPFLAG'], )
                unproc = True
            elif erow['LASTSTEP'] == 'ignore':
                print(f"\n{exp} identified by the pipeline as something to ignore. Not processing.")
                unproc = True
            elif erow['OBSTYPE'] not in procobstypes:
                print(f"\n{erow['OBSTYPE']} not in obstypes to process: {procobstypes}. Not processing.")
                unproc = True
            elif str(erow['OBSTYPE']).lower() == 'arc' and float(erow['EXPTIME']) > 8.0:
                print("\nArc exposure with EXPTIME greater than 8s. Not processing.")
                unproc = True
            elif str(erow['OBSTYPE']).lower() == 'dark' and np.abs(float(erow['EXPTIME'])-300.) > 1:
                print("\nDark exposure with EXPTIME not consistent with 300s. Not processing.")
                unproc = True
            elif str(erow['OBSTYPE']).lower() == 'dark' and calibjobs['ccdcalib'] is not None:
                print("\nDark exposure found, but already proocessed dark with" +
                      f" expID {calibjobs['ccdcalib']['EXPID']}. Skipping this one.")
                unproc = True

            print(f"\nFound: {erow}")
            etable.add_row(erow)
            if unproc:
                unproc_table.add_row(erow)
                sleep_and_report(0.5, message_suffix=f"after exposure", dry_run=dry_run) # CHANGE BACK AFTER SBATCH TESTING
                if dry_run_level < 3:
                    write_tables([etable, unproc_table], tablenames=[exp_table_pathname, unproc_table_pathname])
                continue

            curtype,curtile = get_type_and_tile(erow)

            ## If the night didn't start with a dark, submit a nightlybias job so downstream
            ## calibrations still have a bias to depend on
            if lasttype is None and curtype != 'dark' and do_bias:
                print("\nNo dark found at the beginning of the night."
                      + "Submitting nightlybias before processing exposures.\n")
                prow = default_prow()
                prow['INTID'] = internal_id
                prow['OBSTYPE'] = 'zero'
                internal_id += 1
                prow['JOBDESC'] = 'nightlybias'
                prow['NIGHT'] = night
                prow['CALIBRATOR'] = 1
                prow['PROCCAMWORD'] = finalcamword
                prow = create_and_submit(prow, dry_run=dry_run_level,
                                         queue=queue,
                                         strictly_successful=True,
                                         check_for_outputs=check_for_outputs,
                                         resubmit_partial_complete=resubmit_partial_complete)
                calibjobs['nightlybias'] = prow.copy()
                ## Add the processing row to the processing table
                ptable.add_row(prow)
                ## Write out the processing table
                if dry_run_level < 3:
                    write_tables([ptable], tablenames=[proc_table_pathname])
                    sleep_and_report(2, message_suffix=f"after nightlybias",
                                     dry_run=dry_run)

            # if this is a new tile/obstype, proceed with submitting all of the jobs for the previous tile
            if lasttype is not None and ((curtype != lasttype) or (curtile != lasttile)):
                print("\nData for previous tile or obstype is complete. Running joint fits. "
                      + f"{curtype=}, {lasttype=}, {curtile=}, {lasttile=}\n")
                old_iid = internal_id
                # If done with science exposures for a tile and use_tilenight==True, use
                # submit_tilenight_and_redshifts, otherwise use checkfor_and_submit_joint_job
                if use_tilenight and lasttype == 'science' and len(sciences)>0:
                    ptable, sciences, internal_id \
                        = submit_tilenight_and_redshifts(ptable, sciences, calibjobs, lasttype, internal_id,
                                                        dry_run=dry_run_level,
                                                        queue=queue,
                                                        strictly_successful=True,
                                                        check_for_outputs=check_for_outputs,
                                                        resubmit_partial_complete=resubmit_partial_complete,
                                                        z_submit_types=z_submit_types,
                                                        use_specter=use_specter,
                                                        laststeps = ['all','fluxcalib','skysub'])
                else:
                    ptable, calibjobs, sciences, internal_id \
                        = checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs,
                                                        lasttype, internal_id,
                                                        dry_run=dry_run_level,
                                                        queue=queue,
                                                        strictly_successful=True,
                                                        check_for_outputs=check_for_outputs,
                                                        resubmit_partial_complete=resubmit_partial_complete,
                                                        z_submit_types=z_submit_types)

                ## if internal_id changed that means we submitted a joint job
                ## so lets write that out and pause
                if (internal_id > old_iid) and (dry_run_level < 3):
                    write_tables([ptable], tablenames=[proc_table_pathname])
                    sleep_and_report(2, message_suffix=f"after joint fit", dry_run=dry_run)
                del old_iid

            prow = erow_to_prow(erow)
            prow['INTID'] = internal_id
            internal_id += 1
            if prow['OBSTYPE'] == 'dark':
                prow['JOBDESC'] = 'ccdcalib'
            else:
                prow['JOBDESC'] = prow['OBSTYPE']
            prow = define_and_assign_dependency(prow, calibjobs)
            ## Science exposures are deferred to the tilenight job when use_tilenight is set,
            ## otherwise each exposure is submitted individually here
            if (not use_tilenight) or erow['OBSTYPE'] != 'science':
                print(f"\nProcessing: {prow}\n")
                prow = create_and_submit(prow, dry_run=dry_run_level, queue=queue,
                                     strictly_successful=True, check_for_outputs=check_for_outputs,
                                     resubmit_partial_complete=resubmit_partial_complete,use_specter=use_specter)

                ## If processed a dark, assign that to the dark job
                if curtype == 'dark':
                    prow['CALIBRATOR'] = 1
                    calibjobs['ccdcalib'] = prow.copy()

                ## Add the processing row to the processing table
                ptable.add_row(prow)

            ## Note: Assumption here on number of flats
            if curtype == 'flat' and calibjobs['nightlyflat'] is None \
                    and int(erow['SEQTOT']) < 5 \
                    and np.abs(float(erow['EXPTIME'])-120.) < 1.:
                flats.append(prow)
            elif curtype == 'arc' and calibjobs['psfnight'] is None:
                arcs.append(prow)
            elif curtype == 'science' and (use_tilenight or prow['LASTSTEP'] != 'skysub'):
                sciences.append(prow)

            lasttile = curtile
            lasttype = curtype

            ## Flush the outputs
            sys.stdout.flush()
            sys.stderr.flush()

            if dry_run_level < 3:
                write_tables([etable, ptable], tablenames=[exp_table_pathname, proc_table_pathname])
            # next 3 lines added for sbatch-patch testing only -- CHANGE BACK BEFORE MERGING
            if curtype == 'dark':
                print(f"\n SBATCH TESTING: waiting 3 minutes to allow ccdcalib job to complete for testing\n")
                time.sleep(180)
            sleep_and_report(0.5, message_suffix=f"after exposure", dry_run=dry_run) # CHANGE BACK TO 2 BEFORE MERGING

        print("\nReached the end of current iteration of new exposures.")
        if override_night is not None and (not continue_looping_debug):
            print("\nOverride_night set, not waiting for new data before exiting.\n")
        else:
            ## Only skip the actual sleep when simulating a historic night without the debug loop;
            ## in this branch (override_night is None or continue_looping_debug) that is always False.
            ## (Fixed from the mangled leftover `dry_run and ()`, which was always falsy by accident.)
            sleep_and_report(data_cadence_time, message_suffix=f"before looking for more new data",
                            dry_run=(dry_run and (override_night is not None) and not continue_looping_debug))

        if len(ptable) > 0:
            ptable = update_from_queue(ptable, dry_run=dry_run_level)
            # ptable, nsubmits = update_and_recurvsively_submit(ptable,
            #                                                   ptab_name=proc_table_pathname, dry_run=dry_run_level)

            ## Exposure table doesn't change in the interim, so no need to re-write it to disk
            if dry_run_level < 3:
                write_table(ptable, tablename=proc_table_pathname)
            if override_night is None or continue_looping_debug:
                sleep_and_report(10, message_suffix=f"after updating queue information", dry_run=dry_run)

    ## Flush the outputs
    sys.stdout.flush()
    sys.stderr.flush()
    ## No more data coming in, so do bottleneck steps if any apply
    if use_tilenight and len(sciences)>0:
        ptable, sciences, internal_id \
            = submit_tilenight_and_redshifts(ptable, sciences, calibjobs, lasttype, internal_id,
                                            dry_run=dry_run_level,
                                            queue=queue,
                                            strictly_successful=True,
                                            check_for_outputs=check_for_outputs,
                                            resubmit_partial_complete=resubmit_partial_complete,
                                            z_submit_types=z_submit_types,
                                            use_specter=use_specter,
                                            laststeps = ['all','fluxcalib','skysub'])
    else:
        ptable, calibjobs, sciences, internal_id \
            = checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs,
                                            lasttype, internal_id,
                                            dry_run=dry_run_level, queue=queue,
                                            strictly_successful=True,
                                            check_for_outputs=check_for_outputs,
                                            resubmit_partial_complete=resubmit_partial_complete,
                                            z_submit_types=z_submit_types)
    ## All jobs now submitted, update information from job queue and save
    ptable = update_from_queue(ptable, dry_run=dry_run_level)
    if dry_run_level < 3:
        write_table(ptable, tablename=proc_table_pathname)

    print(f"Completed submission of exposures for night {night}.")

    # #######################################
    # ########## Queue Cleanup ##############
    # #######################################
    # print("Now resolving job failures.")
    #
    # ## Flush the outputs
    # sys.stdout.flush()
    # sys.stderr.flush()
    # ## Now we resubmit failed jobs and their dependencies until all jobs have un-submittable end state
    # ## e.g. they either succeeded or failed with a code-related issue
    # ii,nsubmits = 0, 0
    # while ii < 4 and any_jobs_not_complete(ptable['STATUS']):
    #     print(f"Starting iteration {ii} of queue updating and resubmissions of failures.")
    #     ptable, nsubmits = update_and_recurvsively_submit(ptable, submits=nsubmits,
    #                                                       ptab_name=proc_table_pathname, dry_run=dry_run_level)
    #     if dry_run_level < 3:
    #          write_table(ptable, tablename=proc_table_pathname)
    #     if any_jobs_not_complete(ptable['STATUS']):
    #         sleep_and_report(queue_cadence_time, message_suffix=f"after resubmitting job to queue",
    #                          dry_run=(dry_run and (override_night is not None) and not (continue_looping_debug)))
    #
    #     ptable = update_from_queue(ptable, dry_run=dry_run_level)
    #     if dry_run_level < 3:
    #          write_table(ptable, tablename=proc_table_pathname)
    #     ## Flush the outputs
    #     sys.stdout.flush()
    #     sys.stderr.flush()
    #     ii += 1
    #
    # print("No job failures left.")
    print("Exiting")
    ## Flush the outputs
    sys.stdout.flush()
    sys.stderr.flush()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc