/py/desispec/scripts/zproc.py
"""
desispec.scripts.zproc
======================

One stop shopping for redshifting DESI spectra
"""

import time

from desispec.workflow.batch_writer import create_desi_proc_batch_script
from desispec.workflow.timing import log_timer
start_imports = time.time()

#- python imports
import datetime
import sys, os, argparse, re
import subprocess
from copy import deepcopy
import json
import glob

#- external 3rd party imports
import numpy as np
import fitsio
from astropy.io import fits
from astropy.table import Table,vstack

#- external desi imports
from redrock.external import desi
import desiutil.timer
from desiutil.log import get_logger, DEBUG, INFO
import desiutil.iers

from desispec.io.meta import get_nights_up_to_date
from desispec.workflow.redshifts import create_desi_zproc_batch_script

#- internal desispec imports
import desispec.io
from desispec.io import findfile, specprod_root, replace_prefix, shorten_filename, get_readonly_filepath
from desispec.io.util import create_camword, decode_camword, parse_cameras, \
    camword_to_spectros, columns_to_goodcamword, difference_camwords
from desispec.io.util import validate_badamps, get_tempfilename, backup_filename
from desispec.util import runcmd
from desispec.scripts import group_spectra, coadd_spectra
from desispec.parallel import stdouterr_redirected
from desispec.workflow import batch
from desispec.workflow.exptable import get_exposure_table_pathname, \
    read_minimal_science_exptab_cols
from desispec.workflow.desi_proc_funcs import assign_mpi, update_args_with_headers
from desispec.workflow.batch import determine_resources

stop_imports = time.time()

#########################################
######## Begin Body of the Code #########
#########################################
def parse(options=None):
    """
    Parse command line arguments for desi_zproc and return the argparse Namespace
    """
    parser = argparse.ArgumentParser(usage="%(prog)s [options]")

    parser.add_argument("-g", "--groupname", type=str,
                        help="Redshift grouping type: cumulative, perexp, pernight, healpix, or custom name")

    #- Options for tile-based redshifts
    tiles_options = parser.add_argument_group("tile-based options (--groupname perexp, pernight, or cumulative)")
    tiles_options.add_argument("-t", "--tileid", type=int, default=None,
                        help="Tile ID")
    tiles_options.add_argument("-n", "--nights", type=int, nargs='*', default=None,
                        help="YEARMMDD night")
    tiles_options.add_argument("-e", "--expids", type=int, nargs='*', default=None,
                        help="Exposure IDs")
    tiles_options.add_argument("--thrunight", type=int, default=None,
                        help="Last night to include (YEARMMDD night) for "
                             + "cumulative redshift jobs. Used instead of nights.")
    tiles_options.add_argument("-c", "--cameras", type=str,
                        help="Subset of cameras to process, either as a camword (e.g. a012) " +
                             "or a comma separated list (e.g. b0,r0,z0).")
    parser.add_argument("--subgroup", type=str,
                        help="subgroup to use for non-standard groupname values")

    #- Options for healpix-based redshifts
    healpix_options = parser.add_argument_group("healpix-based options (--groupname healpix)")
    healpix_options.add_argument("-p", "--healpix", type=int, nargs='*', default=None,
            help="healpix pixels (nested nside=64)")
    healpix_options.add_argument("--survey", help="survey name, e.g. main,sv1,sv3")
    healpix_options.add_argument("--program", help="program name, e.g. dark,bright,backup,other")
    healpix_options.add_argument("--expfiles", nargs='*',
                        help="csv files with NIGHT,EXPID,SPECTRO,HEALPIX")
    healpix_options.add_argument("--prodexpfile", help="production summary exposure file (using pre-generated --expfiles is more efficient)")

    #- Processing options
    processing_options = parser.add_argument_group('processing options')
    processing_options.add_argument("--mpi", action="store_true",
                        help="Use MPI parallelism")
    processing_options.add_argument("--no-gpu", action="store_true",
                        help="Don't use gpus")
    processing_options.add_argument("--max-gpuprocs", type=int, default=4,
                        help="Number of GPU processes per node")
    processing_options.add_argument("--run-zmtl", action="store_true",
                        help="Whether to run zmtl or not")
    processing_options.add_argument("--no-afterburners", action="store_true",
                        help="Set if you don't want to run afterburners")
    processing_options.add_argument("--starttime", type=float,
                        help='start time; use "--starttime $(date +%%s)"')
    processing_options.add_argument("--timingfile", type=str,
                        help='save runtime info to this json file; augment if pre-existing')
    processing_options.add_argument("-d", "--dryrun", action="store_true",
                        help="show commands only, do not run")

    #- Batch submission options
    batch_options = parser.add_argument_group("batch queue options")
    batch_options.add_argument("--batch", action="store_true",
                        help="Submit a batch job to process this exposure")
    batch_options.add_argument("--nosubmit", action="store_true",
                        help="Create batch script but don't submit")
    batch_options.add_argument("-q", "--queue", type=str, default="realtime",
                        help="batch queue to use (default %(default)s)")
    batch_options.add_argument("--batch-opts", type=str, default=None,
                        help="additional batch commands")
    batch_options.add_argument("--batch-reservation", type=str,
                        help="batch reservation name")
    batch_options.add_argument("--batch-dependency", type=str,
                        help="job dependencies passed to sbatch --dependency")
    batch_options.add_argument("--runtime", type=int, default=None,
                        help="batch runtime in minutes")
    batch_options.add_argument("--system-name", type=str,
                        default=None,
                        help='Batch system name (cori-haswell, perlmutter-gpu, ...)')

    if options is not None:
        args = parser.parse_args(options)
    else:
        args = parser.parse_args()

    return args


def main(args=None, comm=None):
    """
    Run spectral grouping, coaddition, redshifts, and afterburners

    Options:
        args (argparse.Namespace or list of str): options to use instead of sys.argv
        comm: mpi4py communicator to use for parallelism

    Returns: integer error code (0=good)
    """
    #- save original command line before parsing it
    #- in case we need it for creating a batch script
    if args is None or isinstance(args, argparse.Namespace):
        cmdline = sys.argv.copy()
    else:
        cmdline = ['desi_zproc',] + list(args)

    if not isinstance(args, argparse.Namespace):
        args = parse(options=args)

    if args.starttime is not None:
        start_time = args.starttime
    else:
        start_time = time.time()

    log = get_logger()

    start_mpi_connect = time.time()
    if comm is not None:
        ## Use the provided comm to determine rank and size
        rank = comm.rank
        size = comm.size
    else:
        ## Check MPI flags and determine the comm, rank, and size given the arguments
        comm, rank, size = assign_mpi(do_mpi=args.mpi, do_batch=args.batch, log=log)
    stop_mpi_connect = time.time()

    #- set default groupname if needed (cumulative for tiles, otherwise healpix)
    if args.groupname is None:
        if args.tileid is not None:
            args.groupname = 'cumulative'
        elif args.healpix is not None:
            args.groupname = 'healpix'
        else:
            msg = 'Must specify --tileid or --healpix'
            log.critical(msg)
            raise ValueError(msg)

    #- consistency of options
    if args.groupname == 'healpix':
        assert args.healpix is not None, "--groupname healpix requires setting --healpix too"
        assert args.nights is None, f"--groupname healpix doesn't use --nights {args.nights}"
        assert args.expids is None, f"--groupname healpix doesn't use --expids {args.expids}"
        assert args.thrunight is None, f"--groupname healpix doesn't use --thrunight {args.thrunight}"
        assert args.cameras is None, f"--groupname healpix doesn't use --cameras {args.cameras}"
        assert (args.expfiles is None) or (args.prodexpfile is None), \
                "--groupname healpix uses --expfiles OR --prodexpfile but not both"
    else:
        assert args.tileid is not None, f"--groupname {args.groupname} requires setting --tileid too"
        if args.cameras is None:
            args.cameras = 'a0123456789'

    if args.expfiles is not None:
        if args.nights is not None or args.expids is not None:
            msg = "use --expfiles OR --nights and --expids, but not both"
            log.error(msg)
            raise ValueError(msg)

    today = int(time.strftime('%Y%m%d'))
    if args.thrunight is not None:
        if args.groupname not in ['cumulative',]:
            msg = f"--thrunight only valid for cumulative redshifts."
            log.error(msg)
            raise ValueError(msg)
        #- very early data isn't supported, and future dates aren't supported
        #- because that implies data are included that don't yet exist
        elif args.thrunight < 20200214 or args.thrunight > today:
            msg = f"--thrunight must be between 20200214 and today"
            log.error(msg)
            raise ValueError(msg)

    if args.expids is not None:
        if args.nights is None:
            msg = f"Must specify --nights if specifying --expids."
            log.error(msg)
            raise ValueError(msg)
        else:
            if rank == 0:
                msg = f"Only using exposures specified with --expids {args.expids}"
                log.info(msg)

    if args.groupname in ['perexp', 'pernight'] and args.nights is not None:
        if len(args.nights) > 1:
            msg = f"Only expect one night for groupname {args.groupname}" \
                  + f" but received nights={args.nights}."
            log.error(msg)
            raise ValueError(msg)

    if (args.groupname == 'healpix') and (args.expfiles is None) and (args.prodexpfile is None):
        args.prodexpfile = findfile('exposures')
        if rank == 0:
            log.info(f'Using default --prodexpfile {args.prodexpfile}')
        if not os.path.exists(args.prodexpfile):
            msg = f'Missing {args.prodexpfile}; please create with desi_tsnr_afterburner or specify different --prodexpfile'
            if rank == 0:
                log.critical(msg)

            raise ValueError(msg)

    ## Unpack arguments for shorter names (tileid might be None, ok)
    tileid, groupname, subgroup = args.tileid, args.groupname, args.subgroup

    known_groups = ['cumulative', 'pernight', 'perexp', 'healpix']
    if groupname not in known_groups:
        if subgroup is None:
            msg = f'Non-standard --groupname={groupname} requires --subgroup'
            if rank == 0:
                log.critical(msg)
            raise ValueError(msg)
        else:
            msg = f'Non-standard {groupname=} not in {known_groups}; using {subgroup=}'
            if rank == 0:
                log.warning(msg)

    #- redrock non-MPI mode isn't compatible with GPUs,
    #- so if zproc is running in non-MPI mode, force --no-gpu
    #- https://github.com/desihub/redrock/issues/223
    if (args.mpi == False) and (args.no_gpu == False) and (not args.batch):
        log.warning("Redrock+GPU currently only works with MPI; since this is non-MPI, forcing --no-gpu")
        log.warning("See https://github.com/desihub/redrock/issues/223")
        args.no_gpu = True

    error_count = 0

    if rank == 0:
        thisfile=os.path.dirname(os.path.abspath(__file__))
        thistime=datetime.datetime.fromtimestamp(start_imports).isoformat()
        log.info(f'rank 0 started {thisfile} at {thistime}')
    ## Start timer; only print log messages from rank 0 (others are silent)
    timer = desiutil.timer.Timer(silent=rank>0)

    ## Fill in timing information for steps before we had the timer created
    if args.starttime is not None:
        timer.start('startup', starttime=args.starttime)
        timer.stop('startup', stoptime=start_imports)

    timer.start('imports', starttime=start_imports)
    timer.stop('imports', stoptime=stop_imports)

    timer.start('mpi_connect', starttime=start_mpi_connect)
    timer.stop('mpi_connect', stoptime=stop_mpi_connect)

    ## Freeze IERS after parsing args so that it isn't triggered when only --help is requested
    timer.start('freeze_iers')
    ## Redirect all of the freeze_iers messages to /dev/null
    with stdouterr_redirected(comm=comm):
        desiutil.iers.freeze_iers()
    if rank == 0:
        log.info("Froze iers for all ranks")
    timer.stop('freeze_iers')


    timer.start('preflight')

    ## Derive the available cameras
    if args.groupname == 'healpix':
        camword = 'a0123456789'
    elif isinstance(args.cameras, str):
        if rank == 0:
            camword = parse_cameras(args.cameras)
        else:
            camword = parse_cameras(args.cameras, loglevel='ERROR')
    else:
        camword = create_camword(args.cameras)

    if args.batch:
        err = 0
        #-------------------------------------------------------------------------
        ## Create and submit a batch job if requested
        if rank == 0:
            ## create the batch script
            scriptfile = create_desi_zproc_batch_script(group=groupname,
                                                        subgroup=subgroup,
                                                        tileid=tileid,
                                                        cameras=camword,
                                                        thrunight=args.thrunight,
                                                        nights=args.nights,
                                                        expids=args.expids,
                                                        healpix=args.healpix,
                                                        survey=args.survey,
                                                        program=args.program,
                                                        queue=args.queue,
                                                        runtime=args.runtime,
                                                        batch_opts=args.batch_opts,
                                                        timingfile=args.timingfile,
                                                        system_name=args.system_name,
                                                        no_gpu=args.no_gpu,
                                                        max_gpuprocs=args.max_gpuprocs,
                                                        cmdline=cmdline)

            log.info("Generating batch script and exiting.")

            if not args.nosubmit and not args.dryrun:
                err = subprocess.call(['sbatch', scriptfile])

        ## All ranks need to exit if a batch job was submitted
        if comm is not None:
            err = comm.bcast(err, root=0)

        return int(err)

    exposure_table = None
    hpixexp = None
    if rank == 0:

        if groupname != 'healpix' and args.expfiles is not None:
            tmp = vstack([Table.read(fn) for fn in args.expfiles])
            args.expids = list(tmp['EXPID'])
            args.nights = list(tmp['NIGHT'])

        if groupname == 'healpix':
            if args.expfiles is not None:
                hpixexp = vstack([Table.read(fn) for fn in args.expfiles])
            else:
                from desispec.pixgroup import get_exp2healpix_map
                hpixexp = get_exp2healpix_map(args.prodexpfile, survey=args.survey, program=args.program)

            keep = np.isin(hpixexp['HEALPIX'], args.healpix)
            hpixexp = hpixexp[keep]

        elif groupname == 'perexp' and args.nights is not None \
                and args.cameras is not None and args.expids is not None:
            assert len(args.expids) == 1, "perexp job should only have one exposure"
            assert len(args.nights) == 1, "perexp job should only have one night"
            exposure_table = Table([Table.Column(name='EXPID', data=args.expids),
                                    Table.Column(name='NIGHT', data=args.nights),
                                    Table.Column(name='CAMWORD', data=[camword]),
                                    Table.Column(name='BADCAMWORD', data=[''])])
        else:
            if args.nights is not None:
                nights = args.nights
            elif args.thrunight is None:
                ## None will glob for all nights
                nights = None
            else:
                ## Get list of only nights up to date of thrunight
                nights = get_nights_up_to_date(args.thrunight)

            exposure_table = read_minimal_science_exptab_cols(nights=nights,
                                                              tileids=[tileid])
            if args.expids is not None:
                exposure_table = exposure_table[np.isin(exposure_table['EXPID'],
                                                        args.expids)]
            exposure_table.sort(keys=['EXPID'])

    ## Should remove, just nice for printouts while performance isn't important
    if comm is not None:
        comm.barrier()
        if groupname == 'healpix':
            hpixexp = comm.bcast(hpixexp, root=0)
        else:
            exposure_table = comm.bcast(exposure_table, root=0)

    if groupname != 'healpix':
        if len(exposure_table) == 0:
            msg = f"Didn't find any exposures!"
            log.error(msg)
            raise ValueError(msg)

        ## Get night and expid information
        expids = np.unique(exposure_table['EXPID'].data)
        nights = np.unique(exposure_table['NIGHT'].data)
        thrunight = np.max(nights)
    else:
        expids = None
        nights = None
        thrunight = None

    #------------------------------------------------------------------------#
    #------------------------ Proceed with running --------------------------#
    #------------------------------------------------------------------------#

    ## Print a summary of what we're going to do
    if rank == 0:
        log.info('------------------------------')
        log.info('Groupname {}'.format(groupname))
        if subgroup is not None:
            log.info('Subgroup {}'.format(subgroup))

        if args.healpix is not None:
            log.info(f'Healpixels {args.healpix}')
        else:
            log.info(f'Tileid={tileid} nights={nights} expids={expids}')

        log.info(f'Supplied camword: {camword}')
        log.info('Output root {}'.format(desispec.io.specprod_root()))
        if args.run_zmtl:
            log.info(f'Will be running zmtl')
        if not args.no_afterburners:
            log.info(f'Will be running afterburners')
        log.info('------------------------------')

    if comm is not None:
        comm.barrier()

    ## Derive the available spectrographs
    if groupname == 'healpix':
        all_subgroups = args.healpix
    else:
        ## Find nights, exposures, and camwords
        expnight_dict = dict()
        complete_cam_set = set()

        camword_set = set(decode_camword(camword))
        for erow in exposure_table:
            key = (erow['EXPID'],erow['NIGHT'])
            val = set(decode_camword(difference_camwords(erow['CAMWORD'],
                                                         erow['BADCAMWORD'],
                                                         suppress_logging=True)))
            if camword != 'a0123456789':
                val = camword_set.intersection(val)

            complete_cam_set = complete_cam_set.union(val)
            expnight_dict[key] = val

        all_subgroups = camword_to_spectros(create_camword(list(complete_cam_set)),
                                           full_spectros_only=False)

        if len(all_subgroups) == 0:
            msg = f"Didn't find any spectrographs! complete_cam_set={complete_cam_set}"
            log.error(msg)
            raise ValueError(msg)

    ## options to be used by findfile for all output files
    if groupname == 'healpix':
        findfileopts = dict(groupname=groupname, survey=args.survey, faprogram=args.program)
    else:
        findfileopts = dict(tile=tileid, groupname=groupname, subgroup=subgroup)
        if groupname in ('cumulative', 'pernight'):
            findfileopts['night'] = thrunight
        elif groupname == 'perexp':
            assert len(expids) == 1
            findfileopts['expid'] = expids[0]
        elif subgroup is not None:
            findfileopts['subgroup'] = subgroup

    timer.stop('preflight')

    #-------------------------------------------------------------------------
    ## Do spectral grouping and coadding
    timer.start('groupspec')

    nblocks, block_size, block_rank, block_num = \
        distribute_ranks_to_blocks(len(all_subgroups), rank=rank, size=size, log=log)

    if rank == 0:
        if groupname == 'healpix':
            for hpix in args.healpix:
                findfileopts['healpix'] = hpix
                splog = findfile('spectra', spectrograph=0, logfile=True, **findfileopts)
                os.makedirs(os.path.dirname(splog), exist_ok=True)
        else:
            splog = findfile('spectra', spectrograph=0, logfile=True, **findfileopts)
            os.makedirs(os.path.dirname(splog), exist_ok=True)

    if comm is not None:
        comm.barrier()

    if block_rank == 0:
        for i in range(block_num, len(all_subgroups), nblocks):
            result, success = 0, True
            if groupname == 'healpix':
                healpix = all_subgroups[i]
                log.info(f'Coadding spectra for healpix {healpix}')
                findfileopts['healpix'] = healpix

                cframes = []
                ii = hpixexp['HEALPIX'] == healpix
                for night, expid, spectro in hpixexp['NIGHT', 'EXPID', 'SPECTRO'][ii]:
                    for band in ('b', 'r', 'z'):
                        camera = band+str(spectro)
                        filename = findfile('cframe', night=night, expid=expid, camera=camera,
                                            readonly=True)
                        if os.path.exists(filename):
                            cframes.append(filename)
                        else:
                            log.warning(f'Missing {filename}')

                if len(cframes) < 3:
                    log.error(f'healpix {healpix} only has {len(cframes)} cframes; skipping')
                    error_count += 1
                    continue

            else:
                spectro = all_subgroups[i]
                log.info(f'Coadding spectra for spectrograph {spectro}')
                findfileopts['spectrograph'] = spectro

                # generate list of cframes from dict of exposures, nights, and cameras
                cframes = []
                for (expid, night), cameras in expnight_dict.items():
                    for camera in cameras:
                        if int(spectro) == int(camera[1]):
                            cframes.append(findfile('cframe', night=night,
                                                    expid=expid, camera=camera,
                                                    readonly=True))

            spectrafile = findfile('spectra', **findfileopts)
            splog = findfile('spectra', logfile=True, **findfileopts)
            coaddfile = findfile('coadd', **findfileopts)

            if os.path.exists(spectrafile):
                log.info(f'Spectra file {os.path.basename(spectrafile)} already exists; only running coaddition')
                group_func = coadd_spectra.main
                inputs = [spectrafile,]
                outputs = [coaddfile,]
                cmd = f"desi_coadd_spectra -i {spectrafile} -o {coaddfile}"
                if groupname != 'healpix':
                    cmd += ' --onetile'

            else:
                group_func = group_spectra.main
                inputs = cframes
                outputs = [spectrafile, coaddfile]
                cmd = f"desi_group_spectra --inframes {' '.join(cframes)} " \
                      + f"--outfile {spectrafile} " \
                      + f"--coaddfile {coaddfile} "

                if groupname == 'healpix':
                    cmd += f"--healpix {healpix} "
                    cmd += f"--header SURVEY={args.survey} PROGRAM={args.program} "
                else:
                    cmd += "--onetile "
                    cmd += (f"--header SPGRP={groupname} SPGRPVAL={thrunight} "
                            f"NIGHT={thrunight} TILEID={tileid} SPECTRO={spectro} PETAL={spectro} ")

                    if groupname == 'perexp':
                        cmd += f'EXPID={expids[0]} '

            cmdargs = cmd.split()[1:]
            if args.dryrun:
                if rank == 0:
                    log.info(f"dryrun: Would have run {cmd}")
            else:
                with stdouterr_redirected(splog):
                    result, success = runcmd(group_func,
                                             args=cmdargs, inputs=inputs,
                                             outputs=outputs)

            if not success:
                log.error(f'desi_group_spectra petal {spectro} failed; see {splog}')
                error_count += 1

    timer.stop('groupspec')

    if comm is not None:
        comm.barrier()

    if rank == 0:
        log.info("Done with spectra")

    #-------------------------------------------------------------------------
    ## Do redshifting
    timer.start('redrock')
    for subgroup in all_subgroups:
        result, success = 0, True

        if groupname == 'healpix':
            findfileopts['healpix'] = subgroup
        else:
            findfileopts['spectrograph'] = subgroup

        coaddfile = findfile('coadd', **findfileopts)
        rrfile = findfile('redrock', **findfileopts)
        rdfile = findfile('rrdetails', **findfileopts)
        rmfile = findfile('rrmodel', **findfileopts)
        rrlog = findfile('redrock', logfile=True, **findfileopts)

        cmd = f"rrdesi_mpi -i {coaddfile} -o {rrfile} -d {rdfile} --model {rmfile}"
        if not args.no_gpu:
            cmd += f' --gpu --max-gpuprocs {args.max_gpuprocs}'

        cmdargs = cmd.split()[1:]
        if args.dryrun:
            if rank == 0:
                log.info(f"dryrun: Would have run {cmd}")
        else:
            with stdouterr_redirected(rrlog, comm=comm):
                result, success = runcmd(desi.rrdesi, comm=comm, args=cmdargs,
                                         inputs=[coaddfile], outputs=[rrfile, rdfile, rmfile])

        ## Since all ranks are running redrock, only count failure/success once
        if rank == 0 and not success:
            log.error(f'Redrock petal/healpix {subgroup} failed; see {rrlog}')
            error_count += 1

        ## Since all ranks are running redrock, ensure we're all moving on to next
        ## iteration together
        if comm is not None:
            comm.barrier()

    if comm is not None:
        comm.barrier()

    timer.stop('redrock')

    if rank == 0:
        log.info("Done with redrock")

    #-------------------------------------------------------------------------
    ## Do tileqa if a tile (i.e. not for healpix)
    timer.start('tileqa')

    if rank == 0 and groupname in ['pernight', 'cumulative']:
        from desispec.scripts import tileqa

        result, success = 0, True
        qafile = findfile('tileqa', **findfileopts)
        qapng = findfile('tileqapng', **findfileopts)
        qalog = findfile('tileqa', logfile=True, **findfileopts)
        ## requires all coadd and redrock outputs in addition to exposureqa
        ## note that exposure_table is populated for both pernight and cumulative
        ## so we can use it to loop over exposures
        infiles = []
        for erow in exposure_table:
            infiles.append(findfile('exposureqa', expid=erow['EXPID'],
                                    night=erow['NIGHT'], readonly=True))
        for spectro in all_subgroups:
            findfileopts['spectrograph'] = spectro
            infiles.append(findfile('coadd', **findfileopts))
            infiles.append(findfile('redrock', **findfileopts))
        cmd = f"desi_tile_qa -g {groupname} -n {thrunight} -t {tileid}"
        cmdargs = cmd.split()[1:]
        if args.dryrun:
            log.info(f"dryrun: Would have run {cmd} with "
                     + f"outputs {qafile}, {qapng}")
        else:
            with stdouterr_redirected(qalog):
                result, success = runcmd(tileqa.main, args=cmdargs,
                                         inputs=infiles, outputs=[qafile, qapng])

            ## count failure/success
            if not success:
                log.error(f'tileqa failed; see {qalog}')
                error_count += 1

        log.info("Done with tileqa")

    timer.stop('tileqa')

    if comm is not None:
        comm.barrier()

    #-------------------------------------------------------------------------
    ## Do zmtl if asked to
    if args.run_zmtl:
        from desispec.scripts import makezmtl

        timer.start('zmtl')
        if block_rank == 0:
            for i in range(block_num, len(all_subgroups), nblocks):
                result, success = 0, True
                if groupname == 'healpix':
                    findfileopts['healpix'] = all_subgroups[i]
                else:
                    findfileopts['spectrograph'] = all_subgroups[i]

                rrfile = findfile('redrock', **findfileopts)
                zmtlfile = findfile('zmtl', **findfileopts)
                zmtllog = findfile('zmtl', logfile=True, **findfileopts)
                cmd = f"make_zmtl_files --input_file {rrfile} --output_file {zmtlfile}"
                cmdargs = cmd.split()[1:]
                if args.dryrun:
                    if rank == 0:
                        log.info(f"dryrun: Would have run {cmd}")
                else:
                    with stdouterr_redirected(zmtllog):
                        result, success = runcmd(makezmtl.main, args=cmdargs,
                                                 inputs=[rrfile],
                                                 outputs=[zmtlfile])
                if not success:
                    log.error(f'zmtl petal/healpix {all_subgroups[i]} failed; see {zmtllog}')
                    error_count += 1

        if rank == 0:
            log.info("Done with zmtl")

        timer.stop('zmtl')

    if comm is not None:
        comm.barrier()

    #-------------------------------------------------------------------------
    ## Do afterburners unless disabled
    if not args.no_afterburners:
        from desispec.scripts import qsoqn, qsomgii, emlinefit
        """
        for SPECTRO in 0 1 2 3 4 5 6 7 8 9; do
            coadd=tiles/cumulative/2288/20220918/coadd-$SPECTRO-2288-thru20220918.fits
            redrock=tiles/cumulative/2288/20220918/redrock-$SPECTRO-2288-thru20220918.fits
            qsomgii=tiles/cumulative/2288/20220918/qso_mgii-$SPECTRO-2288-thru20220918.fits
            qsoqn=tiles/cumulative/2288/20220918/qso_qn-$SPECTRO-2288-thru20220918.fits
            emfit=tiles/cumulative/2288/20220918/emline-$SPECTRO-2288-thru20220918.fits
            qsomgiilog=tiles/cumulative/2288/20220918/logs/qso_mgii-$SPECTRO-2288-thru20220918.log
            qsoqnlog=tiles/cumulative/2288/20220918/logs/qso_qn-$SPECTRO-2288-thru20220918.log
            emfitlog=tiles/cumulative/2288/20220918/logs/emline-$SPECTRO-2288-thru20220918.log
            cmd="srun -N 1 -n 1 -c 64 --cpu-bind=none desi_qso_mgii_afterburner --coadd $coadd --redrock $redrock --output $qsomgii --target_selection all --save_target all"
            cmd="srun -N 1 -n 1 -c 64 --cpu-bind=none desi_qso_qn_afterburner --coadd $coadd --redrock $redrock --output $qsoqn --target_selection all --save_target all"
            cmd="srun -N 1 -n 1 -c 64 --cpu-bind=none desi_emlinefit_afterburner --coadd $coadd --redrock $redrock --output $emfit"
        """
        timer.start('afterburners')
        nafterburners = 3
        nsubgroups = len(all_subgroups)
        ntasks = nafterburners * nsubgroups

        # TODO: for 64//3, this creates 4 blocks, with rank 63/64 being block 3 block_rank==0,
        # which ends up running mgii afterburner twice
        nblocks, block_size, block_rank, block_num = \
            distribute_ranks_to_blocks(ntasks, rank=rank, size=size, log=log)

        #- Create a subcommunicator with just this rank, e.g. for
        #- qsoqn afterburner that needs a communicator to pass to
        #- redrock, but is otherwise only a single rank.
        if comm is not None:
            monocomm = comm.Split(color=comm.rank)
        else:
            monocomm = None

        if block_rank == 0:
            ## If running multiple afterburners at once, stagger the start times
            ## so I/O isn't hit all at once (0.2s delay per block number)
            time.sleep(0.2*block_num)
            for i in range(block_num, ntasks, nblocks):
                result, success = 0, True
                ## If running multiple afterburners at once, wait some time so
                ## I/O isn't hit all at once
                #time.sleep(0.2*i)
                subgroup = all_subgroups[i % nsubgroups]
                if groupname == 'healpix':
                    findfileopts['healpix'] = subgroup
                else:
                    findfileopts['spectrograph'] = subgroup

                coaddfile = findfile('coadd', **findfileopts)
                rrfile = findfile('redrock', **findfileopts)
                ## First set of nsubgroups ranks go to desi_qso_mgii_afterburner
                if i // nsubgroups == 0:
                    log.info(f"rank {rank}, block_rank {block_rank}, block_num {block_num}, is running spectro/healpix {subgroup} for qso mgii")
                    mgiifile = findfile('qso_mgii', **findfileopts)
                    mgiilog = findfile('qso_mgii', logfile=True, **findfileopts)
                    cmd = f"desi_qso_mgii_afterburner --coadd {coaddfile} " \
                          + f"--redrock {rrfile} --output {mgiifile} " \
                          + f"--target_selection all --save_target all"
                    cmdargs = cmd.split()[1:]
                    if args.dryrun:
                        if rank == 0:
                            log.info(f"dryrun: Would have run {cmd}")
                    else:
                        with stdouterr_redirected(mgiilog):
                            result, success = runcmd(qsomgii.main, args=cmdargs,
                                                     inputs=[coaddfile, rrfile],
                                                     outputs=[mgiifile])

                        if not success:
                            log.error(f'qsomgii afterburner petal/healpix {subgroup} failed; see {mgiilog}')

                ## Second set of nsubgroups ranks go to desi_qso_qn_afterburner
                elif i // nsubgroups == 1:
                    log.info(f"rank {rank}, block_rank {block_rank}, block_num {block_num}, is running spectro/healpix {subgroup} for qso qn")
                    qnfile = findfile('qso_qn', **findfileopts)
                    qnlog = findfile('qso_qn', logfile=True, **findfileopts)
                    cmd = f"desi_qso_qn_afterburner --coadd {coaddfile} " \
                          + f"--redrock {rrfile} --output {qnfile} " \
                          + f"--target_selection all --save_target all"
                    cmdargs = cmd.split()[1:]
                    if args.dryrun:
                        if rank == 0:
                            log.info(f"dryrun: Would have run {cmd}")
                    else:
                        with stdouterr_redirected(qnlog):
                            result, success = runcmd(qsoqn.main, args=cmdargs,
                                                     inputs=[coaddfile, rrfile],
                                                     outputs=[qnfile], comm=monocomm)

                        if not success:
                            log.error(f'qsoqn afterburner petal/healpix {subgroup} failed; see {qnlog}')

                ## Third set of nsubgroups ranks go to desi_emlinefit_afterburner
                elif i // nsubgroups == 2:
                    log.info(f"rank {rank}, block_rank {block_rank}, block_num {block_num}, is running spectro/healpix {subgroup} for emlinefit")
                    emfile = findfile('emline', **findfileopts)
                    emlog = findfile('emline', logfile=True, **findfileopts)
                    cmd = f"desi_emlinefit_afterburner --coadd {coaddfile} " \
                          + f"--redrock {rrfile} --output {emfile}"
                    cmdargs = cmd.split()[1:]
                    if args.dryrun:
                        if rank == 0:
                            log.info(f"dryrun: Would have run {cmd}")
                    else:
                        with stdouterr_redirected(emlog):
                            result, success = runcmd(emlinefit.main, args=cmdargs,
                                                     inputs=[coaddfile, rrfile],
                                                     outputs=[emfile])

                        if not success:
                            log.error(f'emlinefit afterburner petal/healpix {subgroup} failed; see {emlog}')

                ## For now only 3 afterburners, so shout if the loop goes higher than that
                else:
                    log.error(f"Index i={i} // nsubgroups={nsubgroups} should " \
                              + f"be between 0 and {nafterburners-1}!")

                if not success:
                    error_count += 1

        if rank == 0:
            log.info("Done with afterburners")

        timer.stop('afterburners')

    if comm is not None:
        comm.barrier()

    #-------------------------------------------------------------------------
    ## Collect error count and wrap up

    if comm is not None:
        all_error_counts = comm.gather(error_count, root=0)
        if rank == 0:
            final_error_count = int(np.sum(all_error_counts))
        else:
            final_error_count = 0
        error_count = comm.bcast(final_error_count, root=0)

    #- save / print timing information
    log_timer(timer, args.timingfile, comm=comm)

    if rank == 0:
        duration_seconds = time.time() - start_time
        mm = int(duration_seconds) // 60
        ss = int(duration_seconds - mm*60)
        goodbye = f'All done at {time.asctime()}; duration {mm}m{ss}s'

        if error_count > 0:
            log.error(f'{error_count} processing errors; see logs above')
            log.error(goodbye)
        else:
            log.info(goodbye)

    return int(error_count)


def distribute_ranks_to_blocks(nblocks, rank=None, size=None, comm=None,
                               log=None, split_comm=False):
    """
    Function to split up a set of ranks of size 'size' into nblocks blocks
    of roughly equal size.

    Args:
        nblocks (int): the number of blocks to split the ranks into
        rank (int): the MPI world rank
        size (int): the number of world MPI ranks
        comm (object): MPI communicator
        log (object): logger
        split_comm (bool): whether to split the world communicator into blocks and return
            the block communicator

    Returns:
        tuple: A tuple containing:

        * nblocks, int: the achievable number of blocks based on size
        * block_size, int: the number of ranks in the block assigned to the current rank
        * block_rank, int: the rank of the current rank within its assigned block
        * block_num, int: the block number (of nblocks blocks) in which the rank
          was assigned
        * block_comm (optional): if split_comm is true, returns a communicator of
          only the ranks in the current block. Splits from
          the world communicator
    """
    if rank is None or size is None:
        if comm is not None:
            # - Use the provided comm to determine rank and size
            rank = comm.rank
            size = comm.size
        else:
            msg = 'Either rank and size or comm must be defined. '
            msg += f'Received rank={rank}, size={size}, comm={comm}'
            if log is None:
                log = get_logger()
            log.error(msg)
            raise ValueError(msg)

    if log is not None and rank == 0:
        log.info(f"Attempting to split MPI ranks of size {size} into " +
                 f"{nblocks} blocks")

    if size <= nblocks:
        nblocks = size
        block_size = 1
        block_rank = 0
        block_num = rank
        if split_comm:
            block_comm = comm
    else:
        # nblocks = nblocks
        block_num = int(rank / (size/nblocks))
        block_rank = int(rank % (size/nblocks))

        # Calculate assignment for all ranks to be able to calculate
        # how many other ranks are in this same block
        all_ranks = np.arange(size)
        all_block_num = (all_ranks / (size/nblocks)).astype(int)
        assert all_block_num[rank] == block_num
        ii_this_block = all_block_num == block_num
        block_size = np.sum(ii_this_block)

        if split_comm:
            if comm is not None:
                block_comm = comm.Split(block_num, block_rank)
                assert block_rank == block_comm.Get_rank()
            else:
                block_comm = comm

    if log is not None:
        log.info(f"World rank/size: {rank}/{size} mapped to: Block #{block_num}, " +
                 f"block_rank/block_size: {block_rank}/{block_size}")

    if split_comm:
        return nblocks, block_size, block_rank, block_num, block_comm
    else:
        return nblocks, block_size, block_rank, block_num