• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

desihub / desispec / 15788802466

20 Jun 2025 10:21PM UTC coverage: 37.029% (-2.1%) from 39.083%
15788802466

Pull #2502

github

weaverba137
fix more deprecation warnings
Pull Request #2502: [WIP] NumPy 2 compatibility

7 of 15 new or added lines in 11 files covered. (46.67%)

692 existing lines in 4 files now uncovered.

12422 of 33547 relevant lines covered (37.03%)

0.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/py/desispec/scripts/submit_prod.py
1
"""
2
desispec.scripts.submit_prod
3
============================
4

5
"""
6
import yaml
×
7
import numpy as np
×
8
import os
×
9
import sys
×
10
import time
×
11
import re
×
12
import glob
×
13

14
from desispec.parallel import stdouterr_redirected
×
15
from desiutil.log import get_logger
×
16
from desispec.io import findfile
×
17
from desispec.scripts.proc_night import proc_night
×
18
## Import some helper functions, you can see their definitions by uncomenting the bash shell command
19
from desispec.workflow.utils import verify_variable_with_environment, listpath, \
×
20
    remove_slurm_environment_variables
21
from desispec.workflow.exptable import read_minimal_science_exptab_cols
×
22
from desispec.scripts.submit_night import submit_night
×
23
from desispec.workflow.queue import check_queue_count
×
24
import desispec.workflow.proctable
×
25

26
def get_nights_in_date_range(first_night, last_night):
    """
    Returns all nights in the date range that have an exposure table on disk.

    Args:
        first_night, int. First night to include (inclusive).
        last_night, int. Last night to include (inclusive).

    Returns:
        nights, np.ndarray. Nights (20YYMMDD ints) on or after Jan 1 2020 in
            which data exists at NERSC, restricted to the inclusive range
            [first_night, last_night].
    """
    ## Use a wildcard night in findfile to derive the on-disk glob pattern
    ## for exposure tables (night dirs look like 202?????).
    etab_path = findfile('exptable', night='99999999', readonly=True)
    glob_path = etab_path.replace('99999999', '202?????').replace('999999', '202???')
    etab_files = sorted(glob.glob(glob_path))
    nights = []
    for etab_file in etab_files:
        ## Nights are 20YYMMDD. Extract the night from the file's basename;
        ## matching the full path against ^20\d{6}$ can never succeed, which
        ## previously left `nights` empty.
        match = re.search(r'(20\d{6})', os.path.basename(etab_file))
        if match:
            nights.append(int(match.group(1)))

    nights = np.array(nights)
    nights = nights[((nights >= first_night) & (nights <= last_night))]
    return nights
50

51
def get_all_valid_nights(first_night, last_night):
    """
    Returns a full list of all nights that have at least one valid science
    exposure.

    Args:
        first_night, int. First night to include (inclusive).
        last_night, int. Last night to include (inclusive).

    Returns:
        nights, np.ndarray. A list of nights on or after Jan 1 2020 in which
            data exists at NERSC.
    """
    ## All valid science exposures known to the production
    science_exptab = read_minimal_science_exptab_cols()
    ## One entry per distinct night
    unique_nights = np.unique(science_exptab['NIGHT'])
    ## Restrict to the requested inclusive range
    in_range = (unique_nights >= first_night) & (unique_nights <= last_night)
    return unique_nights[in_range]
67

68
def get_nights_to_process(production_yaml, verbose=False):
    """
    Derives the nights to be processed based on a production yaml file and
    returns a list of int nights.

    Args:
        production_yaml (str or dict): Production yaml or pathname of the
            yaml file that defines the production.
        verbose (bool): Whether to be verbose in log outputs.

    Returns:
        nights, list. A sorted list of nights on or after Jan 1 2020 in which
            data exists at NERSC.

    Raises:
        IOError: If production_yaml is a pathname that does not exist.
        ValueError: If neither NIGHTS nor LAST_NIGHT is defined in the yaml.
    """
    log = get_logger()
    ## If production_yaml not loaded, load the file
    if isinstance(production_yaml, str):
        if not os.path.exists(production_yaml):
            raise IOError(f"Prod yaml file doesn't exist: {production_yaml} not found.")
        with open(production_yaml, 'rb') as yamlfile:
            config = yaml.safe_load(yamlfile)
    else:
        config = production_yaml

    all_nights, first_night = None, None
    ## An explicit NIGHTS list takes precedence over LAST_NIGHT
    if 'NIGHTS' in config and 'LAST_NIGHT' in config:
        log.error("Both NIGHTS and LAST_NIGHT specified. Using NIGHTS "
                  + "and ignoring LAST_NIGHT.")
    if 'NIGHTS' in config:
        all_nights = np.array(list(config['NIGHTS'])).astype(int)
        if verbose:
            log.info(f"Setting all_nights to NIGHTS: {all_nights}")
            log.info("Setting first_night to earliest night in NIGHTS:"
                     + f" {np.min(all_nights)}")
        first_night = np.min(all_nights)
        if verbose:
            log.info("Setting last_night to latest night in NIGHTS: "
                     + f"{np.max(all_nights)}")
        last_night = np.max(all_nights)
    elif 'LAST_NIGHT' in config:
        last_night = int(config['LAST_NIGHT'])
        if verbose:
            ## Message fixed: the yaml key is LAST_NIGHT, not LATEST_NIGHT
            log.info(f"Setting last_night to LAST_NIGHT: {last_night}")
    else:
        ## Message fixed: the yaml key checked above is NIGHTS, not NIGHT
        raise ValueError("Either NIGHTS or LAST_NIGHT required in yaml "
                         + f"file {production_yaml}")

    if first_night is None:
        if 'FIRST_NIGHT' in config:
            first_night = int(config['FIRST_NIGHT'])
            if verbose:
                log.info(f"Setting first_night to FIRST_NIGHT: {first_night}")
        else:
            ## 20201214 is the earliest night in a normal production
            if verbose:
                log.info("Setting first_night to earliest in a normal prod: 20201214")
            first_night = 20201214

    if all_nights is None:
        ## Derive nights from the exposure tables rather than the raw
        ## date range, so nights with no valid science data are skipped
        if verbose:
            log.info("Populating all_nights with all of the nights with valid science "
                     + f"exposures between {first_night} and {last_night} inclusive")
        all_nights = get_all_valid_nights(first_night, last_night)
    return sorted(all_nights)
131

132

133
def submit_production(production_yaml, queue_threshold=4500, dry_run_level=0):
    """
    Interprets a production_yaml file and submits the respective nights for
    processing within the defined production.

    Args:
        production_yaml (str): Pathname of the yaml file that defines the production.
        queue_threshold (int): The number of jobs for the current user in the queue
            at which the script stops submitting new jobs.
        dry_run_level (int, optional): Default is 0. Should the jobs written to the
            processing table actually be submitted for processing. This is passed
            directly to desi_proc_night. Levels >= 4 also suppress all filesystem
            writes made here (log dir, sentinel file).

    Returns:
        int or None: 0 if the sentinel file already exists (everything was
            submitted by a previous run); otherwise None.

    Raises:
        IOError: If production_yaml does not exist.
        ValueError: If SPECPROD is not defined in the yaml file.
    """
    log = get_logger()
    ## Load the yaml file
    if not os.path.exists(production_yaml):
        raise IOError(f"Prod yaml file doesn't exist: {production_yaml} not found.")
    with open(production_yaml, 'rb') as yamlfile:
        conf = yaml.safe_load(yamlfile)

    ## Unset Slurm environment variables set when running in scrontab
    remove_slurm_environment_variables()

    ## Make sure the specprod matches, if not set it to that in the file
    if 'SPECPROD' not in conf:
        raise ValueError(f"SPECPROD required in yaml file {production_yaml}")
    specprod = str(conf['SPECPROD']).lower()
    specprod = verify_variable_with_environment(var=specprod, var_name='specprod',
                                                env_name='SPECPROD')

    ## Define the user
    user = os.environ['USER']

    ## Look for sentinel; it marks that a previous run submitted every night
    sentinel_file = os.path.join(os.environ['DESI_SPECTRO_REDUX'],
                                 os.environ['SPECPROD'], 'run',
                                 'prod_submission_complete.txt')
    if os.path.exists(sentinel_file):
        log.info(f"Sentinel file {sentinel_file} exists, therefore all "
                 + "nights already submitted.")
        return 0

    ## Load the nights to process
    all_nights = get_nights_to_process(production_yaml=conf, verbose=True)

    ## Load the other parameters for running desi_proc_night
    if 'THRU_NIGHT' in conf:
        thru_night = int(conf['THRU_NIGHT'])
        log.info(f"Setting thru_night to THRU_NIGHT: {thru_night}")
    else:
        thru_night = np.max(all_nights)
        log.warning(f"Setting thru_night to last night: {thru_night}")

    ## If not specified, run "cumulative" redshifts, otherwise do
    ## as directed
    no_redshifts = False
    if 'Z_SUBMIT_TYPES' in conf:
        z_submit_types_str = str(conf['Z_SUBMIT_TYPES'])
        if z_submit_types_str.lower() in ['false', 'none']:
            z_submit_types = None
            no_redshifts = True
        else:
            ## Comma-separated string -> normalized list of type names
            z_submit_types = [ztype.strip().lower() for ztype in
                              z_submit_types_str.split(',')]
    else:
        z_submit_types = ['cumulative']

    if 'SURVEYS' in conf:
        surveys_str = str(conf['SURVEYS'])
        if surveys_str.lower() in ['false', 'none']:
            surveys = None
        else:
            surveys = [survey.strip().lower() for survey in
                       surveys_str.split(',')]
    else:
        surveys = None

    ## Bring in the queue and reservation information, if any
    if 'QUEUE' in conf:
        queue = conf['QUEUE']
    else:
        queue = 'regular'

    if 'RESERVATION' in conf:
        reservation = str(conf['RESERVATION'])
        if reservation.lower() == 'none':
            reservation = None
    else:
        reservation = None

    ## Let user know what was defined
    if z_submit_types is not None:
        log.info(f'Using z_submit_types: {z_submit_types}')
    if surveys is not None:
        log.info(f'Using surveys: {surveys}')
    log.info(f'Using queue: {queue}')
    if reservation is not None:
        log.info(f'Using reservation: {reservation}')

    ## Define log location
    logpath = os.path.join(os.environ['DESI_SPECTRO_REDUX'],
                          os.environ['SPECPROD'], 'run', 'logs')
    ## dry_run_level >= 4 means make no filesystem changes at all
    if dry_run_level < 4:
        os.makedirs(logpath, exist_ok=True)
    else:
        log.info(f"{dry_run_level=} so not creating {logpath}")

    ## Do the main processing
    finished = False
    processed_nights, skipped_nights = [], []
    all_nights = sorted(all_nights)
    log.info(f"Processing {all_nights=}")
    for night in sorted(all_nights):
        ## If proctable exists, assume we've already completed that night
        if os.path.exists(findfile('proctable', night=night, readonly=True)):
            skipped_nights.append(night)
            log.info(f"{night=} already has a proctable, skipping.")
            continue

        ## If the queue is too full, stop submitting nights
        num_in_queue = check_queue_count(user=user, include_scron=False,
                                         dry_run_level=dry_run_level)
        ## In Jura the largest night had 115 jobs, to be conservative we submit
        ## up to 4500 jobs (out of a 5000 limit) by default
        if num_in_queue > queue_threshold:
            log.info(f"{num_in_queue} jobs in the queue > {queue_threshold},"
                     + " so stopping the job submissions.")
            break

        ## We don't expect exposure tables to change during code execution here
        ## but we do expect processing tables to evolve, so clear that cache
        log.info(f"Processing {night=}")

        ## Belt-and-suspenders: reset the processing table cache to force a re-read.
        ## This shouldn't be necessary, but resetting the cache is conservative.
        desispec.workflow.proctable.reset_tilenight_ptab_cache()

        if dry_run_level < 4:
            ## Redirect desi_proc_night's output to a per-night log file
            logfile = os.path.join(logpath, f'night-{night}.log')
            with stdouterr_redirected(logfile):
                proc_night(night=night, z_submit_types=z_submit_types,
                           no_redshifts=no_redshifts,
                           complete_tiles_thrunight=thru_night,
                           surveys=surveys, dry_run_level=dry_run_level,
                           queue=queue, reservation=reservation)
        else:
            log.info(f"{dry_run_level=} so not running desi_proc_night. "
                     + f"Would have run for {night=}")

        processed_nights.append(night)
        log.info(f"Completed {night=}.")
    else:
        ## I.e. if the above loop didn't "break" because of exceeding the queue
        ## and all nights finished
        finished = True
        # write the sentinel so future runs exit early
        if dry_run_level < 4:
            with open(sentinel_file, 'w') as sentinel:
                sentinel.write(
                    f"All done with processing for {production_yaml}\n")
                sentinel.write(f"Nights processed: {all_nights}\n")
        else:
            log.info(f"{dry_run_level=} so not creating {sentinel_file}")


    log.info("Skipped the following nights that already had a processing table:")
    log.info(skipped_nights)
    log.info("Processed the following nights:")
    log.info(processed_nights)
    if finished:
        log.info('\n\n\n')
        log.info("All nights submitted")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc