desihub / desispec / build 11265301581 (github)
10 Oct 2024 12:38AM UTC coverage: 30.084% (-0.1%) from 30.218%
Pull Request #2386: Trace fix improvements (segasai: "fix variable name")

3 of 112 new or added lines in 2 files covered. (2.68%)
434 existing lines in 7 files now uncovered.
14637 of 48654 relevant lines covered (30.08%)
0.3 hits per line

Source file: /py/desispec/scripts/zproc.py (9.45% covered)

"""
desispec.scripts.zproc
======================

One stop shopping for redshifting DESI spectra
"""

import time
start_imports = time.time()

#- python imports
import datetime
import sys, os, argparse, re
import subprocess
from copy import deepcopy
import json
import glob

#- external 3rd party imports
import numpy as np
import fitsio
from astropy.io import fits
from astropy.table import Table,vstack

#- external desi imports
from redrock.external import desi
import desiutil.timer
from desiutil.log import get_logger, DEBUG, INFO
import desiutil.iers

from desispec.io.meta import get_nights_up_to_date
from desispec.workflow.redshifts import create_desi_zproc_batch_script

#- internal desispec imports
import desispec.io
from desispec.io import findfile, specprod_root, replace_prefix, shorten_filename, get_readonly_filepath
from desispec.io.util import create_camword, decode_camword, parse_cameras, \
    camword_to_spectros, columns_to_goodcamword, difference_camwords
from desispec.io.util import validate_badamps, get_tempfilename, backup_filename
from desispec.util import runcmd
from desispec.scripts import group_spectra
from desispec.parallel import stdouterr_redirected
from desispec.workflow import batch
from desispec.workflow.exptable import get_exposure_table_pathname, \
    read_minimal_science_exptab_cols
from desispec.workflow.desi_proc_funcs import assign_mpi, update_args_with_headers, log_timer
from desispec.workflow.desi_proc_funcs import determine_resources, create_desi_proc_batch_script

stop_imports = time.time()

#########################################
######## Begin Body of the Code #########
#########################################
def parse(options=None):
    """
    Parses an argparser object for use with desi_zproc and returns arguments
    """
    parser = argparse.ArgumentParser(usage="{prog} [options]")

    parser.add_argument("-g", "--groupname", type=str,
                        help="Redshift grouping type: cumulative, perexp, pernight, healpix, or custom name")

    #- Options for tile-based redshifts
    tiles_options = parser.add_argument_group("tile-based options (--groupname perexp, pernight, or cumulative)")
    tiles_options.add_argument("-t", "--tileid", type=int, default=None,
                        help="Tile ID")
    tiles_options.add_argument("-n", "--nights", type=int, nargs='*', default=None,
                        help="YEARMMDD night")
    tiles_options.add_argument("-e", "--expids", type=int, nargs='*', default=None,
                        help="Exposure IDs")
    tiles_options.add_argument("--thrunight", type=int, default=None,
                        help="Last night to include (YEARMMDD night) for "
                             + "cumulative redshift jobs. Used instead of nights.")
    tiles_options.add_argument("-c", "--cameras", type=str,
                        help="Subset of cameras to process, either as a camword (e.g. a012) " +
                             "or a comma separated list (e.g. b0,r0,z0).")
    parser.add_argument("--subgroup", type=str,
                        help="subgroup to use for non-standard groupname values")

    #- Options for healpix-based redshifts
    healpix_options = parser.add_argument_group("healpix-based options (--groupname healpix)")
    healpix_options.add_argument("-p", "--healpix", type=int, nargs='*', default=None,
            help="healpix pixels (nested nside=64)")
    healpix_options.add_argument("--survey", help="survey name, e.g. main,sv1,sv3")
    healpix_options.add_argument("--program", help="program name, e.g. dark,bright,backup,other")
    healpix_options.add_argument("--expfiles", nargs='*',
                        help="csv files with NIGHT,EXPID,SPECTRO,HEALPIX")
    healpix_options.add_argument("--prodexpfile", help="production summary exposure file (using pre-generated --expfiles is more efficient)")

    #- Processing options
    processing_options = parser.add_argument_group('processing options')
    processing_options.add_argument("--mpi", action="store_true",
                        help="Use MPI parallelism")
    processing_options.add_argument("--no-gpu", action="store_true",
                        help="Don't use gpus")
    processing_options.add_argument("--max-gpuprocs", type=int, default=4,
                        help="Number of GPU processes per node")
    processing_options.add_argument("--run-zmtl", action="store_true",
                        help="Whether to run zmtl or not")
    processing_options.add_argument("--no-afterburners", action="store_true",
                        help="Set if you don't want to run afterburners")
    processing_options.add_argument("--starttime", type=float,
                        help='start time; use "--starttime $(date +%%s)"')
    processing_options.add_argument("--timingfile", type=str,
                        help='save runtime info to this json file; augment if pre-existing')
    processing_options.add_argument("-d", "--dryrun", action="store_true",
                        help="show commands only, do not run")

    #- Batch submission options
    batch_options = parser.add_argument_group("batch queue options")
    batch_options.add_argument("--batch", action="store_true",
                        help="Submit a batch job to process this exposure")
    batch_options.add_argument("--nosubmit", action="store_true",
                        help="Create batch script but don't submit")
    batch_options.add_argument("-q", "--queue", type=str, default="realtime",
                        help="batch queue to use (default %(default)s)")
    batch_options.add_argument("--batch-opts", type=str, default=None,
                        help="additional batch commands")
    batch_options.add_argument("--batch-reservation", type=str,
                        help="batch reservation name")
    batch_options.add_argument("--batch-dependency", type=str,
                        help="job dependencies passed to sbatch --dependency")
    batch_options.add_argument("--runtime", type=int, default=None,
                        help="batch runtime in minutes")
    batch_options.add_argument("--system-name", type=str,
                        default=None,
                        help='Batch system name (cori-haswell, perlmutter-gpu, ...)')

    if options is not None:
        args = parser.parse_args(options)
    else:
        args = parser.parse_args()

    return args


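#- Illustrative example: the options defined above might be combined on the
#- command line as, e.g. (tile, night, and healpix values below are
#- hypothetical placeholders):
#-
#-     desi_zproc --groupname cumulative --tileid 80605 --thrunight 20210610 --mpi
#-     desi_zproc --groupname healpix --healpix 31542 --survey main --program dark
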
def main(args=None, comm=None):
    if not isinstance(args, argparse.Namespace):
        args = parse(options=args)

    if args.starttime is not None:
        start_time = args.starttime
    else:
        start_time = time.time()

    log = get_logger()

    start_mpi_connect = time.time()
    if comm is not None:
        ## Use the provided comm to determine rank and size
        rank = comm.rank
        size = comm.size
    else:
        ## Check MPI flags and determine the comm, rank, and size given the arguments
        comm, rank, size = assign_mpi(do_mpi=args.mpi, do_batch=args.batch, log=log)
    stop_mpi_connect = time.time()

    #- set default groupname if needed (cumulative for tiles, otherwise healpix)
    if args.groupname is None:
        if args.tileid is not None:
            args.groupname = 'cumulative'
        elif args.healpix is not None:
            args.groupname = 'healpix'
        else:
            msg = 'Must specify --tileid or --healpix'
            log.critical(msg)
            raise ValueError(msg)

    #- consistency of options
    if args.groupname == 'healpix':
        assert args.healpix is not None, "--groupname healpix requires setting --healpix too"
        assert args.nights is None, f"--groupname healpix doesn't use --nights {args.nights}"
        assert args.expids is None, f"--groupname healpix doesn't use --expids {args.expids}"
        assert args.thrunight is None, f"--groupname healpix doesn't use --thrunight {args.thrunight}"
        assert args.cameras is None, f"--groupname healpix doesn't use --cameras {args.cameras}"
        assert (args.expfiles is None) or (args.prodexpfile is None), \
                "--groupname healpix use --expfiles OR --prodexpfile but not both"
    else:
        assert args.tileid is not None, f"--groupname {args.groupname} requires setting --tileid too"
        if args.cameras is None:
            args.cameras = 'a0123456789'

    if args.expfiles is not None:
        if args.nights is not None or args.expids is not None:
            msg = "use --expfiles OR --nights and --expids, but not both"
            log.error(msg)
            raise ValueError(msg)

    today = int(time.strftime('%Y%m%d'))
    if args.thrunight is not None:
        if args.groupname not in ['cumulative',]:
            msg = f"--thrunight only valid for cumulative redshifts."
            log.error(msg)
            raise ValueError(msg)
        #- very early data isn't supported, and future dates aren't supported
        #- because that implies data are included that don't yet exist
        elif args.thrunight < 20200214 or args.thrunight > today:
            msg = f"--thrunight must be between 20200214 and today"
            log.error(msg)
            raise ValueError(msg)

    if args.expids is not None:
        if args.nights is None:
            msg = f"Must specify --nights if specifying --expids."
            log.error(msg)
            raise ValueError(msg)
        else:
            if rank == 0:
                msg = f"Only using exposures specified with --expids {args.expids}"
                log.info(msg)

    if args.groupname in ['perexp', 'pernight'] and args.nights is not None:
        if len(args.nights) > 1:
            msg = f"Only expect one night for groupname {args.groupname}" \
                  + f" but received nights={args.nights}."
            log.error(msg)
            raise ValueError(msg)

    if (args.groupname == 'healpix') and (args.expfiles is None) and (args.prodexpfile is None):
        args.prodexpfile = findfile('exposures')
        if rank == 0:
            log.info(f'Using default --prodexpfile {args.prodexpfile}')
        if not os.path.exists(args.prodexpfile):
            msg = f'Missing {args.prodexpfile}; please create with desi_tsnr_afterburner or specify different --prodexpfile'
            if rank == 0:
                log.critical(msg)

            raise ValueError(msg)

    ## Unpack arguments for shorter names (tileid might be None, ok)
    tileid, groupname, subgroup = args.tileid, args.groupname, args.subgroup

    known_groups = ['cumulative', 'pernight', 'perexp', 'healpix']
    if groupname not in known_groups:
        if subgroup is None:
            msg = f'Non-standard --groupname={groupname} requires --subgroup'
            if rank == 0:
                log.critical(msg)
            raise ValueError(msg)
        else:
            msg = f'Non-standard {groupname=} not in {known_groups}; using {subgroup=}'
            if rank == 0:
                log.warning(msg)

    #- redrock non-MPI mode isn't compatible with GPUs,
    #- so if zproc is running in non-MPI mode, force --no-gpu
    #- https://github.com/desihub/redrock/issues/223
    if (args.mpi == False) and (args.no_gpu == False) and (not args.batch):
        log.warning("Redrock+GPU currently only works with MPI; since this is non-MPI, forcing --no-gpu")
        log.warning("See https://github.com/desihub/redrock/issues/223")
        args.no_gpu = True

    error_count = 0

    if rank == 0:
        thisfile=os.path.dirname(os.path.abspath(__file__))
        thistime=datetime.datetime.fromtimestamp(start_imports).isoformat()
        log.info(f'rank 0 started {thisfile} at {thistime}')
    ## Start timer; only print log messages from rank 0 (others are silent)
    timer = desiutil.timer.Timer(silent=rank>0)

    ## Fill in timing information for steps before we had the timer created
    if args.starttime is not None:
        timer.start('startup', starttime=args.starttime)
        timer.stop('startup', stoptime=start_imports)

    timer.start('imports', starttime=start_imports)
    timer.stop('imports', stoptime=stop_imports)

    timer.start('mpi_connect', starttime=start_mpi_connect)
    timer.stop('mpi_connect', stoptime=stop_mpi_connect)

    ## Freeze IERS after parsing args so that it doesn't bother if only --help
    timer.start('freeze_iers')
    ## Redirect all of the freeze_iers messages to /dev/null
    with stdouterr_redirected(comm=comm):
        desiutil.iers.freeze_iers()
    if rank == 0:
        log.info("Froze iers for all ranks")
    timer.stop('freeze_iers')


    timer.start('preflight')

    ## Derive the available cameras
    if args.groupname == 'healpix':
        camword = 'a0123456789'
    elif isinstance(args.cameras, str):
        if rank == 0:
            camword = parse_cameras(args.cameras)
        else:
            camword = parse_cameras(args.cameras, loglevel='ERROR')
    else:
        camword = create_camword(args.cameras)

    if args.batch:
        err = 0
        #-------------------------------------------------------------------------
        ## Create and submit a batch job if requested
        if rank == 0:
            ## create the batch script
            cmdline = list(sys.argv).copy()
            scriptfile = create_desi_zproc_batch_script(group=groupname,
                                                        subgroup=subgroup,
                                                        tileid=tileid,
                                                        cameras=camword,
                                                        thrunight=args.thrunight,
                                                        nights=args.nights,
                                                        expids=args.expids,
                                                        healpix=args.healpix,
                                                        survey=args.survey,
                                                        program=args.program,
                                                        queue=args.queue,
                                                        runtime=args.runtime,
                                                        batch_opts=args.batch_opts,
                                                        timingfile=args.timingfile,
                                                        system_name=args.system_name,
                                                        no_gpu=args.no_gpu,
                                                        max_gpuprocs=args.max_gpuprocs,
                                                        cmdline=cmdline)

            log.info("Generating batch script and exiting.")

            if not args.nosubmit and not args.dryrun:
                err = subprocess.call(['sbatch', scriptfile])

        ## All ranks need to exit if submitted batch
        if comm is not None:
            err = comm.bcast(err, root=0)
        sys.exit(err)

    exposure_table = None
    hpixexp = None
    if rank == 0:

        if groupname != 'healpix' and args.expfiles is not None:
            tmp = vstack([Table.read(fn) for fn in args.expfiles])
            args.expids = list(tmp['EXPID'])
            args.nights = list(tmp['NIGHT'])

        if groupname == 'healpix':
            if args.expfiles is not None:
                hpixexp = vstack([Table.read(fn) for fn in args.expfiles])
            else:
                from desispec.pixgroup import get_exp2healpix_map
                hpixexp = get_exp2healpix_map(args.prodexpfile, survey=args.survey, program=args.program)

            keep = np.isin(hpixexp['HEALPIX'], args.healpix)
            hpixexp = hpixexp[keep]

        elif groupname == 'perexp' and args.nights is not None \
                and args.cameras is not None and args.expids is not None:
            assert len(args.expids) == 1, "perexp job should only have one exposure"
            assert len(args.nights) == 1, "perexp job should only have one night"
            exposure_table = Table([Table.Column(name='EXPID', data=args.expids),
                                    Table.Column(name='NIGHT', data=args.nights),
                                    Table.Column(name='CAMWORD', data=[camword]),
                                    Table.Column(name='BADCAMWORD', data=[''])])
        else:
            if args.nights is not None:
                nights = args.nights
            elif args.thrunight is None:
                ## None will glob for all nights
                nights = None
            else:
                ## Get list of only nights up to date of thrunight
                nights = get_nights_up_to_date(args.thrunight)

            exposure_table = read_minimal_science_exptab_cols(nights=nights,
                                                              tileids=[tileid])
            if args.expids is not None:
                exposure_table = exposure_table[np.isin(exposure_table['EXPID'],
                                                        args.expids)]
            exposure_table.sort(keys=['EXPID'])

    ## Should remove, just nice for printouts while performance isn't important
    if comm is not None:
        comm.barrier()
        if groupname == 'healpix':
            hpixexp = comm.bcast(hpixexp, root=0)
        else:
            exposure_table = comm.bcast(exposure_table, root=0)

    if groupname != 'healpix':
        if len(exposure_table) == 0:
            msg = f"Didn't find any exposures!"
            log.error(msg)
            raise ValueError(msg)

        ## Get night and expid information
        expids = np.unique(exposure_table['EXPID'].data)
        nights = np.unique(exposure_table['NIGHT'].data)
        thrunight = np.max(nights)
    else:
        expids = None
        nights = None
        thrunight = None

    #------------------------------------------------------------------------#
    #------------------------ Proceed with running --------------------------#
    #------------------------------------------------------------------------#

    ## Print a summary of what we're going to do
    if rank == 0:
        log.info('------------------------------')
        log.info('Groupname {}'.format(groupname))
        if subgroup is not None:
            log.info('Subgroup {}'.format(subgroup))

        if args.healpix is not None:
            log.info(f'Healpixels {args.healpix}')
        else:
            log.info(f'Tileid={tileid} nights={nights} expids={expids}')

        log.info(f'Supplied camword: {camword}')
        log.info('Output root {}'.format(desispec.io.specprod_root()))
        if args.run_zmtl:
            log.info(f'Will be running zmtl')
        if not args.no_afterburners:
            log.info(f'Will be running afterburners')
        log.info('------------------------------')

    if comm is not None:
        comm.barrier()

    ## Derive the available spectrographs
    if groupname == 'healpix':
        all_subgroups = args.healpix
    else:
        ## Find nights, exposures, and camwords
        expnight_dict = dict()
        complete_cam_set = set()

        camword_set = set(decode_camword(camword))
        for erow in exposure_table:
            key = (erow['EXPID'],erow['NIGHT'])
            val = set(decode_camword(difference_camwords(erow['CAMWORD'],
                                                         erow['BADCAMWORD'],
                                                         suppress_logging=True)))
            if camword != 'a0123456789':
                val = camword_set.intersection(val)

            complete_cam_set = complete_cam_set.union(val)
            expnight_dict[key] = val

        all_subgroups = camword_to_spectros(create_camword(list(complete_cam_set)),
                                            full_spectros_only=False)

        if len(all_subgroups) == 0:
            msg = f"Didn't find any spectrographs! complete_cam_set={complete_cam_set}"
            log.error(msg)
            raise ValueError(msg)

    ## options to be used by findfile for all output files
    if groupname == 'healpix':
        findfileopts = dict(groupname=groupname, survey=args.survey, faprogram=args.program)
    else:
        findfileopts = dict(tile=tileid, groupname=groupname, subgroup=subgroup)
        if groupname in ('cumulative', 'pernight'):
            findfileopts['night'] = thrunight
        elif groupname == 'perexp':
            assert len(expids) == 1
            findfileopts['expid'] = expids[0]
        elif subgroup is not None:
            findfileopts['subgroup'] = subgroup

    timer.stop('preflight')

    #-------------------------------------------------------------------------
    ## Do spectral grouping and coadding
    timer.start('groupspec')

    nblocks, block_size, block_rank, block_num = \
        distribute_ranks_to_blocks(len(all_subgroups), rank=rank, size=size, log=log)

    if rank == 0:
        if groupname == 'healpix':
            for hpix in args.healpix:
                findfileopts['healpix'] = hpix
                splog = findfile('spectra', spectrograph=0, logfile=True, **findfileopts)
                os.makedirs(os.path.dirname(splog), exist_ok=True)
        else:
            splog = findfile('spectra', spectrograph=0, logfile=True, **findfileopts)
            os.makedirs(os.path.dirname(splog), exist_ok=True)

    if comm is not None:
        comm.barrier()

    if block_rank == 0:
        for i in range(block_num, len(all_subgroups), nblocks):
            result, success = 0, True
            if groupname == 'healpix':
                healpix = all_subgroups[i]
                log.info(f'Coadding spectra for healpix {healpix}')
                findfileopts['healpix'] = healpix

                cframes = []
                ii = hpixexp['HEALPIX'] == healpix
                for night, expid, spectro in hpixexp['NIGHT', 'EXPID', 'SPECTRO'][ii]:
                    for band in ('b', 'r', 'z'):
                        camera = band+str(spectro)
                        filename = findfile('cframe', night=night, expid=expid, camera=camera,
                                            readonly=True)
                        if os.path.exists(filename):
                            cframes.append(filename)
                        else:
                            log.warning(f'Missing {filename}')

                if len(cframes) < 3:
                    log.error(f'healpix {healpix} only has {len(cframes)} cframes; skipping')
                    error_count += 1
                    continue

            else:
                spectro = all_subgroups[i]
                log.info(f'Coadding spectra for spectrograph {spectro}')
                findfileopts['spectrograph'] = spectro

                # generate list of cframes from dict of exposures, nights, and cameras
                cframes = []
                for (expid, night), cameras in expnight_dict.items():
                    for camera in cameras:
                        if int(spectro) == int(camera[1]):
                            cframes.append(findfile('cframe', night=night,
                                                    expid=expid, camera=camera,
                                                    readonly=True))

            spectrafile = findfile('spectra', **findfileopts)
            splog = findfile('spectra', logfile=True, **findfileopts)
            coaddfile = findfile('coadd', **findfileopts)

            cmd = f"desi_group_spectra --inframes {' '.join(cframes)} " \
                  + f"--outfile {spectrafile} " \
                  + f"--coaddfile {coaddfile} "

            if groupname == 'healpix':
                cmd += f"--healpix {healpix} "
                cmd += f"--header SURVEY={args.survey} PROGRAM={args.program} "
            else:
                cmd += "--onetile "
                cmd += (f"--header SPGRP={groupname} SPGRPVAL={thrunight} "
                        f"NIGHT={thrunight} TILEID={tileid} SPECTRO={spectro} PETAL={spectro} ")

                if groupname == 'perexp':
                    cmd += f'EXPID={expids[0]} '

            cmdargs = cmd.split()[1:]
            if args.dryrun:
                if rank == 0:
                    log.info(f"dryrun: Would have run {cmd}")
            else:
                with stdouterr_redirected(splog):
                    result, success = runcmd(group_spectra.main,
                                             args=cmdargs, inputs=cframes,
                                             outputs=[spectrafile, coaddfile])

            if not success:
                log.error(f'desi_group_spectra petal {spectro} failed; see {splog}')
                error_count += 1

    timer.stop('groupspec')

    if comm is not None:
        comm.barrier()

    if rank == 0:
        log.info("Done with spectra")

    #-------------------------------------------------------------------------
    ## Do redshifting
    timer.start('redrock')
    for subgroup in all_subgroups:
        result, success = 0, True

        if groupname == 'healpix':
            findfileopts['healpix'] = subgroup
        else:
            findfileopts['spectrograph'] = subgroup

        coaddfile = findfile('coadd', **findfileopts)
        rrfile = findfile('redrock', **findfileopts)
        rdfile = findfile('rrdetails', **findfileopts)
        rmfile = findfile('rrmodel', **findfileopts)
        rrlog = findfile('redrock', logfile=True, **findfileopts)

        cmd = f"rrdesi_mpi -i {coaddfile} -o {rrfile} -d {rdfile} --model {rmfile}"
        if not args.no_gpu:
            cmd += f' --gpu --max-gpuprocs {args.max_gpuprocs}'

        cmdargs = cmd.split()[1:]
        if args.dryrun:
            if rank == 0:
                log.info(f"dryrun: Would have run {cmd}")
        else:
            with stdouterr_redirected(rrlog, comm=comm):
                result, success = runcmd(desi.rrdesi, comm=comm, args=cmdargs,
                                         inputs=[coaddfile], outputs=[rrfile, rdfile, rmfile])

        ## Since all ranks running redrock, only count failure/success once
        if rank == 0 and not success:
            log.error(f'Redrock petal/healpix {subgroup} failed; see {rrlog}')
            error_count += 1

        ## Since all ranks running redrock, ensure we're all moving on to next
        ## iteration together
        if comm is not None:
            comm.barrier()

    if comm is not None:
        comm.barrier()

    timer.stop('redrock')

    if rank == 0:
        log.info("Done with redrock")

    #-------------------------------------------------------------------------
    ## Do tileqa if a tile (i.e. not for healpix)
    timer.start('tileqa')

    if rank == 0 and groupname in ['pernight', 'cumulative']:
        from desispec.scripts import tileqa

        result, success = 0, True
        qafile = findfile('tileqa', **findfileopts)
        qapng = findfile('tileqapng', **findfileopts)
        qalog = findfile('tileqa', logfile=True, **findfileopts)
        ## requires all coadd and redrock outputs in addition to exposureqa
        ## note that exposure_table is populated for both pernight and cumulative
        ## so we can use it to loop over exposures
        infiles = []
        for erow in exposure_table:
            infiles.append(findfile('exposureqa', expid=erow['EXPID'],
                                    night=erow['NIGHT'], readonly=True))
        for spectro in all_subgroups:
            findfileopts['spectrograph'] = spectro
            infiles.append(findfile('coadd', **findfileopts))
            infiles.append(findfile('redrock', **findfileopts))
        cmd = f"desi_tile_qa -g {groupname} -n {thrunight} -t {tileid}"
        cmdargs = cmd.split()[1:]
        if args.dryrun:
            log.info(f"dryrun: Would have run {cmd} with "
                     + f"outputs {qafile}, {qapng}")
        else:
            with stdouterr_redirected(qalog):
                result, success = runcmd(tileqa.main, args=cmdargs,
                                         inputs=infiles, outputs=[qafile, qapng])

            ## count failure/success
            if not success:
                log.error(f'tileqa failed; see {qalog}')
                error_count += 1

        log.info("Done with tileqa")

    timer.stop('tileqa')

    if comm is not None:
        comm.barrier()

    #-------------------------------------------------------------------------
    ## Do zmtl if asked to
    if args.run_zmtl:
        from desispec.scripts import makezmtl

        timer.start('zmtl')
        if block_rank == 0:
            for i in range(block_num, len(all_subgroups), nblocks):
                result, success = 0, True
                if groupname == 'healpix':
                    findfileopts['healpix'] = all_subgroups[i]
                else:
                    findfileopts['spectrograph'] = all_subgroups[i]

                rrfile = findfile('redrock', **findfileopts)
                zmtlfile = findfile('zmtl', **findfileopts)
                zmtllog = findfile('zmtl', logfile=True, **findfileopts)
                cmd = f"make_zmtl_files --input_file {rrfile} --output_file {zmtlfile}"
                cmdargs = cmd.split()[1:]
                if args.dryrun:
                    if rank == 0:
                        log.info(f"dryrun: Would have run {cmd}")
                else:
                    with stdouterr_redirected(zmtllog):
                        result, success = runcmd(makezmtl.main, args=cmdargs,
                                                 inputs=[rrfile],
                                                 outputs=[zmtlfile])
                if not success:
                    log.error(f'zmtl petal/healpix {all_subgroups[i]} failed; see {zmtllog}')
                    error_count += 1

        if rank == 0:
            log.info("Done with zmtl")

        timer.stop('zmtl')

    if comm is not None:
        comm.barrier()

    #-------------------------------------------------------------------------
    ## Do afterburners if asked to
    if not args.no_afterburners:
        from desispec.scripts import qsoqn, qsomgii, emlinefit
        """
        for SPECTRO in 0 1 2 3 4 5 6 7 8 9; do
            coadd=tiles/cumulative/2288/20220918/coadd-$SPECTRO-2288-thru20220918.fits
            redrock=tiles/cumulative/2288/20220918/redrock-$SPECTRO-2288-thru20220918.fits
            qsomgii=tiles/cumulative/2288/20220918/qso_mgii-$SPECTRO-2288-thru20220918.fits
            qsoqn=tiles/cumulative/2288/20220918/qso_qn-$SPECTRO-2288-thru20220918.fits
            emfit=tiles/cumulative/2288/20220918/emline-$SPECTRO-2288-thru20220918.fits
            qsomgiilog=tiles/cumulative/2288/20220918/logs/qso_mgii-$SPECTRO-2288-thru20220918.log
            qsoqnlog=tiles/cumulative/2288/20220918/logs/qso_qn-$SPECTRO-2288-thru20220918.log
            emfitlog=tiles/cumulative/2288/20220918/logs/emline-$SPECTRO-2288-thru20220918.log
            cmd="srun -N 1 -n 1 -c 64 --cpu-bind=none desi_qso_mgii_afterburner --coadd $coadd --redrock $redrock --output $qsomgii --target_selection all --save_target all"
            cmd="srun -N 1 -n 1 -c 64 --cpu-bind=none desi_qso_qn_afterburner --coadd $coadd --redrock $redrock --output $qsoqn --target_selection all --save_target all"
            cmd="srun -N 1 -n 1 -c 64 --cpu-bind=none desi_emlinefit_afterburner --coadd $coadd --redrock $redrock --output $emfit"
        """
        timer.start('afterburners')
        nafterburners = 3
        nsubgroups = len(all_subgroups)
        ntasks = nafterburners * nsubgroups

        # TODO: for 64//3, this creates 4 blocks, with rank 63/64 being block 3 block_rank==0,
        # which ends up running mgii afterburner twice
        nblocks, block_size, block_rank, block_num = \
            distribute_ranks_to_blocks(ntasks, rank=rank, size=size, log=log)

        #- Create a subcommunicator with just this rank, e.g. for
        #- qsoqn afterburner that needs a communicator to pass to
        #- redrock, but is otherwise only a single rank.
        if comm is not None:
            monocomm = comm.Split(color=comm.rank)
        else:
            monocomm = None

        if block_rank == 0:
            ## If running multiple afterburners at once, wait some time so
            ## I/O isn't hit all at once
            ## afterburner 2 runs with 10s delay, 3 with 20s delay
            time.sleep(0.2*block_num)
            for i in range(block_num, ntasks, nblocks):
                result, success = 0, True
                ## If running multiple afterburners at once, wait some time so
                ## I/O isn't hit all at once
                ## afterburner 2 runs with 10s delay, 3 with 20s delay
                #time.sleep(0.2*i)
                subgroup = all_subgroups[i % nsubgroups]
                if groupname == 'healpix':
                    findfileopts['healpix'] = subgroup
                else:
                    findfileopts['spectrograph'] = subgroup

                coaddfile = findfile('coadd', **findfileopts)
                rrfile = findfile('redrock', **findfileopts)
                ## First set of nsubgroups ranks go to desi_qso_mgii_afterburner
                if i // nsubgroups == 0:
                    log.info(f"rank {rank}, block_rank {block_rank}, block_num {block_num}, is running spectro/healpix {subgroup} for qso mgii")
                    mgiifile = findfile('qso_mgii', **findfileopts)
                    mgiilog = findfile('qso_mgii', logfile=True, **findfileopts)
                    cmd = f"desi_qso_mgii_afterburner --coadd {coaddfile} " \
                          + f"--redrock {rrfile} --output {mgiifile} " \
                          + f"--target_selection all --save_target all"
                    cmdargs = cmd.split()[1:]
                    if args.dryrun:
                        if rank == 0:
                            log.info(f"dryrun: Would have run {cmd}")
                    else:
                        with stdouterr_redirected(mgiilog):
                            result, success = runcmd(qsomgii.main, args=cmdargs,
                                                     inputs=[coaddfile, rrfile],
                                                     outputs=[mgiifile])

                        if not success:
                            log.error(f'qsomgii afterburner petal/healpix {subgroup} failed; see {mgiilog}')

                ## Second set of nsubgroups ranks go to desi_qso_qn_afterburner
                elif i // nsubgroups == 1:
                    log.info(f"rank {rank}, block_rank {block_rank}, block_num {block_num}, is running spectro/healpix {subgroup} for qso qn")
                    qnfile = findfile('qso_qn', **findfileopts)
                    qnlog = findfile('qso_qn', logfile=True, **findfileopts)
                    cmd = f"desi_qso_qn_afterburner --coadd {coaddfile} " \
                          + f"--redrock {rrfile} --output {qnfile} " \
                          + f"--target_selection all --save_target all"
                    cmdargs = cmd.split()[1:]
                    if args.dryrun:
                        if rank == 0:
                            log.info(f"dryrun: Would have run {cmd}")
                    else:
                        with stdouterr_redirected(qnlog):
                            result, success = runcmd(qsoqn.main, args=cmdargs,
                                                     inputs=[coaddfile, rrfile],
                                                     outputs=[qnfile], comm=monocomm)

                        if not success:
                            log.error(f'qsoqn afterburner petal/healpix {subgroup} failed; see {qnlog}')

                ## Third set of nsubgroups ranks go to desi_emlinefit_afterburner
                elif i // nsubgroups == 2:
                    log.info(f"rank {rank}, block_rank {block_rank}, block_num {block_num}, is running spectro/healpix {subgroup} for emlinefit")
                    emfile = findfile('emline', **findfileopts)
                    emlog = findfile('emline', logfile=True, **findfileopts)
                    cmd = f"desi_emlinefit_afterburner --coadd {coaddfile} " \
                          + f"--redrock {rrfile} --output {emfile}"
                    cmdargs = cmd.split()[1:]
                    if args.dryrun:
                        if rank == 0:
                            log.info(f"dryrun: Would have run {cmd}")
                    else:
                        with stdouterr_redirected(emlog):
                            result, success = runcmd(emlinefit.main, args=cmdargs,
                                                     inputs=[coaddfile, rrfile],
                                                     outputs=[emfile])

                        if not success:
                            log.error(f'emlinefit afterburner petal/healpix {subgroup} failed; see {emlog}')

                ## For now only 3 afterburners, so shout if the loop goes higher than that
                else:
                    log.error(f"Index i={i} // nsubgroups={nsubgroups} should " \
                              + f"be between 0 and {nafterburners-1}!")

                if not success:
                    error_count += 1

        if rank == 0:
            log.info("Done with afterburners")

        timer.stop('afterburners')

    if comm is not None:
        comm.barrier()

    #-------------------------------------------------------------------------
    ## Collect error count and wrap up

    if comm is not None:
        all_error_counts = comm.gather(error_count, root=0)
        if rank == 0:
            final_error_count = int(np.sum(all_error_counts))
        else:
            final_error_count = 0
        error_count = comm.bcast(final_error_count, root=0)

    #- save / print timing information
    log_timer(timer, args.timingfile, comm=comm)

    if rank == 0:
        duration_seconds = time.time() - start_time
        mm = int(duration_seconds) // 60
        ss = int(duration_seconds - mm*60)
        goodbye = f'All done at {time.asctime()}; duration {mm}m{ss}s'

        if error_count > 0:
            log.error(f'{error_count} processing errors; see logs above')
            log.error(goodbye)
        else:
            log.info(goodbye)

    if error_count > 0:
        sys.exit(int(error_count))
    else:
        return 0


def distribute_ranks_to_blocks(nblocks, rank=None, size=None, comm=None,
                               log=None, split_comm=False):
    """
    Function to split up a set of ranks of size 'size' into nblocks number
    of blocks of roughly equal size.

    Args:
        nblocks (int): the number of blocks to split the ranks into
        rank (int): the MPI world rank
        size (int): the number of world MPI ranks
        comm (object): MPI communicator
        log (object): logger
        split_comm (bool): whether to split the world communicator into blocks and return
            the block communicator

    Returns:
        tuple: A tuple containing:

        * nblocks, int: the achievable number of blocks based on size
        * block_size, int: the number of ranks in the assigned block of current rank
        * block_rank, int: the rank in the assigned block of the current rank
        * block_num, int: the block number (of nblocks blocks) in which the rank
          was assigned
        * block_comm (optional): if split_comm is true, returns a communicator of
          only the ranks in the current block. Splits from
          the world communicator
    """
    if rank is None or size is None:
        if comm is not None:
            # - Use the provided comm to determine rank and size
            rank = comm.rank
            size = comm.size
        else:
            msg = 'Either rank and size or comm must be defined. '
            msg += f'Received rank={rank}, size={size}, comm={comm}'
            if log is None:
                log = get_logger()
            log.error(msg)
            raise ValueError(msg)

    if log is not None and rank == 0:
        log.info(f"Attempting to split MPI ranks of size {size} into " +
                 f"{nblocks} blocks")

    if size <= nblocks:
        nblocks = size
        block_size = 1
        block_rank = 0
        block_num = rank
        if split_comm:
            block_comm = comm
    else:
        # nblocks = nblocks
        block_num = int(rank / (size/nblocks))
        block_rank = int(rank % (size/nblocks))

        # Calculate assignment for all ranks to be able to calculate
        # how many other ranks are in this same block
        all_ranks = np.arange(size)
        all_block_num = (all_ranks / (size/nblocks)).astype(int)
        assert all_block_num[rank] == block_num
        ii_this_block = all_block_num == block_num
        block_size = np.sum(ii_this_block)

        if split_comm:
            if comm is not None:
                block_comm = comm.Split(block_num, block_rank)
                assert block_rank == block_comm.Get_rank()
            else:
                block_comm = comm

    if log is not None:
        log.info(f"World rank/size: {rank}/{size} mapped to: Block #{block_num}, " +
                 f"block_rank/block_size: {block_rank}/{block_size}")

    if split_comm:
        return nblocks, block_size, block_rank, block_num, block_comm
    else:
        return nblocks, block_size, block_rank, block_num