• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

desihub / desispec / 19150128419

06 Nov 2025 09:17PM UTC coverage: 37.704% (+0.7%) from 37.002%
19150128419

Pull #2521

github

web-flow
Merge 2934f9b9b into 6a90a0547
Pull Request #2521: Add redshift QA scripts

12985 of 34439 relevant lines covered (37.7%)

0.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/py/desispec/scripts/proc.py
1
"""
2
desispec.scripts.proc
3
=====================
4

5
One stop shopping for processing a DESI exposure
6

7
Examples at NERSC::
8

9
    # ARC: 18 min on 2 nodes
10
    time srun -N 2 -n 60 -C haswell -t 25:00 --qos realtime desi_proc --mpi -n 20191029 -e 22486
11

12
    # FLAT: 13 min
13
    time srun -n 20 -N 1 -C haswell -t 15:00 --qos realtime desi_proc --mpi -n 20191029 -e 22487
14

15
    # TWILIGHT: 8min
16
    time srun -n 20 -N 1 -C haswell -t 15:00 --qos realtime desi_proc --mpi -n 20191029 -e 22497
17

18
    # SKY: 11 min
19
    time srun -n 20 -N 1 -C haswell -t 15:00 --qos realtime desi_proc --mpi -n 20191029 -e 22536
20

21
    # ZERO: 2 min
22
    time srun -n 20 -N 1 -C haswell -t 15:00 --qos realtime desi_proc --mpi -n 20191029 -e 22561
23
"""
24

25
import time, datetime
×
26

27
from desispec.workflow.batch_writer import create_desi_proc_batch_script
×
28
from desispec.workflow.timing import log_timer
×
29
start_imports = time.time()
×
30

31
#- enforce a batch-friendly matplotlib backend
32
from desispec.util import set_backend
×
33
set_backend()
×
34

35
import sys, os, argparse, re
×
36
import subprocess
×
37
from copy import deepcopy
×
38

39
import numpy as np
×
40
import fitsio
×
41
from astropy.io import fits
×
42

43
from astropy.table import Table,vstack
×
44

45
import glob
×
46
import desiutil.timer
×
47
import desispec.io
×
48
from desispec.io import findfile, replace_prefix, shorten_filename, get_readonly_filepath
×
49
from desispec.io.util import create_camword, decode_camword, parse_cameras
×
50
from desispec.io.util import validate_badamps, get_tempfilename, relsymlink
×
51
from desispec.calibfinder import findcalibfile,CalibFinder,badfibers
×
52
from desispec.fiberflat import apply_fiberflat
×
53
from desispec.sky import subtract_sky
×
54
from desispec.util import runcmd
×
55
import desispec.scripts.assemble_fibermap
×
56
import desispec.scripts.preproc
×
57
import desispec.scripts.inspect_dark
×
58
import desispec.scripts.trace_shifts
×
59
import desispec.scripts.interpolate_fiber_psf
×
60
import desispec.scripts.extract
×
61
import desispec.scripts.badcolumn_mask
×
62
import desispec.scripts.specex
×
63
import desispec.scripts.fiberflat
×
64
import desispec.scripts.humidity_corrected_fiberflat
×
65
import desispec.scripts.sky
×
66
import desispec.scripts.stdstars
×
67
import desispec.scripts.select_calib_stars
×
68
import desispec.scripts.fluxcalibration
×
69
import desispec.scripts.procexp
×
70
import desispec.scripts.nightly_bias
×
71
import desispec.scripts.fit_cte_night
×
72

73
from desispec.maskbits import ccdmask
×
74
from desispec.gpu import is_gpu_available
×
75

76
from desitarget.targetmask import desi_mask
×
77

78
from desiutil.log import get_logger, DEBUG, INFO
×
79
import desiutil.iers
×
80

81
from desispec.workflow.desi_proc_funcs import assign_mpi, get_desi_proc_parser, \
×
82
    update_args_with_headers, find_most_recent
83
from desispec.workflow.batch import determine_resources
×
84

85
stop_imports = time.time()
×
86

87
#########################################
88
######## Begin Body of the Code #########
89
#########################################
90

91
def parse(options=None):
×
92
    parser = get_desi_proc_parser()
×
93
    args = parser.parse_args(options)
×
94
    return args
×
95

96
def main(args=None, comm=None):
97
    if not isinstance(args, argparse.Namespace):
98
        args = parse(args)
99

100
    log = get_logger()
101
    start_time = time.time()
102
    error_count = 0
103

104
    start_mpi_connect = time.time()
105
    if comm is not None:
106
        #- Use the provided comm to determine rank and size
107
        rank = comm.rank
108
        size = comm.size
109
    else:
110
        #- Check MPI flags and determine the comm, rank, and size given the arguments
111
        comm, rank, size = assign_mpi(do_mpi=args.mpi, do_batch=args.batch, log=log)
112
    stop_mpi_connect = time.time()
113

114
    if rank == 0:
115
        thisfile=os.path.dirname(os.path.abspath(__file__))
116
        thistime=datetime.datetime.fromtimestamp(start_imports).isoformat()
117
        log.info(f'rank 0 started {thisfile} at {thistime}')
118

119
    #- Start timer; only print log messages from rank 0 (others are silent)
120
    timer = desiutil.timer.Timer(silent=(rank>0))
121

122
    #- Fill in timing information for steps before we had the timer created
123
    if args.starttime is not None:
124
        timer.start('startup', starttime=args.starttime)
125
        timer.stop('startup', stoptime=start_imports)
126

127
    timer.start('imports', starttime=start_imports)
128
    timer.stop('imports', stoptime=stop_imports)
129

130
    timer.start('mpi_connect', starttime=start_mpi_connect)
131
    timer.stop('mpi_connect', stoptime=stop_mpi_connect)
132

133
    #- Use GPUs?
134
    if is_gpu_available():
135
        if args.no_gpu:
136
            log.warning("GPUs are available but not using them due to --no-gpu")
137
            use_gpu = False
138
        else:
139
            use_gpu = True
140
    else:
141
        use_gpu = False
142

143
    #- Freeze IERS after parsing args so that it doesn't bother if only --help
144
    timer.start('freeze_iers')
145
    desiutil.iers.freeze_iers()
146
    timer.stop('freeze_iers')
147

148
    #- Preflight checks
149
    timer.start('preflight')
150
    if rank > 0:
151
        #- Let rank 0 fetch these, and then broadcast
152
        args, hdr, camhdr = None, None, None
153
    else:
154
        if ( args.nightlybias or args.nightlycte ) and (args.expid is None) and (args.input is None):
155
            hdr = camhdr = None
156
        else:
157
            args, hdr, camhdr = update_args_with_headers(args)
158

159
    ## Make sure badamps is formatted properly
160
    if comm is not None and rank == 0 and args.badamps is not None:
161
        args.badamps = validate_badamps(args.badamps)
162

163
    if comm is not None:
164
        args = comm.bcast(args, root=0)
165
        hdr = comm.bcast(hdr, root=0)
166
        camhdr = comm.bcast(camhdr, root=0)
167

168
    if args.obstype is not None:
169
        args.obstype = args.obstype.upper()
170

171
    known_obstype = ['SCIENCE', 'ARC', 'FLAT', 'ZERO', 'DARK',
172
        'TESTARC', 'TESTFLAT', 'PIXFLAT', 'SKY', 'TWILIGHT', 'OTHER']
173
    only_nightlybias = args.nightlybias and args.expid is None
174
    if args.obstype not in known_obstype and (not only_nightlybias) and (not args.nightlycte) :
175
        raise ValueError('obstype {} not in {}'.format(args.obstype, known_obstype))
176

177
    if args.expid is None and (not args.nightlybias) and (not args.nightlycte) :
178
        msg = 'Must provide --expid or --nightlybias or --nightlycte'
179
        if rank == 0:
180
            log.critical(msg)
181

182
        sys.exit(1)
183

184
    if isinstance(args.cameras, str):
185
        args.cameras = decode_camword(args.cameras)
186

187
    if only_nightlybias  and args.cameras is None:
188
        args.cameras = decode_camword('a0123456789')
189
    if args.nightlycte and args.cameras is None:
190
        args.cameras = decode_camword('r0123456789z0123456789') # no CTE for blue
191

192
    timer.stop('preflight')
193

194
    #-------------------------------------------------------------------------
195
    #- Create and submit a batch job if requested
196

197
    if args.batch:
198
        if args.nightlycte and args.obstype != 'DARK' and not args.nightlybias:
199
            log.critical("don't know what to do in batch for just nightlycte!")
200
            sys.exit(1)
201

202
        #exp_str = '{:08d}'.format(args.expid)
203
        if args.obstype is not None:
204
            jobdesc = args.obstype.lower()
205
        elif only_nightlybias:
206
            jobdesc = 'nightlybias'
207
        else:
208
            log.critical('No --obstype, but also not just nightlybias ?!?')
209
            sys.exit(1)
210

211
        if args.obstype == 'DARK' and args.nightlybias:
212
            jobdesc = 'ccdcalib'
213

214
        if args.obstype == 'SCIENCE':
215
            # if not doing pre-stdstar fitting or stdstar fitting and if there is
216
            # no flag stopping flux calibration, set job to poststdstar
217
            if args.noprestdstarfit and args.nostdstarfit and (not args.nofluxcalib):
218
                jobdesc = 'poststdstar'
219
            # elif told not to do std or post stdstar but the flag for prestdstar isn't set,
220
            # then perform prestdstar
221
            elif (not args.noprestdstarfit) and args.nostdstarfit and args.nofluxcalib:
222
                jobdesc = 'prestdstar'
223
            #elif (not args.noprestdstarfit) and (not args.nostdstarfit) and (not args.nofluxcalib):
224
            #    jobdesc = 'science'
225
        scriptfile = create_desi_proc_batch_script(night=args.night, exp=args.expid,
226
                                                   cameras=args.cameras,
227
                                                   jobdesc=jobdesc, queue=args.queue,
228
                                                   runtime=args.runtime,
229
                                                   batch_opts=args.batch_opts,
230
                                                   timingfile=args.timingfile,
231
                                                   system_name=args.system_name,
232
                                                   nightlybias=args.nightlybias,
233
                                                   nightlycte=args.nightlycte,
234
                                                   cte_expids=args.cte_expids)
235
        err = 0
236
        if not args.nosubmit:
237
            err = subprocess.call(['sbatch', scriptfile])
238
        sys.exit(err)
239

240
    #-------------------------------------------------------------------------
241
    #- Proceeding with running
242

243
    #- What are we going to do?
244
    if rank == 0:
245
        log.info('----------')
246
        log.info('Input {}'.format(args.input))
247
        log.info('Night {} expid {}'.format(args.night, args.expid))
248
        log.info('Obstype {}'.format(args.obstype))
249
        log.info('Cameras {}'.format(args.cameras))
250
        log.info('Output root {}'.format(desispec.io.specprod_root()))
251
        log.info('----------')
252

253
    #-------------------------------------------------------------------------
254
    #- Create nightly bias from N>>1 ZEROs, but only for B-cameras
255
    if args.nightlybias:
256
        timer.start('nightlybias')
257

258
        camword = create_camword(args.cameras)
259
        cmd = f"desi_compute_nightly_bias -n {args.night} -c {camword}"
260

261
        # if args.bias_expids is not None:
262
        #     cmd += f" -e {args.bias_expids}"
263

264
        if rank == 0:
265
            log.info(f'RUNNING {cmd}')
266

267
        #- Note: nightly_bias may not produce all biasnight files if some
268
        #- are determined to be worse than the default, so check existence
269
        #- of output files separately.
270
        result, success = runcmd(desispec.scripts.nightly_bias.main,
271
                args=cmd.split()[1:], inputs=[], outputs=[], check_return=True, comm=comm)
272

273
        #- check for biasnight or biasnighttest output files
274
        missing_biasnight = 0
275
        if rank == 0:
276
            biasnightfiles = [findfile('biasnight', args.night, camera=cam) for cam in args.cameras]
277
            for filename in biasnightfiles:
278
                if not os.path.exists(filename):
279
                    #- ok for biasnight to be missing if biasnighttest is there
280
                    filename = replace_prefix(filename, 'biasnight', 'biasnighttest')
281
                    if not os.path.exists(filename):
282
                        missing_biasnight += 1
283

284
        if comm is not None:
285
            missing_biasnight = comm.bcast(missing_biasnight, root=0)
286

287
        success &= (missing_biasnight == 0)
288

289
        if not success:
290
            error_count += 1
291

292
        timer.stop('nightlybias')
293

294
    #-------------------------------------------------------------------------
295
    #- Create cte model from several LED exposures
296
    if args.nightlycte:
297

298
        timer.start('nightlycte')
299

300
        camword = create_camword(args.cameras)
301
        cmd = f"desi_fit_cte_night -n {args.night} -c {camword}"
302

303
        if args.cte_expids is not None:
304
            cmd += f" -e {args.cte_expids}"
305

306
        ctecorrnightfile = findfile('ctecorrnight', args.night)
307

308
        if rank == 0:
309
            log.info(f'RUNNING {cmd}')
310

311
        result, success = runcmd(desispec.scripts.fit_cte_night.main,
312
                args=cmd.split()[1:], inputs=[], outputs=[ctecorrnightfile,], comm=comm)
313

314
        if not success:
315
            error_count += 1
316

317
        timer.stop('nightlycte')
318

319
    #- this might be just nightly bias, or nightly cte, with no single exposure to process
320
    if args.expid is None:
321
        if comm is not None:
322
            all_error_counts = comm.gather(error_count, root=0)
323
            if rank == 0:
324
                error_count = int(np.sum(all_error_counts))  # all_error_counts is None on other ranks
325

326
            error_count = comm.bcast(error_count, root=0)
327

328
        if rank == 0:
329
            log.info('No expid given so stopping now')
330
            if error_count > 0:
331
                log.error(f'{error_count} processing errors; see logs above')
332

333
            duration_seconds = time.time() - start_time
334
            mm = int(duration_seconds) // 60
335
            ss = int(duration_seconds - mm*60)
336
            log.info('All done at {}; duration {}m{}s'.format(
337
                time.asctime(), mm, ss))
338

339
        sys.exit(error_count)
340

341

342
    #-------------------------------------------------------------------------
343
    #- Create output directories if needed
344
    if rank == 0:
345
        preprocdir = os.path.dirname(findfile('preproc', args.night, args.expid, 'b0'))
346
        expdir = os.path.dirname(findfile('frame', args.night, args.expid, 'b0'))
347
        os.makedirs(preprocdir, exist_ok=True)
348
        if args.obstype not in ('DARK', 'ZERO'):
349
            os.makedirs(expdir, exist_ok=True)
350

351
    #- Wait for rank 0 to make directories before proceeding
352
    if comm is not None:
353
        comm.barrier()
354

355
    #-------------------------------------------------------------------------
356
    #- Preproc
357
    #- All obstypes get preprocessed
358

359
    timer.start('fibermap')
360

361
    #- Assemble fibermap for science exposures
362
    fibermap = None
363
    fibermap_ok = None
364
    if rank == 0 and args.obstype == 'SCIENCE':
365
        fibermap = findfile('fibermap', args.night, args.expid)
366
        if not os.path.exists(fibermap):
367
            tmp = findfile('preproc', args.night, args.expid, 'b0')
368
            preprocdir = os.path.dirname(tmp)
369
            fibermap = os.path.join(preprocdir, os.path.basename(fibermap))
370

371
            tileid = hdr['TILEID']
372
            # tilepix = os.path.join(preprocdir, f'tilepix-{tileid}.json')
373
            tilepix = findfile('tilepix', args.night, args.expid, tile=tileid)
374

375
            log.info('Creating fibermap {}'.format(fibermap))
376
            # This command isn't actually executed, it only exists to populate
377
            # the equivalent of sys.argv.
378
            cmd = 'desi_assemble_fibermap -n {} -e {} -o {} -t {}'.format(
379
                    args.night, args.expid, fibermap, tilepix)
380
            if args.badamps is not None:
381
                cmd += ' --badamps={}'.format(args.badamps)
382
            cmdargs = cmd.split()[1:]
383
            result, success = runcmd(desispec.scripts.assemble_fibermap.main,
384
                    args=cmdargs, inputs=[], outputs=[fibermap, tilepix])
385

386
            if not success:
387
                error_count += 1
388

389
        fibermap_ok = os.path.exists(fibermap)
390

391
        #- Some commissioning files didn't have coords* files that caused assemble_fibermap to fail
392
        #- these are well known failures with no other solution, so for those, just force creation
393
        #- of a fibermap with null coordinate information
394
        if not fibermap_ok and int(args.night) < 20200310:
395
            log.info("Since night is before 20200310, trying to force fibermap creation without coords file")
396
            cmd += ' --force'
397
            cmdargs = cmd.split()[1:]
398
            result, success = runcmd(desispec.scripts.assemble_fibermap.main,
399
                    args=cmdargs, inputs=[], outputs=[fibermap])
400

401
            fibermap_ok = os.path.exists(fibermap)
402
            if not success or not fibermap_ok:
403
                error_count += 1
404

405
    #- If assemble_fibermap failed and obstype is SCIENCE, exit now
406
    if comm is not None:
407
        fibermap_ok = comm.bcast(fibermap_ok, root=0)
408

409
    if args.obstype == 'SCIENCE' and not fibermap_ok:
410
        sys.stdout.flush()
411
        if rank == 0:
412
            log.critical('desi_assemble_fibermap failed for science exposure; exiting now')
413

414
        sys.exit(13)
415

416
    #- Wait for rank 0 to make fibermap if needed
417
    if comm is not None:
418
        fibermap = comm.bcast(fibermap, root=0)
419

420
    timer.stop('fibermap')
421

422
    if not (args.obstype in ['SCIENCE'] and args.noprestdstarfit):
423
        timer.start('preproc')
424
        for i in range(rank, len(args.cameras), size):
425
            camera = args.cameras[i]
426
            outfile = findfile('preproc', args.night, args.expid, camera)
427
            outdir = os.path.dirname(outfile)
428
            cmd = "desi_preproc -i {} -o {} --outdir {} --cameras {}".format(
429
                args.input, outfile, outdir, camera)
430
            if args.scattered_light :
431
                cmd += " --scattered-light"
432
            if args.obstype in ['SCIENCE'] and camera[0].lower() == "b" and ( not args.no_bkgsub ) :
433
                cmd += " --bkgsub-for-science"
434
            if fibermap is not None:
435
                cmd += " --fibermap {}".format(fibermap)
436
            if not args.obstype in ['ARC'] : # never model variance for arcs
437
                if not args.no_model_pixel_variance and args.obstype != 'DARK' :
438
                    cmd += " --model-variance"
439
            if args.badamps is not None:
440
                cmd += f" --badamps {args.badamps}"
441

442
            inputs = [args.input]
443

444
            #- TBD: require ctecorrnight file here, or allow to be missing?
445
            # if args.obstype not in ('ZERO', 'DARK') and camera[0].lower() != 'b':
446
            #     ctecorrfile = findfile('ctecorrnight', args.night, camera=camera)
447
            #     inputs.append(ctecorrfile)
448

449
            cmdargs = cmd.split()[1:]
450
            result, success = runcmd(desispec.scripts.preproc.main,
451
                    args=cmdargs, inputs=inputs, outputs=[outfile,])
452
            if not success:
453
                error_count += 1
454

455
        timer.stop('preproc')
456
        if comm is not None:
457
            comm.barrier()
458

459
    #-------------------------------------------------------------------------
460
    #- Get input PSFs
461
    timer.start('findpsf')
462
    input_psf = dict()
463
    if rank == 0 and args.obstype not in ['DARK',]:
464
        for camera in args.cameras :
465
            if args.psf is not None :
466
                input_psf[camera] = args.psf
467
            elif args.calibnight is not None :
468
                # look for a psfnight psf for this calib night
469
                psfnightfile = findfile('psfnight', args.calibnight, args.expid, camera, readonly=True)
470
                if not os.path.isfile(psfnightfile) :
471
                    log.error("no {}".format(psfnightfile))
472
                    raise IOError("no {}".format(psfnightfile))
473
                input_psf[camera] = psfnightfile
474
            else :
475
                # look for a psfnight psf
476
                psfnightfile = findfile('psfnight', args.night, args.expid, camera, readonly=True)
477
                if os.path.isfile(psfnightfile) :
478
                    input_psf[camera] = psfnightfile
479
                elif args.most_recent_calib:
480
                    nightfile = find_most_recent(args.night, file_type='psfnight')
481
                    if nightfile is None:
482
                        input_psf[camera] = findcalibfile([hdr, camhdr[camera]], 'PSF')
483
                    else:
484
                        input_psf[camera] = nightfile
485
                else :
486
                    input_psf[camera] = findcalibfile([hdr, camhdr[camera]], 'PSF')
487

488
            input_psf[camera] = get_readonly_filepath(input_psf[camera])
489
            log.info("Will use input PSF : {}".format(input_psf[camera]))
490

491
    if comm is not None:
492
        input_psf = comm.bcast(input_psf, root=0)
493

494
    timer.stop('findpsf')
495

496

497
    #-------------------------------------------------------------------------
498
    #- Dark (to detect bad columns)
499

500
    if args.obstype == 'DARK' :
501

502
        # check exposure time and perform a dark inspection only
503
        # if it is a 300s exposure
504
        exptime = None
505
        if rank == 0 :
506
            rawfilename=findfile('raw', args.night, args.expid, readonly=True)
507
            head=fitsio.read_header(rawfilename,1)
508
            exptime=head["EXPTIME"]
509
            #ics_program=head["PROGRAM"]
510
        if comm is not None :
511
            exptime = comm.bcast(exptime, root=0)
512
            #ics_program = comm.bcast(ics_program, root=0)
513

514
        ## TODO: make this a desi_proc flag rather than selecting on exptime or program
515
        #if 'calib dark' in ics_program.lower():
516
        if np.abs(exptime - 300) < 2.0 or np.abs(exptime - 1200) < 2.0:
517
            timer.start('inspect_dark')
518
            if rank == 0 :
519
                log.info('Starting desi_inspect_dark at {}'.format(time.asctime()))
520

521
            for i in range(rank, len(args.cameras), size):
522
                camera = args.cameras[i]
523
                preprocfile = findfile('preproc', args.night, args.expid, camera, readonly=True)
524
                badcolumnsfile = findfile('badcolumns', night=args.night, camera=camera)
525
                if not os.path.isfile(badcolumnsfile) :
526
                    cmd = "desi_inspect_dark"
527
                    cmd += " -i {}".format(preprocfile)
528
                    cmd += " --badcol-table {}".format(badcolumnsfile)
529
                    cmdargs = cmd.split()[1:]
530
                    result, success = runcmd(desispec.scripts.inspect_dark.main,
531
                            args=cmdargs, inputs=[preprocfile], outputs=[badcolumnsfile])
532

533
                    if not success:
534
                        error_count += 1
535
                else:
536
                    log.info(f'{badcolumnsfile} already exists; skipping desi_inspect_dark')
537

538
            if comm is not None :
539
                comm.barrier()
540

541
            timer.stop('inspect_dark')
542
        elif rank == 0:
543
            log.warning(f'Not running desi_inspect_dark for DARK with EXPTIME={exptime:.1f}')
544

545
    #-------------------------------------------------------------------------
546
    #- Traceshift
547

548
    if ( args.obstype in ['FLAT', 'TESTFLAT', 'SKY', 'TWILIGHT']     )   or \
549
    ( args.obstype in ['SCIENCE'] and (not args.noprestdstarfit) ):
550

551
        timer.start('traceshift')
552

553
        if rank == 0 and args.traceshift :
554
            log.warning('desi_proc option --traceshift is deprecated because this is now the default')
555

556
        if rank == 0 and (not args.no_traceshift) :
557
            log.info('Starting traceshift at {}'.format(time.asctime()))
558

559
        for i in range(rank, len(args.cameras), size):
560
            camera = args.cameras[i]
561
            preprocfile = findfile('preproc', args.night, args.expid, camera, readonly=True)
562
            inpsf  = input_psf[camera]
563
            outpsf = findfile('psf', args.night, args.expid, camera)
564
            if not os.path.isfile(outpsf) :
565
                if (not args.no_traceshift):
566
                    cmd = "desi_compute_trace_shifts"
567
                    cmd += " -i {}".format(preprocfile)
568
                    cmd += " --psf {}".format(inpsf)
569
                    cmd += " --degxx 2 --degxy 0"
570
                    if args.obstype in ['FLAT', 'TESTFLAT', 'TWILIGHT'] :
571
                        cmd += " --continuum --no-large-shift-scan"
572
                    else :
573
                        cmd += " --degyx 2 --degyy 0"
574
                    if args.obstype in ['SCIENCE', 'SKY']:
575
                        cmd += ' --sky'
576
                    cmd += " --outpsf {}".format(outpsf)
577
                    cmdargs = cmd.split()[1:]
578
                    cmd = desispec.scripts.trace_shifts.main
579
                    expandargs = False
580
                else:
581
                    cmdargs = (inpsf, outpsf)
582
                    cmd = relsymlink
583
                    expandargs = True
584

585
                result, success = runcmd(cmd, args=cmdargs, expandargs=expandargs,
586
                        inputs=[preprocfile, inpsf], outputs=[outpsf])
587

588
                if not success:
589
                    error_count += 1
590
            else :
591
                log.info("PSF {} exists".format(outpsf))
592

593
        timer.stop('traceshift')
594
        if comm is not None:
595
            comm.barrier()
596

597
    #-------------------------------------------------------------------------
598
    #- PSF
599
    #- MPI parallelize this step
600

601
    if args.obstype in ['ARC', 'TESTARC']:
602

603
        timer.start('arc_traceshift')
604

605
        if rank == 0:
606
            log.info('Starting traceshift before specex PSF fit at {}'.format(time.asctime()))
607

608
        for i in range(rank, len(args.cameras), size):
609
            camera = args.cameras[i]
610
            preprocfile = findfile('preproc', args.night, args.expid, camera, readonly=True)
611
            inpsf  = input_psf[camera]
612
            outpsf = findfile('psf', args.night, args.expid, camera)
613
            outpsf = replace_prefix(outpsf, "psf", "shifted-input-psf")
614
            if not os.path.isfile(outpsf) :
615
                cmd = "desi_compute_trace_shifts"
616
                cmd += " -i {}".format(preprocfile)
617
                cmd += " --psf {}".format(inpsf)
618
                cmd += " --degxx 0 --degxy 0 --degyx 0 --degyy 0"
619
                cmd += ' --arc-lamps'
620
                cmd += " --outpsf {}".format(outpsf)
621
                cmdargs = cmd.split()[1:]
622
                result, success = runcmd(desispec.scripts.trace_shifts.main,
623
                        args=cmdargs, inputs=[preprocfile, inpsf], outputs=[outpsf])
624
                if not success:
625
                    error_count += 1
626

627
            else :
628
                log.info("PSF {} exists".format(outpsf))
629

630
        timer.stop('arc_traceshift')
631
        if comm is not None:
632
            comm.barrier()
633

634
        timer.start('psf')
635

636
        if rank == 0:
637
            log.info('Starting specex PSF fitting at {}'.format(time.asctime()))
638

639
        if rank > 0:
640
            cmds = inputs = outputs = None
641
        else:
642
            cmds = dict()
643
            inputs = dict()
644
            outputs = dict()
645
            for camera in args.cameras:
646
                preprocfile = findfile('preproc', args.night, args.expid, camera, readonly=True)
647
                tmpname = findfile('psf', args.night, args.expid, camera)
648
                inpsf = get_readonly_filepath(replace_prefix(tmpname,"psf","shifted-input-psf"))
649
                outpsf = replace_prefix(tmpname,"psf","fit-psf")
650

651
                log.info("now run specex psf fit")
652

653
                cmd = 'desi_compute_psf'
654
                cmd += ' --input-image {}'.format(preprocfile)
655
                cmd += ' --input-psf {}'.format(inpsf)
656
                cmd += ' --output-psf {}'.format(outpsf)
657

658
                if args.dont_merge_with_psf_input :
659
                    cmd += ' --dont-merge-with-input'
660

661
                # fibers to ignore for the PSF fit
662
                # specex uses the fiber index in a camera
663
                fibers_to_ignore = badfibers([hdr, camhdr[camera]],["BROKENFIBERS","BADCOLUMNFIBERS"])%500
664
                if fibers_to_ignore.size>0 :
665
                    fibers_to_ignore_str=str(fibers_to_ignore[0])
666
                    for fiber in fibers_to_ignore[1:] :
667
                        fibers_to_ignore_str+=",{}".format(fiber)
668
                    cmd += ' --broken-fibers {}'.format(fibers_to_ignore_str)
669
                    if rank == 0 :
670
                        log.warning('broken fibers: {}'.format(fibers_to_ignore_str))
671

672
                if not os.path.exists(outpsf):
673
                    cmds[camera] = cmd
674
                    inputs[camera] = [preprocfile, inpsf]
675
                    outputs[camera] = [outpsf,]
676

677
        if comm is not None:
678
            cmds = comm.bcast(cmds, root=0)
679
            if len(cmds) > 0:
680
                err = desispec.scripts.specex.run(comm,cmds,args.cameras)
681
                if err != 0:
682
                    error_count += 1
683
        else:
684
            log.warning('fitting PSFs without MPI parallelism; this will be SLOW')
685
            for camera in args.cameras:
686
                if camera in cmds:
687
                    result, success = runcmd(cmds[camera], inputs=inputs[camera], outputs=outputs[camera])
688
                    if not success:
689
                        error_count += 1
690

691
        timer.stop('psf')
692
        if comm is not None:
693
            comm.barrier()
694

695
        # loop on all cameras and interpolate bad fibers
696
        for camera in args.cameras[rank::size]:
697
            t0 = time.time()
698

699
            psfname = findfile('psf', args.night, args.expid, camera)
700
            #- NOTE: not readonly because we need to rename it later
701
            inpsf = replace_prefix(psfname,"psf","fit-psf")
702

703
            #- Check if a noisy amp might have corrupted this PSF;
704
            #- if so, rename to *.badreadnoise
705
            #- Only do this for amps not already pre-flagged as bad
706
            #- Currently the data is flagged per amp (25% of pixels), but do
707
            #- more generic test for 12.5% of pixels (half of one amp)
708
            log.info(f'Rank {rank} checking for noisy input CCD amps')
709
            preprocfile = findfile('preproc', args.night, args.expid, camera, readonly=True)
710
            mask = fitsio.read(preprocfile, 'MASK')
711
            pix_goodamp = (mask & ccdmask.BADAMP) == 0
712
            pix_badnoise = (mask & ccdmask.BADREADNOISE) != 0
713
            noisyfrac = np.sum(pix_badnoise & pix_goodamp) / np.sum(pix_goodamp)
714
            if noisyfrac > 0.25*0.5:
715
                log.error(f"{100*noisyfrac:.0f}% of {camera} input pixels have bad readnoise; don't use this PSF")
716
                if os.path.exists(inpsf):
717
                    os.rename(inpsf, inpsf+'.badreadnoise')
718
                error_count += 1
719
                continue
720

721
            log.info(f'Rank {rank} interpolating {camera} PSF over bad fibers')
722

723
            # fibers to ignore for the PSF fit
724
            # specex uses the fiber index in a camera
725
            fibers_to_ignore = badfibers([hdr, camhdr[camera]],["BROKENFIBERS","BADCOLUMNFIBERS"])%500
726
            if fibers_to_ignore.size>0 :
727
                fibers_to_ignore_str=str(fibers_to_ignore[0])
728
                for fiber in fibers_to_ignore[1:] :
729
                    fibers_to_ignore_str+=",{}".format(fiber)
730

731
                outpsf = replace_prefix(psfname,"psf","fit-psf-fixed-listed")
732
                if os.path.isfile(inpsf) and not os.path.isfile(outpsf):
733
                    cmd = 'desi_interpolate_fiber_psf'
734
                    cmd += ' --infile {}'.format(inpsf)
735
                    cmd += ' --outfile {}'.format(outpsf)
736
                    cmd += ' --fibers {}'.format(fibers_to_ignore_str)
737
                    log.info('For camera {} interpolating PSF for fibers: {}'.format(camera,fibers_to_ignore_str))
738
                    cmdargs = cmd.split()[1:]
739

740
                    result, success = runcmd(desispec.scripts.interpolate_fiber_psf.main,
741
                            args=cmdargs, inputs=[inpsf], outputs=[outpsf])
742

743
                    if not success:
744
                        error_count += 1
745

746
                    if os.path.isfile(outpsf) :
747
                        os.rename(inpsf,inpsf.replace("fit-psf","fit-psf-before-listed-fix"))
748
                        os.system('cp {} {}'.format(outpsf,inpsf))
749

750
            dt = time.time() - t0
751
            log.info(f'Rank {rank} {camera} PSF interpolation took {dt:.1f} sec')
752

753
    #-------------------------------------------------------------------------
754
    #- Extract
755
    #- This is MPI parallel so handle a bit differently
756

757
    # maybe add ARC and TESTARC too
758
    if ( args.obstype in ['FLAT', 'TESTFLAT', 'SKY', 'TWILIGHT']     )   or \
759
    ( args.obstype in ['SCIENCE'] and (not args.noprestdstarfit) ):
760

761
        timer.start('extract')
762
        if rank == 0:
763
            log.info('Starting extractions at {}'.format(time.asctime()))
764

765
        if rank > 0:
766
            cmds = inputs = outputs = None
767
        else:
768
            #- rank 0 collects commands to broadcast to others
769
            cmds = dict()
770
            inputs = dict()
771
            outputs = dict()
772
            for camera in args.cameras:
773
                cmd = 'desi_extract_spectra'
774

775
                #- Based on data from SM1-SM8, looking at central and edge fibers
776
                #- with in mind overlapping arc lamps lines
777
                if camera.startswith('b'):
778
                    cmd += ' -w 3600.0,5800.0,0.8'
779
                elif camera.startswith('r'):
780
                    cmd += ' -w 5760.0,7620.0,0.8'
781
                elif camera.startswith('z'):
782
                    cmd += ' -w 7520.0,9824.0,0.8'
783

784
                preprocfile = findfile('preproc', args.night, args.expid, camera, readonly=True)
785
                psffile = findfile('psf', args.night, args.expid, camera, readonly=True)
786
                finalframefile = findfile('frame', args.night, args.expid, camera)
787
                if os.path.exists(finalframefile):
788
                    log.info('{} already exists; not regenerating'.format(
789
                        os.path.basename(finalframefile)))
790
                    continue
791

792
                #- finalframefile doesn't exist; proceed with command
793
                framefile = finalframefile.replace(".fits","-no-badcolumn-mask.fits")
794
                cmd += ' -i {}'.format(preprocfile)
795
                cmd += ' -p {}'.format(psffile)
796
                cmd += ' -o {}'.format(framefile)
797

798
                #- Larger PSF model uncertainty for the blue cameras because a lower value
799
                #- results in many pixels with specmask.BAD2DFIT on the 5578A sky line.
800
                if camera.startswith('b'):
801
                    cmd += ' --psferr 0.04'
802
                else :
803
                    cmd += ' --psferr 0.01'
804

805
                if args.use_specter:
806
                    cmd += ' --use-specter'
807
                    cmd += ' --mpi'  # gpu_specter is MPI by default, but specter isn't
808

809
                if not use_gpu:
810
                    cmd += ' --no-gpu'
811

812
                if args.obstype == 'SCIENCE' or args.obstype == 'SKY' :
813
                    if not args.no_barycentric_correction :
814
                        log.info('Include barycentric correction')
815
                        cmd += ' --barycentric-correction'
816

817
                missing_inputs = False
818
                for infile in [preprocfile, psffile]:
819
                    if not os.path.exists(infile):
820
                        log.error(f'Missing {infile}')
821
                        missing_inputs = True
822

823
                if missing_inputs:
824
                    log.error(f'Camera {camera} missing inputs; skipping extractions')
825
                    continue
826

827
                if os.path.exists(framefile):
828
                    log.info(f'{framefile} already exists; skipping extraction')
829
                    continue
830

831
                cmds[camera] = cmd
832
                inputs[camera] = [preprocfile, psffile]
833
                outputs[camera] = [framefile,]
834

835
        #- TODO: refactor/combine this with PSF comm splitting logic
836
        if comm is not None:
837
            cmds = comm.bcast(cmds, root=0)
838
            inputs = comm.bcast(inputs, root=0)
839
            outputs = comm.bcast(outputs, root=0)
840

841
            if use_gpu and (not args.use_specter):
842
                import cupy as cp
843
                ngpus = cp.cuda.runtime.getDeviceCount()
844
                if rank == 0 and len(cmds)>0:
845
                    log.info(f"{rank} found {ngpus} gpus")
846

847
            #- Set extraction subcomm group size
848
            extract_subcomm_size = args.extract_subcomm_size
849
            if extract_subcomm_size is None:
850
                if args.use_specter:
851
                    #- CPU extraction with specter uses
852
                    #- 20 ranks.
853
                    extract_subcomm_size = 20
854
                elif use_gpu:
855
                    #- GPU extraction with gpu_specter uses
856
                    #- 5 ranks per GPU plus 2 for IO.
857
                    extract_subcomm_size = 2 + 5 * ngpus
858
                else:
859
                    #- CPU extraction with gpu_specter uses
860
                    #- 16 ranks.
861
                    extract_subcomm_size = 16
862

863
            #- Create list of ranks that will perform extraction
864
            if use_gpu:
865
                #- GPU extraction uses only one extraction group
866
                extract_group      = 0
867
                num_extract_groups = 1
868
            else:
869
                #- CPU extraction uses as many extraction groups as possible
870
                extract_group      = rank // extract_subcomm_size
871
                num_extract_groups = size // extract_subcomm_size
872
            extract_ranks = list(range(num_extract_groups*extract_subcomm_size))
873

874
            #- Create subcomm groups
875
            if use_gpu and len(cmds)>0:
876
                if rank in extract_ranks:
877
                    #- GPU extraction
878
                    extract_incl = comm.group.Incl(extract_ranks)
879
                    comm_extract = comm.Create_group(extract_incl)
880
                    from gpu_specter.mpi import ParallelIOCoordinator
881
                    coordinator = ParallelIOCoordinator(comm_extract)
882
            else:
883
                #- CPU extraction
884
                comm_extract = comm.Split(color=extract_group)
885

886
            if rank in extract_ranks and len(cmds)>0:
887
                #- Run the extractions
888
                for i in range(extract_group, len(args.cameras), num_extract_groups):
889
                    camera = args.cameras[i]
890
                    if camera in cmds:
891
                        cmdargs = cmds[camera].split()[1:]
892
                        extract_args = desispec.scripts.extract.parse(cmdargs)
893

894
                        if comm_extract.rank == 0:
895
                            print('RUNNING: {}'.format(cmds[camera]))
896

897
                        try:
898
                            if args.use_specter:
899
                                #- CPU extraction with specter
900
                                desispec.scripts.extract.main_mpi(extract_args, comm=comm_extract)
901
                            elif use_gpu:
902
                                #- GPU extraction with gpu_specter
903
                                desispec.scripts.extract.main_gpu_specter(extract_args, coordinator=coordinator)
904
                            else:
905
                                #- CPU extraction with gpu_specter
906
                                desispec.scripts.extract.main_gpu_specter(extract_args, comm=comm_extract)
907
                        except Exception as err:
908
                            import traceback
909
                            lines = traceback.format_exception(*sys.exc_info())
910
                            log.error(f"Camera {camera} extraction raised an exception:")
911
                            print("".join(lines))
912
                            error_count += 1
913

914
            elif len(cmds)>0:
915
                #- Skip this rank
916
                log.warning(f'rank {rank} idle during extraction step')
917

918
            comm.barrier()
919

920
        elif len(cmds)>0:
921
            log.warning('running extractions without MPI parallelism; this will be SLOW')
922
            for camera in args.cameras:
923
                if camera in cmds:
924
                    result, success = runcmd(cmds[camera], inputs=inputs[camera], outputs=outputs[camera])
925
                    if not success:
926
                        error_count += 1
927

928
        #- check for missing output files and log
929
        for camera in args.cameras:
930
            if camera in cmds:
931
                for outfile in outputs[camera]:
932
                    if not os.path.exists(outfile):
933
                        if comm is not None:
934
                            if comm.rank > 0:
935
                                continue
936
                        log.error(f'Camera {camera} extraction missing output {outfile}')
937
                        error_count += 1
938

939
        timer.stop('extract')
940
        if comm is not None:
941
            comm.barrier()
942

943
    #-------------------------------------------------------------------------
944
    #- Badcolumn specmask and fibermask
945
    if ( args.obstype in ['FLAT', 'TESTFLAT', 'SKY', 'TWILIGHT']     )   or \
946
       ( args.obstype in ['SCIENCE'] and (not args.noprestdstarfit) ):
947

948
        if rank==0 :
949
            log.info('Starting desi_compute_badcolumn_mask at {}'.format(time.asctime()))
950

951
        for i in range(rank, len(args.cameras), size):
952
            camera     = args.cameras[i]
953
            outfile    = findfile('frame', args.night, args.expid, camera)
954
            #- note: not readonly for "infile" since we'll remove it later
955
            infile     = outfile.replace(".fits","-no-badcolumn-mask.fits")
956
            psffile    = findfile('psf', args.night, args.expid, camera, readonly=True)
957
            badcolfile = findfile('badcolumns', night=args.night, camera=camera, readonly=True)
958
            cmd = "desi_compute_badcolumn_mask -i {} -o {} --psf {} --badcolumns {}".format(
959
                infile, outfile, psffile, badcolfile)
960

961
            if os.path.exists(outfile):
962
                log.info('{} already exists; not (re-)applying bad column mask'.format(os.path.basename(outfile)))
963
                continue
964

965
            if os.path.exists(badcolfile):
966
                cmdargs = cmd.split()[1:]
967

968
                result, success = runcmd(desispec.scripts.badcolumn_mask.main,
969
                        args=cmdargs, inputs=[infile,psffile,badcolfile], outputs=[outfile])
970

971
                if not success:
972
                    error_count += 1
973

974
                #- if successful, remove temporary frame-*-no-badcolumn-mask
975
                if os.path.isfile(outfile) :
976
                    log.info("rm "+infile)
977
                    os.unlink(infile)
978

979
            else:
980
                log.warning(f'Missing {badcolfile}; not applying badcol mask')
981
                log.info(f"mv {infile} {outfile}")
982
                os.rename(infile, outfile)
983

984
        if comm is not None :
985
            comm.barrier()
986

987
    #-------------------------------------------------------------------------
988
    #- Fiberflat
989

990
    if args.obstype in ['FLAT', 'TESTFLAT'] :
991
        exptime = None
992
        if rank == 0 :
993
            rawfilename=findfile('raw', args.night, args.expid, readonly=True)
994
            head=fitsio.read_header(rawfilename,1)
995
            exptime=head["EXPTIME"]
996
        if comm is not None :
997
            exptime = comm.bcast(exptime, root=0)
998

999
        if exptime > 10:
1000
            timer.start('fiberflat')
1001
            if rank == 0:
1002
                log.info('Flat exposure time was greater than 10 seconds')
1003
                log.info('Starting fiberflats at {}'.format(time.asctime()))
1004

1005
            for i in range(rank, len(args.cameras), size):
1006
                camera = args.cameras[i]
1007
                framefile = findfile('frame', args.night, args.expid, camera, readonly=True)
1008
                fiberflatfile = findfile('fiberflat', args.night, args.expid, camera)
1009
                cmd = "desi_compute_fiberflat"
1010
                cmd += " -i {}".format(framefile)
1011
                cmd += " -o {}".format(fiberflatfile)
1012
                cmdargs = cmd.split()[1:]
1013

1014
                result, success = runcmd(desispec.scripts.fiberflat.main,
1015
                        args=cmdargs, inputs=[framefile,], outputs=[fiberflatfile,])
1016

1017
                if not success:
1018
                    error_count += 1
1019

1020
            timer.stop('fiberflat')
1021
            if comm is not None:
1022
                comm.barrier()
1023

1024
    #-------------------------------------------------------------------------
1025
    #- Get input fiberflat
1026
    if args.obstype in ['SCIENCE', 'SKY'] and (not args.nofiberflat):
1027
        timer.start('find_fiberflat')
1028
        input_fiberflat = dict()
1029
        if rank == 0:
1030
            for camera in args.cameras :
1031
                if args.fiberflat is not None :
1032
                    input_fiberflat[camera] = args.fiberflat
1033
                elif args.calibnight is not None :
1034
                    # look for a fiberflatnight for this calib night
1035
                    fiberflatnightfile = findfile('fiberflatnight',
1036
                            args.calibnight, args.expid, camera)
1037
                    if not os.path.isfile(fiberflatnightfile) :
1038
                        log.error("no {}".format(fiberflatnightfile))
1039
                        raise IOError("no {}".format(fiberflatnightfile))
1040
                    input_fiberflat[camera] = fiberflatnightfile
1041
                else :
1042
                    # look for a fiberflatnight fiberflat
1043
                    fiberflatnightfile = findfile('fiberflatnight',
1044
                            args.night, args.expid, camera)
1045
                    if os.path.isfile(fiberflatnightfile) :
1046
                        input_fiberflat[camera] = fiberflatnightfile
1047
                    elif args.most_recent_calib:
1048
                        nightfile = find_most_recent(args.night, file_type='fiberflatnight')
1049
                        if nightfile is None:
1050
                            input_fiberflat[camera] = findcalibfile([hdr, camhdr[camera]], 'FIBERFLAT')
1051
                        else:
1052
                            input_fiberflat[camera] = nightfile
1053
                    else :
1054
                        input_fiberflat[camera] = findcalibfile(
1055
                                [hdr, camhdr[camera]], 'FIBERFLAT')
1056

1057
                input_fiberflat[camera] = get_readonly_filepath(input_fiberflat[camera])
1058
                log.info("Will use input FIBERFLAT: {}".format(input_fiberflat[camera]))
1059

1060
        if comm is not None:
1061
            input_fiberflat = comm.bcast(input_fiberflat, root=0)
1062

1063
        timer.stop('find_fiberflat')
1064

1065
    #-------------------------------------------------------------------------
1066
    #- Fiber flat corrected for humidity
1067
    if args.obstype in ['SCIENCE', 'SKY'] and (not args.noprestdstarfit):
1068

1069
        timer.start('fiberflat_humidity_correction')
1070

1071
        if rank == 0:
1072
            log.info('Flatfield correction for humidity {}'.format(time.asctime()))
1073

1074
        for i in range(rank, len(args.cameras), size):
1075
            camera = args.cameras[i]
1076
            framefile = findfile('frame', args.night, args.expid, camera, readonly=True)
1077
            input_fiberflatfile=input_fiberflat[camera]
1078
            if input_fiberflatfile is None :
1079
                log.error("No input fiberflat for {}".format(camera))
1080
                continue
1081

1082
            # First need a flatfield per exposure
1083
            fiberflatfile = findfile('fiberflatexp', args.night, args.expid, camera)
1084

1085
            cmd = "desi_compute_humidity_corrected_fiberflat"
1086
            cmd += " --use-sky-fibers"
1087
            cmd += " -i {}".format(framefile)
1088
            cmd += " --fiberflat {}".format(input_fiberflatfile)
1089
            cmd += " -o {}".format(fiberflatfile)
1090
            cmdargs = cmd.split()[1:]
1091

1092
            result, success = runcmd(desispec.scripts.humidity_corrected_fiberflat.main,
1093
                    args=cmdargs, inputs=[framefile, input_fiberflatfile], outputs=[fiberflatfile,])
1094

1095
            if not success:
1096
                error_count += 1
1097

1098
        timer.stop('fiberflat_humidity_correction')
1099
        if comm is not None:
1100
            comm.barrier()
1101

1102
    #-------------------------------------------------------------------------
1103
    #- Apply fiberflat and write fframe file
1104

1105
    if args.obstype in ['SCIENCE', 'SKY'] and args.fframe and \
1106
    ( not args.nofiberflat ) and (not args.noprestdstarfit):
1107
        timer.start('apply_fiberflat')
1108
        if rank == 0:
1109
            log.info('Applying fiberflat at {}'.format(time.asctime()))
1110

1111
        for i in range(rank, len(args.cameras), size):
1112
            camera = args.cameras[i]
1113
            fframefile = findfile('fframe', args.night, args.expid, camera)
1114
            if not os.path.exists(fframefile):
1115
                framefile = findfile('frame', args.night, args.expid, camera, readonly=True)
1116
                fr = desispec.io.read_frame(framefile)
1117
                flatfilename = findfile('fiberflatexp', args.night, args.expid, camera, readonly=True)
1118
                ff = desispec.io.read_fiberflat(flatfilename)
1119
                fr.meta['FIBERFLT'] = desispec.io.shorten_filename(flatfilename)
1120
                apply_fiberflat(fr, ff)
1121
                fframefile = findfile('fframe', args.night, args.expid, camera)
1122
                desispec.io.write_frame(fframefile, fr)
1123

1124
        timer.stop('apply_fiberflat')
1125
        if comm is not None:
1126
            comm.barrier()
1127

1128
    #-------------------------------------------------------------------------
1129
    #- Select random sky fibers (inplace update of frame file)
1130
    #- TODO: move this to a function somewhere
1131
    #- TODO: this assigns different sky fibers to each frame of same spectrograph
1132

1133
    if (args.obstype in ['SKY', 'SCIENCE']) and (not args.noskysub) and (not args.noprestdstarfit):
1134
        timer.start('picksky')
1135
        if rank == 0:
1136
            log.info('Picking sky fibers at {}'.format(time.asctime()))
1137

1138
        for i in range(rank, len(args.cameras), size):
1139
            camera = args.cameras[i]
1140
            roframefile = findfile('frame', args.night, args.expid, camera, readonly=True)
1141
            orig_frame = desispec.io.read_frame(roframefile)
1142

1143
            #- Make a copy so that we can apply fiberflat
1144
            fr = deepcopy(orig_frame)
1145

1146
            if np.any(fr.fibermap['OBJTYPE'] == 'SKY'):
1147
                log.info('{} sky fibers already set; skipping'.format(
1148
                    os.path.basename(roframefile)))
1149
                continue
1150

1151
            #- Apply fiberflat then select random fibers below a flux cut
1152
            flatfilename = findfile('fiberflatexp', args.night, args.expid, camera, readonly=True)
1153
            ff = desispec.io.read_fiberflat(flatfilename)
1154
            apply_fiberflat(fr, ff)
1155
            sumflux = np.sum(fr.flux, axis=1)
1156
            fluxcut = np.percentile(sumflux, 30)
1157
            iisky = np.where(sumflux < fluxcut)[0]
1158
            iisky = np.random.choice(iisky, size=100, replace=False)
1159

1160
            #- Update fibermap or original frame and write out
1161
            orig_frame.fibermap['OBJTYPE'][iisky] = 'SKY'
1162
            orig_frame.fibermap['DESI_TARGET'][iisky] |= desi_mask.SKY
1163

1164
            #- Get the writable path to frame file and write it out
1165
            framefile = findfile('frame', args.night, args.expid, camera)
1166
            desispec.io.write_frame(framefile, orig_frame)
1167

1168
        timer.stop('picksky')
1169
        if comm is not None:
1170
            comm.barrier()
1171

1172
    #-------------------------------------------------------------------------
1173
    #- Sky subtraction
1174
    if args.obstype in ['SCIENCE', 'SKY'] and (not args.noskysub ) and (not args.noprestdstarfit):
1175

1176
        timer.start('skysub')
1177
        if rank == 0:
1178
            log.info('Starting sky subtraction at {}'.format(time.asctime()))
1179

1180
        for i in range(rank, len(args.cameras), size):
1181
            camera = args.cameras[i]
1182
            framefile = findfile('frame', args.night, args.expid, camera, readonly=True)
1183
            hdr = fitsio.read_header(framefile, 'FLUX')
1184
            fiberflatfile = findfile('fiberflatexp', args.night, args.expid, camera, readonly=True)
1185
            skyfile = findfile('sky', args.night, args.expid, camera)
1186

1187
            cmd = "desi_compute_sky"
1188
            cmd += " -i {}".format(framefile)
1189
            cmd += " --fiberflat {}".format(fiberflatfile)
1190
            cmd += " -o {}".format(skyfile)
1191
            if args.no_extra_variance :
1192
                cmd += " --no-extra-variance"
1193
            if not args.no_sky_wavelength_adjustment : cmd += " --adjust-wavelength"
1194
            if not args.no_sky_lsf_adjustment : cmd += " --adjust-lsf"
1195
            if (not args.no_sky_wavelength_adjustment) and (not args.no_sky_lsf_adjustment) and args.save_sky_adjustments :
1196
                cmd += " --save-adjustments {}".format(skyfile.replace("sky-","skycorr-"))
1197
            if args.adjust_sky_with_more_fibers :
1198
                cmd += " --adjust-with-more-fibers"
1199
            if (not args.no_sky_wavelength_adjustment) or (not args.no_sky_lsf_adjustment) :
1200
                pca_corr_filename = findcalibfile([hdr, camhdr[camera]], 'SKYCORR')
1201
                if pca_corr_filename is not None :
1202
                    cmd += " --pca-corr {}".format(pca_corr_filename)
1203
                else :
1204
                    log.warning("No SKYCORR file, do you need to update DESI_SPECTRO_CALIB?")
1205
            cmd += " --fit-offsets"
1206
            if not args.no_skygradpca:
1207
                skygradpca_filename = findcalibfile([hdr, camhdr[camera]], 'SKYGRADPCA')
1208
                if skygradpca_filename is not None :
1209
                    cmd += " --skygradpca {}".format(skygradpca_filename)
1210
                else :
1211
                    log.warning("No SKYGRADPCA file, do you need to update DESI_SPECTRO_CALIB?")
1212

1213
            if not args.no_tpcorrparam:
1214
                tpcorrparam_filename = findcalibfile([hdr, camhdr[camera]], 'TPCORRPARAM')
1215
                if tpcorrparam_filename is not None :
1216
                    cmd += " --tpcorrparam {}".format(tpcorrparam_filename)
1217
                else :
1218
                    log.warning("No TPCORRPARAM file, do you need to update DESI_SPECTRO_CALIB?")
1219
            cmdargs = cmd.split()[1:]
1220

1221
            result, success = runcmd(desispec.scripts.sky.main,
1222
                    args=cmdargs, inputs=[framefile, fiberflatfile], outputs=[skyfile,])
1223

1224
            if not success:
1225
                error_count += 1
1226

1227
            #- sframe = flatfielded sky-subtracted but not flux calibrated frame
1228
            #- Note: this re-reads and re-does steps previously done for picking
1229
            #- sky fibers; desi_proc is about human efficiency,
1230
            #- not I/O or CPU efficiency...
1231
            sframefile = desispec.io.findfile('sframe', args.night, args.expid, camera)
1232
            if not os.path.exists(sframefile):
1233
                missing_inputs = False
1234
                for filename in [framefile, fiberflatfile, skyfile]:
1235
                    if not os.path.exists(filename):
1236
                        log.error(f'Camera {camera} missing sframe input {filename}')
1237
                        missing_inputs = True
1238

1239
                if missing_inputs:
1240
                    log.error(f'Camera {camera} missing sframe inputs; skipping')
1241
                    error_count += 1
1242
                else:
1243
                    try:
1244
                        frame = desispec.io.read_frame(framefile)
1245
                        fiberflat = desispec.io.read_fiberflat(fiberflatfile)
1246
                        sky = desispec.io.read_sky(skyfile)
1247
                        apply_fiberflat(frame, fiberflat)
1248
                        subtract_sky(frame, sky, apply_throughput_correction=(
1249
                            args.apply_sky_throughput_correction))
1250
                        frame.meta['IN_SKY'] = shorten_filename(skyfile)
1251
                        frame.meta['FIBERFLT'] = shorten_filename(fiberflatfile)
1252
                        desispec.io.write_frame(sframefile, frame)
1253
                    except Exception as err:
1254
                        import traceback
1255
                        lines = traceback.format_exception(*sys.exc_info())
1256
                        log.error(f"Camera {camera} sframe raised an exception:")
1257
                        print("".join(lines))
1258
                        log.warning(f'Continuing without {sframefile}')
1259
                        error_count += 1
1260

1261
        timer.stop('skysub')
1262
        if comm is not None:
1263
            comm.barrier()
1264

1265
    #-------------------------------------------------------------------------
1266
    #- Standard Star Fitting
1267

1268
    if args.obstype in ['SCIENCE',] and \
1269
            (not args.noskysub ) and \
1270
            (not args.nostdstarfit) :
1271

1272
        timer.start('stdstarfit')
1273
        if rank == 0:
1274
            log.info('Starting flux calibration at {}'.format(time.asctime()))
1275

1276
        #- Group inputs by spectrograph
1277
        framefiles = dict()
1278
        skyfiles = dict()
1279
        fiberflatfiles = dict()
1280
        night, expid = args.night, args.expid #- shorter
1281
        for camera in args.cameras:
1282
            sp = int(camera[1])
1283
            if sp not in framefiles:
1284
                framefiles[sp] = list()
1285
                skyfiles[sp] = list()
1286
                fiberflatfiles[sp] = list()
1287

1288
            framefiles[sp].append(findfile('frame', night, expid, camera, readonly=True))
1289
            skyfiles[sp].append(findfile('sky', night, expid, camera, readonly=True))
1290
            fiberflatfiles[sp].append(findfile('fiberflatexp', night, expid, camera, readonly=True))
1291

1292
        #- Hardcoded stdstar model version
1293
        starmodels = os.path.join(
1294
            os.getenv('DESI_BASIS_TEMPLATES'), 'stdstar_templates_v2.2.fits')
1295

1296
        #- Fit stdstars per spectrograph (not per-camera)
1297
        spectro_nums = sorted(framefiles.keys())
1298

1299
        if args.mpistdstars and comm is not None:
1300
            #- If using MPI parallelism in stdstar fit, divide comm into subcommunicators.
1301
            #- (spectro_start, spectro_step) determine stride pattern over spectro_nums.
1302
            #- Split comm by at most len(spectro_nums)
1303
            num_subcomms = min(size, len(spectro_nums))
1304
            subcomm_index = rank % num_subcomms
1305
            if rank == 0:
1306
                log.info(f"Splitting comm of {size=} into {num_subcomms=} for stdstar fitting")
1307
            subcomm = comm.Split(color=subcomm_index)
1308
            spectro_start, spectro_step = subcomm_index, num_subcomms
1309
        else:
1310
            #- Otherwise, use multiprocessing assuming 1 MPI rank per spectrograph
1311
            spectro_start, spectro_step = rank, size
1312
            subcomm = None
1313

1314
        for i in range(spectro_start, len(spectro_nums), spectro_step):
1315
            sp = spectro_nums[i]
1316

1317
            stdfile = findfile('stdstars', night, expid, spectrograph=sp)
1318
            cmd = "desi_fit_stdstars"
1319
            cmd += " --frames {}".format(' '.join(framefiles[sp]))
1320
            cmd += " --skymodels {}".format(' '.join(skyfiles[sp]))
1321
            cmd += " --fiberflats {}".format(' '.join(fiberflatfiles[sp]))
1322
            cmd += " --starmodels {}".format(starmodels)
1323
            cmd += " --outfile {}".format(stdfile)
1324
            cmd += " --delta-color 0.1"
1325
            if args.maxstdstars is not None:
1326
                cmd += " --maxstdstars {}".format(args.maxstdstars)
1327
            if args.apply_sky_throughput_correction :
1328
                cmd += " --apply-sky-throughput-correction"
1329
            inputs = framefiles[sp] + skyfiles[sp] + fiberflatfiles[sp]
1330
            err = 0
1331
            cmdargs = cmd.split()[1:]
1332

1333
            if subcomm is None:
1334
                #- Using multiprocessing
1335
                log.info(f'Rank {rank=} fitting sp{sp=} stdstars with multiprocessing')
1336
                result, success = runcmd(desispec.scripts.stdstars.main,
1337
                    args=cmdargs, inputs=inputs, outputs=[stdfile])
1338
            else:
1339
                #- Using MPI
1340
                log.info(f'Rank {rank=} fitting sp{sp=} stdstars with mpi')
1341
                result, success = runcmd(desispec.scripts.stdstars.main,
1342
                    args=cmdargs, inputs=inputs, outputs=[stdfile], comm=subcomm)
1343

1344
            if not success:
1345
                log.info(f'Rank {rank=} stdstar failure {err=}')
1346
                error_count += 1
1347

1348
        timer.stop('stdstarfit')
1349
        if comm is not None:
1350
            comm.barrier()
1351

1352
    # -------------------------------------------------------------------------
1353
    # - Flux calibration
1354

1355
    def list2str(xx) :
1356
        """converts list xx to string even if elements aren't strings"""
1357
        return " ".join([str(x) for x in xx])
1358

1359
    if args.obstype in ['SCIENCE'] and \
1360
                (not args.noskysub) and \
1361
                (not args.nofluxcalib):
1362
        timer.start('fluxcalib')
1363

1364
        night, expid = args.night, args.expid #- shorter
1365

1366
        if rank == 0 :
1367
            r_cameras = []
1368
            for camera in args.cameras :
1369
                if camera[0] == 'r' :
1370
                    r_cameras.append(camera)
1371
            if len(r_cameras)>0 :
1372
                outfile    = findfile('calibstars',night, expid)
1373
                frames     = [findfile('frame', night, expid, camera, readonly=True) for camera in r_cameras]
1374
                fiberflats = [findfile('fiberflatexp', night, expid, camera, readonly=True) for camera in r_cameras]
1375
                skys       = [findfile('sky', night, expid, camera, readonly=True) for camera in r_cameras]
1376
                models     = [findfile('stdstars', night, expid,spectrograph=int(camera[1]), readonly=True) for camera in r_cameras]
1377

1378
                inputs = frames + fiberflats + skys + models
1379
                cmd = "desi_select_calib_stars --delta-color-cut 0.1 "
1380
                cmd += " --frames {}".format(list2str(frames))
1381
                cmd += " --fiberflats {}".format(list2str(fiberflats))
1382
                cmd += " --skys {}".format(list2str(skys))
1383
                cmd += " --models {}".format(list2str(models))
1384
                cmd += f" -o {outfile}"
1385
                cmdargs = cmd.split()[1:]
1386
                result, success = runcmd(desispec.scripts.select_calib_stars.main,
1387
                        args=cmdargs, inputs=inputs, outputs=[outfile,])
1388

1389
                if not success:
1390
                    error_count += 1
1391

1392
        if comm is not None:
1393
            comm.barrier()
1394

1395
        #- Compute flux calibration vectors per camera
1396
        for camera in args.cameras[rank::size]:
1397
            framefile = findfile('frame', night, expid, camera, readonly=True)
1398
            skyfile = findfile('sky', night, expid, camera, readonly=True)
1399
            spectrograph = int(camera[1])
1400
            stdfile = findfile('stdstars', night, expid,spectrograph=spectrograph, readonly=True)
1401
            fiberflatfile = findfile('fiberflatexp', night, expid, camera, readonly=True)
1402
            calibfile = findfile('fluxcalib', night, expid, camera)
1403
            calibstars = findfile('calibstars',night, expid)
1404

1405
            cmd = "desi_compute_fluxcalibration"
1406
            cmd += " --infile {}".format(framefile)
1407
            cmd += " --sky {}".format(skyfile)
1408
            cmd += " --fiberflat {}".format(fiberflatfile)
1409
            cmd += " --models {}".format(stdfile)
1410
            cmd += " --outfile {}".format(calibfile)
1411
            cmd += " --selected-calibration-stars {}".format(calibstars)
1412
            if args.apply_sky_throughput_correction :
1413
                cmd += " --apply-sky-throughput-correction"
1414

1415
            inputs = [framefile, skyfile, fiberflatfile, stdfile, calibstars]
1416
            cmdargs = cmd.split()[1:]
1417

1418
            result, success = runcmd(desispec.scripts.fluxcalibration.main,
1419
                    args=cmdargs, inputs=inputs, outputs=[calibfile,])
1420

1421
            if not success:
1422
                error_count += 1
1423

1424
        timer.stop('fluxcalib')
1425
        if comm is not None:
1426
            comm.barrier()
1427

1428
    #-------------------------------------------------------------------------
1429
    #- Applying flux calibration
1430

1431
    if args.obstype in ['SCIENCE',] and (not args.noskysub ) and (not args.nofluxcalib) :
1432

1433
        night, expid = args.night, args.expid #- shorter
1434

1435
        timer.start('applycalib')
1436
        if rank == 0:
1437
            log.info('Starting cframe file creation at {}'.format(time.asctime()))
1438

1439
        for camera in args.cameras[rank::size]:
1440
            framefile = findfile('frame', night, expid, camera, readonly=True)
1441
            fiberflatfile = findfile('fiberflatexp', night, expid, camera, readonly=True)
1442
            skyfile = findfile('sky', night, expid, camera, readonly=True)
1443
            spectrograph = int(camera[1])
1444
            stdfile = findfile('stdstars', night, expid, spectrograph=spectrograph, readonly=True)
1445
            calibfile = findfile('fluxcalib', night, expid, camera, readonly=True)
1446
            cframefile = findfile('cframe', night, expid, camera)
1447

1448
            cmd = "desi_process_exposure"
1449
            cmd += " --infile {}".format(framefile)
1450
            cmd += " --fiberflat {}".format(fiberflatfile)
1451
            cmd += " --sky {}".format(skyfile)
1452
            cmd += " --calib {}".format(calibfile)
1453
            cmd += " --outfile {}".format(cframefile)
1454
            if args.apply_sky_throughput_correction :
1455
                cmd += " --apply-sky-throughput-correction"
1456
            cmd += " --cosmics-nsig 6"
1457
            if args.no_xtalk :
1458
                cmd += " --no-xtalk"
1459

1460
            inputs = [framefile, fiberflatfile, skyfile, calibfile]
1461
            cmdargs = cmd.split()[1:]
1462

1463
            result, success = runcmd(desispec.scripts.procexp.main, args=cmdargs, inputs=inputs, outputs=[cframefile,])
1464

1465
            if not success:
1466
                error_count += 1
1467

1468
        if comm is not None:
1469
            comm.barrier()
1470

1471
        timer.stop('applycalib')
1472

1473
    #-------------------------------------------------------------------------
1474
    #- Exposure QA, using same criterion as fluxcalib for when to run
1475

1476
    if args.obstype in ['SCIENCE',] and (not args.noskysub ) and (not args.nofluxcalib) :
1477
        from desispec.scripts import exposure_qa
1478

1479
        night, expid = args.night, args.expid #- shorter
1480

1481
        timer.start('exposure_qa')
1482
        if rank == 0:
1483
            log.info('Starting exposure_qa at {}'.format(time.asctime()))
1484

1485
        #- exposure QA not yet parallelized for a single exposure
1486
        if rank == 0:
1487
            qa_args = ['-n', str(night), '-e', str(expid), '--nproc', str(1)]
1488
            try:
1489
                exposure_qa.main(exposure_qa.parse(qa_args))
1490
            except Exception as err:
1491
                #- log exceptions, but don't treat QA problems as fatal
1492
                import traceback
1493
                lines = traceback.format_exception(*sys.exc_info())
1494
                log.error(f"exposure_qa raised an exception:")
1495
                print("".join(lines))
1496
                log.warning(f"QA exception not treated as blocking failure")
1497

1498
        #- Make other ranks wait anyway
1499
        if comm is not None:
1500
            comm.barrier()
1501

1502
        timer.stop('exposure_qa')
1503

1504
    #-------------------------------------------------------------------------
1505
    #- Collect error count and wrap up
1506
    if comm is not None:
1507
        all_error_counts = comm.gather(error_count, root=0)
1508
        error_count = int(comm.bcast(np.sum(all_error_counts), root=0))
1509

1510
    #- save / print timing information
1511
    log_timer(timer, args.timingfile, comm=comm)
1512

1513
    if rank == 0:
1514
        duration_seconds = time.time() - start_time
1515
        mm = int(duration_seconds) // 60
1516
        ss = int(duration_seconds - mm*60)
1517
        goodbye = f'All done at {time.asctime()}; duration {mm}m{ss}s'
1518

1519
        if error_count > 0:
1520
            log.error(f'{error_count} processing errors; see logs above')
1521
            log.error(goodbye)
1522
        else:
1523
            log.info(goodbye)
1524

1525
    if error_count > 0:
1526
        sys.exit(int(error_count))
1527
    else:
1528
        return 0
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc