desihub / desispec, build 19149943254
06 Nov 2025 09:09PM UTC coverage: 37.713% (+0.7%) from 37.002%
Pull Request #2521: Add redshift QA scripts (merge c9b4e1570 into 6a90a0547)
12988 of 34439 relevant lines covered (37.71%)
0.38 hits per line

Source File: /py/desispec/workflow/batch.py (70.55% covered)

"""
desispec.workflow.batch
=======================

Utilities for working with Slurm batch queues.
"""

import os
from importlib import resources
import yaml

from desiutil.log import get_logger

_config_cache = dict()
def get_config(name):
    """
    Return configuration dictionary for system `name`

    Args:
        name (str): e.g. cori-haswell, cori-knl, dirac, perlmutter-gpu, ...

    Returns dictionary with keys:
        * site: location of system, e.g. 'NERSC'
        * cores_per_node: number of physical cores per node
        * threads_per_core: hyperthreading / SMT per core
        * memory: memory per node in GB
        * timefactor: scale time estimates by this amount on this system
        * gpus_per_node: number of GPUs per node
        * batch_opts: list of additional batch options for script header
    """
    if name is None:
        name = default_system()

    global _config_cache
    if name in _config_cache:
        return _config_cache[name]

    configfile = resources.files('desispec').joinpath('data/batch_config.yaml')
    with open(configfile) as fx:
        config = yaml.safe_load(fx)

    #- Add the name for reference, in case it was default selected
    config[name]['name'] = name

    #- Add to cache so that we don't have to re-read batch_config.yaml every time
    _config_cache[name] = config[name]

    return config[name]
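
A minimal usage sketch for get_config (a hypothetical snippet, not part of batch.py); it assumes batch_config.yaml provides a 'perlmutter-gpu' entry with the keys documented above:

from desispec.workflow.batch import get_config

# First call reads data/batch_config.yaml and caches the 'perlmutter-gpu' entry.
cfg = get_config('perlmutter-gpu')
print(cfg['site'], cfg['cores_per_node'], cfg['gpus_per_node'])

# Subsequent calls with the same name are served from _config_cache
# without re-reading the YAML file.
assert get_config('perlmutter-gpu') is cfg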


def default_system(jobdesc=None, no_gpu=False):
    """
    Guess default system to use based on environment

    Args:
        jobdesc (str): Description of the job in the processing table (optional).
        no_gpu (bool): Don't use GPUs even if available. Default False.

    Returns:
        name (str): default system name to use
    """
    log = get_logger()
    name = None
    if 'NERSC_HOST' in os.environ:
        if os.environ['NERSC_HOST'] == 'perlmutter':
            ## HARDCODED: for now arcs and biases can't use GPUs, so use CPUs
            if jobdesc in ['linkcal', 'arc', 'biasnight', 'biaspdark',
                           'ccdcalib', 'badcol', 'psfnight', 'pdark']:
                name = 'perlmutter-cpu'
            elif no_gpu:
                name = 'perlmutter-cpu'
            else:
                name = 'perlmutter-gpu'
    elif os.path.isdir('/clusterfs/dirac1'):
        name = 'dirac'

    if name is None:
        msg = 'Unable to determine default batch system from environment'
        log.error(msg)
        raise RuntimeError(msg)
    else:
        log.info(f'Guessing default batch system {name}')

    return name
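
A short sketch of how default_system reacts to the environment (hypothetical snippet); it assumes a NERSC node where NERSC_HOST is set. Outside NERSC, and without /clusterfs/dirac1 present, the function raises RuntimeError:

import os
from desispec.workflow.batch import default_system

os.environ['NERSC_HOST'] = 'perlmutter'          # as on a Perlmutter node
print(default_system('arc'))                     # 'perlmutter-cpu' (arcs avoid GPUs)
print(default_system('tilenight'))               # 'perlmutter-gpu'
print(default_system('tilenight', no_gpu=True))  # 'perlmutter-cpu'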


def parse_reservation(reservation, jobdesc):
    """
    Parse reservation name into cpu/gpu reservation based upon jobdesc

    Args:
        reservation (str): resvname or resvname_cpu,resvname_gpu or None
        jobdesc (str): job description string e.g. 'arc', 'flat', 'tilenight'

    Returns:
        reservation_name (str or None): the reservation to use for this jobdesc,
        i.e. the CPU reservation if the job will run on a CPU-only system,
        otherwise the GPU reservation.

    If a single reservation name is provided, it is used for both CPU and GPU jobs.
    If either name is 'none' (case-insensitive), None is returned for that case.
    """
    if reservation is None:
        return reservation

    tmp = reservation.split(',')
    if len(tmp) == 1:
        reservation_cpu = reservation_gpu = reservation
    elif len(tmp) == 2:
        reservation_cpu, reservation_gpu = tmp
    else:
        raise ValueError(f'Unable to parse {reservation} as rescpu,resgpu')

    if reservation_cpu.lower() == 'none':
        reservation_cpu = None

    if reservation_gpu.lower() == 'none':
        reservation_gpu = None

    system_name = default_system(jobdesc)
    config = get_config(system_name)

    if 'gpus_per_node' not in config or config['gpus_per_node'] == 0:
        return reservation_cpu
    else:
        return reservation_gpu
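
A sketch of the reservation parsing; the reservation names here are made up, and the expected results assume a Perlmutter environment where the perlmutter-cpu config reports no GPUs and perlmutter-gpu reports gpus_per_node > 0:

from desispec.workflow.batch import parse_reservation

# A single name is used for both CPU and GPU jobs.
parse_reservation('resv_night', 'arc')               # -> 'resv_night'

# A cpu,gpu pair returns the name matching the system that runs this jobdesc;
# 'none' (case-insensitive) maps to None.
parse_reservation('resv_cpu,resv_gpu', 'arc')        # -> 'resv_cpu'
parse_reservation('resv_cpu,resv_gpu', 'tilenight')  # -> 'resv_gpu'
parse_reservation('none,resv_gpu', 'arc')            # -> None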


def determine_resources(ncameras, jobdesc, nexps=1, forced_runtime=None, queue=None, system_name=None):
    """
    Determine the resources that should be assigned to the batch script given what
    desi_proc needs for the given input information.

    Args:
        ncameras (int): number of cameras to be processed
        jobdesc (str): type of data being processed
        nexps (int, optional): the number of exposures processed in this step
        forced_runtime (int, optional): if set, request this runtime in minutes instead of the per-jobdesc estimate
        queue (str, optional): the Slurm queue to be submitted to. Currently not used.
        system_name (str, optional): batch compute system, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ncores: int, number of cores (actually 2x physical cores) that should be submitted via "-n {ncores}"
        * nodes:  int, number of nodes to be requested in the script. Typically (ncores-1) // cores_per_node + 1
        * runtime: int, the max time requested for the script in minutes for the processing.
    """
    if system_name is None:
        system_name = default_system(jobdesc=jobdesc)

    config = get_config(system_name)
    log = get_logger()
    jobdesc = jobdesc.upper()

    nspectro = (ncameras - 1) // 3 + 1
    nodes = None
    if jobdesc in ('ARC', 'TESTARC'):
        ncores          = 20 * (10*(ncameras+1)//20) # lowest multiple of 20 exceeding 10 per camera
        ncores, runtime = ncores + 1, 45             # + 1 for workflow.schedule scheduler proc
    elif jobdesc in ('FLAT', 'TESTFLAT'):
        runtime = 40
        if system_name.startswith('perlmutter'):
            ncores = config['cores_per_node']
        else:
            ncores = 20 * nspectro
    elif jobdesc == 'TILENIGHT':
        runtime  = int(60. / 140. * ncameras * nexps) # 140 frames per node hour
        runtime += 40                                 # overhead
        ncores = config['cores_per_node']
        if not system_name.startswith('perlmutter'):
            msg = 'tilenight cannot run on system_name={}'.format(system_name)
            log.critical(msg)
            raise ValueError(msg)
    elif jobdesc in ('SKY', 'TWILIGHT', 'SCIENCE', 'PRESTDSTAR'):
        runtime = 30
        if system_name.startswith('perlmutter'):
            ncores = config['cores_per_node']
        else:
            ncores = 20 * nspectro
    elif jobdesc in ('DARK', 'BADCOL'):
        ncores, runtime = ncameras, 5
    elif jobdesc in ('BIASNIGHT', 'BIASPDARK'):
        ## Jobs are memory limited, so use 15 cores per node
        ## and split work of 30 cameras across 2 nodes
        nodes = (ncameras // 16) + 1 # 2 nodes unless ncameras <= 15
        ncores = 15
        ## 8 minutes base plus 4 mins per loop over dark exposures
        pdarkcores = min([ncameras*nexps, nodes*config['cores_per_node']])
        runtime = 8 + 4.*(float(nodes*config['cores_per_node'])/float(pdarkcores))
    elif jobdesc == 'PDARK':
        nodes = 1
        # can do 1 core per camera per exp, but limit to cores available
        ncores = min([ncameras*nexps, nodes*config['cores_per_node']])
        ## 4 minutes base plus 4 mins per loop over dark exposures
        runtime = 4 + 4.*(float(nodes*config['cores_per_node'])/float(ncores))
    elif jobdesc == 'CCDCALIB':
        nodes = 1
        ncores, runtime = ncameras, 7 # 5 mins after perlmutter system scaling factor
    elif jobdesc == 'ZERO':
        ncores, runtime = 2, 5
    elif jobdesc == 'PSFNIGHT':
        ncores, runtime = ncameras, 5
    elif jobdesc == 'NIGHTLYFLAT':
        ncores, runtime = ncameras, 5
    elif jobdesc == 'STDSTARFIT':
        #- Special case hardcode: stdstar parallelism maxes out at ~30 cores
        #- and on KNL, it OOMs above that anyway.
        #- This might be more related to using a max of 30 standards, not that
        #- there are 30 cameras (coincidence).
        #- Use 32 as power of 2 for core packing
        ncores = 32
        runtime = 8 + 2*nexps
    elif jobdesc == 'POSTSTDSTAR':
        runtime = 10
        ncores = ncameras
    elif jobdesc == 'NIGHTLYBIAS':
        ncores, runtime = 15, 5
        nodes = 2
    elif jobdesc in ['PEREXP', 'PERNIGHT', 'CUMULATIVE', 'CUSTOMZTILE']:
        if system_name.startswith('perlmutter'):
            nodes, runtime = 1, 50  #- timefactor will bring time back down
        else:
            nodes, runtime = 2, 30
        ncores = nodes * config['cores_per_node']
    elif jobdesc == 'HEALPIX':
        nodes = 1
        runtime = 100
        ncores = nodes * config['cores_per_node']
    elif jobdesc == 'LINKCAL':
        nodes, ncores = 1, 1
        runtime = 5.
    else:
        msg = 'unknown jobdesc={}'.format(jobdesc)
        log.critical(msg)
        raise ValueError(msg)

    if forced_runtime is not None:
        runtime = forced_runtime

    if nodes is None:
        nodes = (ncores - 1) // config['cores_per_node'] + 1

    #- Arcs and flats make good use of full nodes, but throttle science
    #- exposures to 5 nodes to enable two to run together in the 10-node
    #- realtime queue, since their wallclock is dominated by less
    #- efficient sky and fluxcalib steps
    if jobdesc in ('ARC', 'TESTARC'):  #, 'FLAT', 'TESTFLAT'):
        max_realtime_nodes = 10
    else:
        max_realtime_nodes = 5

    #- Pending further optimizations, use same number of nodes in all queues
    ### if (queue == 'realtime') and (nodes > max_realtime_nodes):
    if (nodes > max_realtime_nodes):
        nodes = max_realtime_nodes
        ncores = config['cores_per_node'] * nodes
        if jobdesc in ('ARC', 'TESTARC'):
            # adjust for workflow.schedule scheduler proc
            ncores = ((ncores - 1) // 20) * 20 + 1

    #- Allow KNL jobs to be slower than Haswell,
    #- except for ARC so that we don't have ridiculously long times
    #- (Normal arc is still ~15 minutes, albeit with a tail)
    if jobdesc not in ['ARC', 'TESTARC']:
        runtime *= config['timefactor']

    #- Do not allow runtime to be less than 5 min
    if runtime < 5:
        runtime = 5

    #- Add additional overhead factor if needed
    if 'NERSC_RUNTIME_OVERHEAD' in os.environ:
        t = os.environ['NERSC_RUNTIME_OVERHEAD']
        log.info(f'Adding $NERSC_RUNTIME_OVERHEAD={t} minutes to batch runtime request')
        runtime += float(t)

    return ncores, nodes, runtime
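
A worked sketch of the resource arithmetic for an arc with 30 cameras (hypothetical snippet); the node count depends on cores_per_node from batch_config.yaml, so the figures below assume a 128-core node for illustration:

from desispec.workflow.batch import determine_resources

ncores, nodes, runtime = determine_resources(ncameras=30, jobdesc='arc',
                                             system_name='perlmutter-gpu')

# ncores  = 20 * (10*(30+1)//20) + 1 = 20*15 + 1 = 301
#           (lowest multiple of 20 above 10 cores per camera, plus one
#            core for the workflow.schedule scheduler proc)
# runtime = 45 minutes (timefactor is not applied to arcs)
# nodes   = (301 - 1)//cores_per_node + 1, i.e. 3 nodes if cores_per_node = 128
print(ncores, nodes, runtime)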