• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

desihub / desispec / 6934001252

20 Nov 2023 06:03PM UTC coverage: 24.386% (+0.2%) from 24.217%
6934001252

Pull #2141

github

weaverba137
fix band keys
Pull Request #2141: Spectra to specutils

82 of 98 new or added lines in 1 file covered. (83.67%)

24 existing lines in 18 files now uncovered.

10939 of 44858 relevant lines covered (24.39%)

0.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/py/desispec/scripts/pipe.py
1
#
2
# See top-level LICENSE.rst file for Copyright information
3
#
4
# -*- coding: utf-8 -*-
5
"""
×
6
desispec.scripts.pipe
7
=====================
8

9
Interactive control of the pipeline
10
"""
11

12
from __future__ import absolute_import, division, print_function
×
13

14
import sys
×
15
import os
×
16
import argparse
×
17
import re
×
18
import glob
×
19
from collections import OrderedDict
×
20

21
import subprocess
×
22
import numpy as np
×
23

24
from .. import io
×
25

26
from desiutil.log import get_logger
×
27

28
from .. import pipeline as pipe
×
29

30
from ..pipeline import control as control
×
31

32

33
class PipeUI(object):
×
34

35
    def __init__(self):
×
36
        self.pref = "DESI"
×
37

38
        parser = argparse.ArgumentParser(
×
39
            description="DESI pipeline control",
40
            usage="""desi_pipe <command> [options]
41

42
Where supported commands are (use desi_pipe <command> --help for details):
43
   (------- High-Level -------)
44
   create   Create a new production.
45
   go       Run a full production.
46
   update   Update an existing production.
47
   top      Live display of production database.
48
   status   Overview of production.
49
   (------- Mid-Level --------)
50
   chain    Run all ready tasks for multiple pipeline steps.
51
   cleanup  Reset "running" (or optionally "failed") tasks back to "ready".
52
   (------- Low-Level --------)
53
   tasks    Get all possible tasks for a given type and states.
54
   check    Check the status of tasks.
55
   dryrun   Return the equivalent command line entrypoint for tasks.
56
   script   Generate a shell or slurm script.
57
   run      Generate a script and run it.
58
   getready Auto-Update of prod DB.
59
   sync     Synchronize DB state based on the filesystem.
60
   env      Print current production location.
61
   query    Direct sql query to the database.
62

63
""")
64
        parser.add_argument("command", help="Subcommand to run")
×
65
        # parse_args defaults to [1:] for args, but you need to
66
        # exclude the rest of the args too, or validation will fail
67
        args = parser.parse_args(sys.argv[1:2])
×
68
        if not hasattr(self, args.command):
×
69
            print("Unrecognized command")
×
70
            parser.print_help()
×
71
            sys.exit(1)
×
72

73
        # use dispatch pattern to invoke method with same name
74
        getattr(self, args.command)()
×
75

76

77
    def env(self):
        """Print the raw data and production directories.

        Both locations are obtained from ``io`` and printed on one line each,
        prefixed with ``self.pref`` and wrapped in the ``control.clr``
        OKBLUE / ENDC markers.
        """
        template = "{}{:<22} = {}{}{}"
        locations = (
            ("Raw data directory", io.rawdata_root()),
            ("Production directory", io.specprod_root()),
        )
        for label, path in locations:
            print(template.format(
                self.pref, label, control.clr.OKBLUE, path, control.clr.ENDC))
        return
×
89

90
    def query(self):
        """Execute one SQL statement against the pipeline DB and print the rows.

        The statement comes from the command line and is handed to the
        cursor's ``execute`` unmodified.  The database is opened with mode
        "r" unless ``--rw`` is given, in which case mode "w" is used.
        Each fetched row is printed as its values, each preceded by a space.
        """
        parser = argparse.ArgumentParser(
            description="Query the DB",
            usage="desi_pipe query 'sql_command' [--rw] (use --help for details)")
        parser.add_argument(
            'cmd', metavar='cmd', type=str,
            help="SQL command in quotes, like 'select * from preproc'")
        parser.add_argument(
            "--rw", action="store_true",
            help="read/write mode (use with care, experts only). Default is read only")
        opts = parser.parse_args(sys.argv[2:])

        location = io.get_pipe_database()
        access = "w" if opts.rw else "r"
        db = pipe.load_db(location, mode=access)
        with db.cursor() as cur:
            cur.execute(opts.cmd)
            rows = cur.fetchall()
            for row in rows:
                print("".join(" {}".format(value) for value in row))
×
113

114
    def create(self):
        """Create a new production from command-line options.

        Parses ``sys.argv[2:]`` and forwards every option, unchanged, to
        ``control.create``.  Options left unset are passed as ``None`` (or
        their stated defaults for the database and ``--nside`` options).
        """
        parser = argparse.ArgumentParser(
            description="Create a new production",
            usage="desi_pipe create [options] (use --help for details)")

        parser.add_argument("--root", required=False, default=None,
            help="value to use for DESI_ROOT")

        parser.add_argument("--data", required=False, default=None,
            help="value to use for DESI_SPECTRO_DATA")

        parser.add_argument("--redux", required=False, default=None,
            help="value to use for DESI_SPECTRO_REDUX")

        parser.add_argument("--prod", required=False, default=None,
            help="value to use for SPECPROD")

        # Help text previously lacked its closing parenthesis.
        parser.add_argument("--force", action="store_true",
            help="force DB creation even if prod exists on disk (useful for simulations)")

        parser.add_argument("--basis", required=False, default=None,
            help="value to use for DESI_BASIS_TEMPLATES")

        parser.add_argument("--calib", required=False, default=None,
            help="value to use for DESI_SPECTRO_CALIB")

        parser.add_argument("--db-sqlite", required=False, default=False,
            action="store_true", help="Use SQLite database backend.")

        parser.add_argument("--db-sqlite-path", type=str, required=False,
            default=None, help="Override path to SQLite DB")

        parser.add_argument("--db-postgres", required=False, default=False,
            action="store_true", help="Use PostgreSQL database backend.  "
            "You must correctly configure your ~/.pgpass file!")

        parser.add_argument("--db-postgres-host", type=str, required=False,
            default="nerscdb03.nersc.gov", help="Set PostgreSQL hostname")

        parser.add_argument("--db-postgres-port", type=int, required=False,
            default=5432, help="Set PostgreSQL port number")

        parser.add_argument("--db-postgres-name", type=str, required=False,
            default="desidev", help="Set PostgreSQL DB name")

        parser.add_argument("--db-postgres-user", type=str, required=False,
            default="desidev_admin", help="Set PostgreSQL user name")

        parser.add_argument("--db-postgres-authorized", type=str,
            required=False, default="desidev_ro",
            help="Additional PostgreSQL users / roles to authorize")

        parser.add_argument("--nside", required=False, type=int, default=64,
            help="HEALPix nside value to use for spectral grouping.")

        args = parser.parse_args(sys.argv[2:])

        control.create(
            root=args.root,
            data=args.data,
            redux=args.redux,
            prod=args.prod,
            force=args.force,
            basis=args.basis,
            calib=args.calib,
            db_sqlite=args.db_sqlite,
            db_sqlite_path=args.db_sqlite_path,
            db_postgres=args.db_postgres,
            db_postgres_host=args.db_postgres_host,
            db_postgres_port=args.db_postgres_port,
            db_postgres_name=args.db_postgres_name,
            db_postgres_user=args.db_postgres_user,
            db_postgres_authorized=args.db_postgres_authorized,
            nside=args.nside)

        return
×
190

191

192
    def update(self):
        """Update an existing production via ``control.update``.

        Options are read from ``sys.argv[2:]``.  A negative ``--expid`` (the
        default, -1) is forwarded as ``None``.
        """
        parser = argparse.ArgumentParser(
            description="Update a production",
            usage="desi_pipe update [options] (use --help for details)")
        parser.add_argument(
            "--nights", required=False, default=None,
            help="comma separated (YYYYMMDD) or regex pattern- only nights "
            "matching these patterns will be examined.")
        parser.add_argument(
            "--nside", required=False, type=int, default=64,
            help="HEALPix nside value to use for spectral grouping.")
        parser.add_argument(
            "--expid", required=False, type=int, default=-1,
            help="Only update the production for a single exposure ID.")
        opts = parser.parse_args(sys.argv[2:])

        # Negative exposure IDs stand for "no exposure selected".
        single_exposure = opts.expid if opts.expid >= 0 else None

        control.update(
            nightstr=opts.nights,
            nside=opts.nside,
            expid=single_exposure)
        return
×
216

217

218
    def tasks(self):
        """Select tasks by type, night, exposure, spectrograph and state.

        Options are read from ``sys.argv[2:]`` and the selection itself is
        delegated to ``control.tasks``.  Negative ``--expid`` / ``--spec``
        values (the defaults) are forwarded as ``None``; comma separated
        options are split into lists.
        """
        type_names = ",".join(pipe.tasks.base.default_task_chain)

        parser = argparse.ArgumentParser(
            description="Get all tasks of a "
            "particular type for one or more nights",
            usage="desi_pipe tasks [options] (use --help for details)")
        parser.add_argument(
            "--tasktypes", required=False, default=type_names,
            help="comma separated list of task types ({})".format(type_names))
        parser.add_argument(
            "--nights", required=False, default=None,
            help="comma separated (YYYYMMDD) or regex pattern- only nights "
            "matching these patterns will be examined.")
        parser.add_argument(
            "--expid", required=False, type=int, default=-1,
            help="Only select tasks for a single exposure ID.")
        parser.add_argument(
            "--spec", required=False, type=int, default=-1,
            help="Only select tasks for a single spectrograph.")
        parser.add_argument(
            "--states", required=False, default=None,
            help="comma separated list of states (see defs.py).  Only tasks "
            "in these states will be returned.")
        parser.add_argument(
            "--nosubmitted", required=False, default=False,
            action="store_true", help="Skip all tasks flagged as submitted")
        parser.add_argument(
            "--taskfile", required=False, default=None,
            help="write tasks to this file (if not specified, write to STDOUT)")
        parser.add_argument(
            "--db-postgres-user", type=str, required=False,
            default="desidev_ro", help="If using postgres, connect as this "
            "user for read-only access")
        opts = parser.parse_args(sys.argv[2:])

        # Negative numeric selectors mean "not specified".
        single_exposure = opts.expid if opts.expid >= 0 else None
        single_spec = opts.spec if opts.spec >= 0 else None

        state_list = None if opts.states is None else opts.states.split(",")
        type_list = (
            None if opts.tasktypes is None else opts.tasktypes.split(","))

        control.tasks(
            type_list,
            nightstr=opts.nights,
            states=state_list,
            expid=single_exposure,
            spec=single_spec,
            nosubmitted=opts.nosubmitted,
            db_postgres_user=opts.db_postgres_user,
            taskfile=opts.taskfile)
        return
×
281

282

283
    def getready(self):
        """Run ``control.getready`` on the pipeline database.

        The database returned by ``io.get_pipe_database`` is opened with
        mode "w" and passed to ``control.getready`` together with the
        optional ``--nights`` selection read from ``sys.argv[2:]``.
        """
        # Description previously read "Update database to for one or more
        # nights ..."; the stray "to" has been dropped.
        parser = argparse.ArgumentParser(description="Update database "
            "for one or more nights to ensure that forward dependencies "
            "know that they are ready.",
            usage="desi_pipe getready [options] (use --help for details)")

        parser.add_argument("--nights", required=False, default=None,
            help="comma separated (YYYYMMDD) or regex pattern- only nights "
            "matching these patterns will be examined.")

        args = parser.parse_args(sys.argv[2:])

        dbpath = io.get_pipe_database()
        db = pipe.load_db(dbpath, mode="w")

        control.getready(db, nightstr=args.nights)

        return
×
301

302

303
    def check(self):
        """Print the state of every task read from a task file.

        Tasks are read with ``pipe.prod.task_read`` and their states are
        looked up with ``control.check_tasks``.  Unless ``--nodb`` is given,
        the pipeline database is opened with mode "r" for the lookup.
        One "<task> : <state>" line is printed per task.
        """
        parser = argparse.ArgumentParser(
            description="Check the state of pipeline tasks",
            usage="desi_pipe check [options] (use --help for details)")
        parser.add_argument(
            "--taskfile", required=False, default=None,
            help="read tasks from this file (if not specified, read from "
            "STDIN)")
        parser.add_argument(
            "--nodb", required=False, default=False,
            action="store_true", help="Do not use the production database.")
        parser.add_argument(
            "--db-postgres-user", type=str, required=False,
            default="desidev_ro", help="If using postgres, connect as this "
            "user for read-only access")
        opts = parser.parse_args(sys.argv[2:])

        task_list = pipe.prod.task_read(opts.taskfile)

        if opts.nodb:
            db = None
        else:
            db = pipe.load_db(
                io.get_pipe_database(), mode="r", user=opts.db_postgres_user)

        status = control.check_tasks(task_list, db=db)

        for name in task_list:
            print("{} : {}".format(name, status[name]))
        sys.stdout.flush()
        return
×
335

336

337
    def sync(self):
        """Run ``control.sync`` on the pipeline database.

        The database is opened with mode "w".  ``--nights`` is forwarded as
        ``nightstr`` and ``--force-spec-done`` as ``specdone``.
        """
        # The unused local list of task types was removed; this command has
        # no task-type option.
        parser = argparse.ArgumentParser(
            description="Synchronize DB state based on the filesystem.",
            usage="desi_pipe sync [options] (use --help for details)")

        parser.add_argument("--nights", required=False, default=None,
            help="comma separated (YYYYMMDD) or regex pattern- only nights "
            "matching these patterns will be examined.")
        parser.add_argument("--force-spec-done", action="store_true",
            help="force setting spectra file to state done if file exists independently of state of parent cframes.")

        args = parser.parse_args(sys.argv[2:])

        dbpath = io.get_pipe_database()
        db = pipe.load_db(dbpath, mode="w")

        control.sync(db, nightstr=args.nights, specdone=args.force_spec_done)

        return
×
358

359

360
    def cleanup(self):
        """Run ``control.cleanup`` on the pipeline database.

        The database is opened with mode "w".  ``--tasktypes`` is split on
        commas (``None`` when not given) and a negative ``--expid`` (the
        default) is forwarded as ``None``.
        """
        type_names = ",".join(pipe.tasks.base.default_task_chain)

        parser = argparse.ArgumentParser(
            description="Clean up stale task states in the DB",
            usage="desi_pipe cleanup [options] (use --help for details)")
        parser.add_argument(
            "--failed", required=False, default=False,
            action="store_true", help="Also clear failed states")
        parser.add_argument(
            "--submitted", required=False, default=False,
            action="store_true", help="Also clear submitted flag")
        parser.add_argument(
            "--tasktypes", required=False, default=None,
            help="comma separated list of task types to clean ({})".format(type_names))
        parser.add_argument(
            "--expid", required=False, type=int, default=-1,
            help="Only clean tasks for this exposure ID.")
        opts = parser.parse_args(sys.argv[2:])

        db = pipe.load_db(io.get_pipe_database(), mode="w")

        type_list = (
            None if opts.tasktypes is None else opts.tasktypes.split(","))
        # Negative exposure IDs stand for "no exposure selected".
        single_exposure = opts.expid if opts.expid >= 0 else None

        control.cleanup(
            db,
            type_list,
            failed=opts.failed,
            submitted=opts.submitted,
            expid=single_exposure)
        return
×
400

401

402
    def _check_nersc_host(self, args):
×
403
        """Modify the --nersc argument based on the environment.
404
        """
405
        if args.shell:
×
406
            # We are forcibly generating shell scripts.
407
            args.nersc = None
×
408
        else:
409
            if args.nersc is None:
×
410
                if "NERSC_HOST" in os.environ:
×
411
                    if os.environ["NERSC_HOST"] == "cori":
×
412
                        args.nersc = "cori-haswell"
×
413
                    else:
414
                        args.nersc = os.environ["NERSC_HOST"]
×
415
        return
×
416

417

418
    def _parse_run_opts(self, parser):
×
419
        """Internal function to parse options for running.
420

421
        This provides a consistent set of run-time otpions for the
422
        "dryrun", "script", and "run" commands.
423

424
        """
425
        parser.add_argument("--nersc", required=False, default=None,
×
426
            help="write a script for this NERSC system (cori-haswell "
427
            "| cori-knl).  Default uses $NERSC_HOST")
428

429
        parser.add_argument("--shell", required=False, default=False,
×
430
            action="store_true",
431
            help="generate normal bash scripts, even if run on a NERSC system")
432

433
        parser.add_argument("--nersc_queue", required=False, default="regular",
×
434
            help="write a script for this NERSC queue (debug | regular)")
435

436
        parser.add_argument("--nersc_maxtime", required=False, type=int,
×
437
            default=0, help="Then maximum run time (in minutes) for a single "
438
            " job.  If the list of tasks cannot be run in this time, multiple "
439
            " job scripts will be written.  Default is the maximum time for "
440
            " the specified queue.")
441

442
        parser.add_argument("--nersc_maxnodes", required=False, type=int,
×
443
            default=0, help="The maximum number of nodes to use.  Default "
444
            " is the maximum nodes for the specified queue.")
445

446
        parser.add_argument("--nersc_shifter", required=False, default=None,
×
447
            help="The shifter image to use for NERSC jobs")
448

449
        parser.add_argument("--mpi_procs", required=False, type=int, default=1,
×
450
            help="The number of MPI processes to use for non-NERSC shell "
451
            "scripts (default 1)")
452

453
        parser.add_argument("--mpi_run", required=False, type=str,
×
454
            default="", help="The command to launch MPI programs "
455
            "for non-NERSC shell scripts (default do not use MPI)")
456

457
        parser.add_argument("--procs_per_node", required=False, type=int,
×
458
            default=0, help="The number of processes to use per node.  If not "
459
            "specified it uses a default value for each machine.")
460

461
        parser.add_argument("--outdir", required=False, default=None,
×
462
            help="Put task scripts and logs in this directory relative to the "
463
            "production 'scripts' directory.  Default puts task directory "
464
            "in the main scripts directory.")
465

466
        parser.add_argument("--debug", required=False, default=False,
×
467
            action="store_true", help="debugging messages in job logs")
468

469
        return parser
×
470

471

472
    def dryrun(self):
        """Pass the tasks from a task file to ``control.dryrun``.

        Options are read from ``sys.argv[2:]``.  ``args.nersc`` is adjusted
        by ``_check_nersc_host`` before the call, and tasks are read with
        ``pipe.prod.task_read``.
        """
        # The description previously concatenated to "totalnumber of
        # processes"; the missing space has been restored.  The unused local
        # list of task types was removed.
        parser = argparse.ArgumentParser(description="Print equivalent "
            "command-line jobs that would be run given the tasks and total "
            "number of processes",
            usage="desi_pipe dryrun [options] (use --help for details)")

        parser.add_argument("--taskfile", required=False, default=None,
            help="read tasks from this file (if not specified, read from "
            "STDIN)")

        parser = self._parse_run_opts(parser)

        parser.add_argument("--nodb", required=False, default=False,
            action="store_true", help="Do not use the production database.")

        parser.add_argument("--db-postgres-user", type=str, required=False,
            default="desidev_ro", help="If using postgres, connect as this "
            "user for read-only access")

        parser.add_argument("--force", required=False, default=False,
            action="store_true", help="print commands for all tasks, not"
            " only the ready ones")

        args = parser.parse_args(sys.argv[2:])

        self._check_nersc_host(args)

        tasks = pipe.prod.task_read(args.taskfile)

        control.dryrun(
            tasks,
            nersc=args.nersc,
            nersc_queue=args.nersc_queue,
            nersc_maxtime=args.nersc_maxtime,
            nersc_maxnodes=args.nersc_maxnodes,
            nersc_shifter=args.nersc_shifter,
            mpi_procs=args.mpi_procs,
            mpi_run=args.mpi_run,
            nodb=args.nodb,
            db_postgres_user=args.db_postgres_user,
            force=args.force)

        return
×
517

518

519
    def script(self):
        """Generate job script(s) via ``control.script`` and print their names.

        Options are read from ``sys.argv[2:]``; ``args.nersc`` is adjusted by
        ``_check_nersc_host`` first.  The names returned by
        ``control.script`` are printed comma separated on one line; nothing
        is printed when the result is empty or ``None``.
        """
        # The unused local list of task types was removed.
        parser = argparse.ArgumentParser(description="Create batch script(s) "
            "for the list of tasks.  If the --nersc option is not given, "
            "create shell script(s) that optionally uses mpirun.  Prints"
            " the list of generated scripts to STDOUT.",
            usage="desi_pipe script [options] (use --help for details)")

        parser.add_argument("--taskfile", required=False, default=None,
            help="read tasks from this file (if not specified, read from "
            "STDIN)")

        parser = self._parse_run_opts(parser)

        parser.add_argument("--nodb", required=False, default=False,
            action="store_true", help="Do not use the production database.")

        parser.add_argument("--db-postgres-user", type=str, required=False,
            default="desidev_ro", help="If using postgres, connect as this "
            "user for read-only access")

        args = parser.parse_args(sys.argv[2:])

        self._check_nersc_host(args)

        scripts = control.script(
            args.taskfile,
            nersc=args.nersc,
            nersc_queue=args.nersc_queue,
            nersc_maxtime=args.nersc_maxtime,
            nersc_maxnodes=args.nersc_maxnodes,
            nersc_shifter=args.nersc_shifter,
            mpi_procs=args.mpi_procs,
            mpi_run=args.mpi_run,
            procs_per_node=args.procs_per_node,
            nodb=args.nodb,
            out=args.outdir,
            db_postgres_user=args.db_postgres_user)

        # Same None-safe result check as the "chain" command.
        if scripts is not None and len(scripts) > 0:
            print(",".join(scripts))

        return
×
563

564

565
    def run(self):
        """Generate and launch job script(s) via ``control.run``.

        Options are read from ``sys.argv[2:]``; ``args.nersc`` is adjusted by
        ``_check_nersc_host`` first and ``--depjobs`` is split on commas.
        The job IDs returned by ``control.run`` are printed comma separated
        on one line; nothing is printed when the result is empty or ``None``.
        """
        # The unused local list of task types was removed.
        parser = argparse.ArgumentParser(description="Create and run batch "
            "script(s) for the list of tasks.  If the --nersc option is not "
            "given, create shell script(s) that optionally uses mpirun.",
            usage="desi_pipe run [options] (use --help for details)")

        parser.add_argument("--taskfile", required=False, default=None,
            help="Read tasks from this file (if not specified, read from "
            "STDIN).  Tasks of all types will be packed into a single job!")

        parser.add_argument("--nosubmitted", required=False, default=False,
            action="store_true", help="Skip all tasks flagged as submitted")

        parser.add_argument("--depjobs", required=False, default=None,
            help="comma separated list of slurm job IDs to specify as "
            "dependencies of this current job.")

        parser = self._parse_run_opts(parser)

        parser.add_argument("--nodb", required=False, default=False,
            action="store_true", help="Do not use the production database.")

        args = parser.parse_args(sys.argv[2:])

        self._check_nersc_host(args)

        deps = None
        if args.depjobs is not None:
            deps = args.depjobs.split(",")

        jobids = control.run(
            args.taskfile,
            nosubmitted=args.nosubmitted,
            depjobs=deps,
            nersc=args.nersc,
            nersc_queue=args.nersc_queue,
            nersc_maxtime=args.nersc_maxtime,
            nersc_maxnodes=args.nersc_maxnodes,
            nersc_shifter=args.nersc_shifter,
            mpi_procs=args.mpi_procs,
            mpi_run=args.mpi_run,
            procs_per_node=args.procs_per_node,
            nodb=args.nodb,
            out=args.outdir,
            debug=args.debug)

        # Same None-safe result check as the "chain" command.
        if jobids is not None and len(jobids) > 0:
            print(",".join(jobids))

        return
×
617

618

619
    def chain(self):
        """Submit a chain of jobs for several pipeline steps via ``control.chain``.

        Options are read from ``sys.argv[2:]``; ``args.nersc`` is adjusted by
        ``_check_nersc_host`` first.  Negative ``--expid`` / ``--spec``
        values (the defaults) are forwarded as ``None`` and comma separated
        options are split into lists.  Any job IDs returned are printed
        comma separated on one line.
        """
        availtypes = ",".join(pipe.tasks.base.default_task_chain)

        parser = argparse.ArgumentParser(description="Create a chain of jobs"
            " using all ready tasks for each specified step.  The order of"
            " the pipeline steps is fixed, regardless of the order specified"
            " by the --tasktypes option.",
            usage="desi_pipe chain [options] (use --help for details)")

        # The help text here used to be a copy of the --depjobs help
        # ("slurm job IDs ..."); it now describes the task types, in the
        # same wording as the "tasks" command.
        parser.add_argument("--tasktypes", required=False, default=availtypes,
            help="comma separated list of task types ({})".format(availtypes))

        parser.add_argument("--nights", required=False, default=None,
            help="comma separated (YYYYMMDD) or regex pattern- only nights "
            "matching these patterns will be generated.")

        parser.add_argument("--expid", required=False, type=int, default=-1,
            help="Only select tasks for a single exposure ID.")

        parser.add_argument("--spec", required=False, type=int, default=-1,
            help="Only select tasks for a single spectrograph.")

        parser.add_argument("--states", required=False, default=None,
            help="comma separated list of states (see defs.py).  Only tasks "
            "in these states will be scheduled.")

        parser.add_argument("--pack", required=False, default=False,
            action="store_true", help="Pack the chain of pipeline steps "
            "into a single job script.")

        parser.add_argument("--nosubmitted", required=False, default=False,
            action="store_true", help="Skip all tasks flagged as submitted")

        parser.add_argument("--depjobs", required=False, default=None,
            help="comma separated list of slurm job IDs to specify as "
            "dependencies of this current job.")

        parser.add_argument("--dryrun", action="store_true",
            help="do not submit the jobs.")

        parser = self._parse_run_opts(parser)

        args = parser.parse_args(sys.argv[2:])

        self._check_nersc_host(args)

        # Negative numeric selectors mean "not specified".
        expid = None
        if args.expid >= 0:
            expid = args.expid

        spec = None
        if args.spec >= 0:
            spec = args.spec

        states = None
        if args.states is not None:
            states = args.states.split(",")

        deps = None
        if args.depjobs is not None:
            deps = args.depjobs.split(",")

        jobids = control.chain(
            args.tasktypes.split(","),
            nightstr=args.nights,
            states=states,
            expid=expid,
            spec=spec,
            pack=args.pack,
            nosubmitted=args.nosubmitted,
            depjobs=deps,
            nersc=args.nersc,
            nersc_queue=args.nersc_queue,
            nersc_maxtime=args.nersc_maxtime,
            nersc_maxnodes=args.nersc_maxnodes,
            nersc_shifter=args.nersc_shifter,
            mpi_procs=args.mpi_procs,
            mpi_run=args.mpi_run,
            procs_per_node=args.procs_per_node,
            out=args.outdir,
            debug=args.debug,
            dryrun=args.dryrun)

        if jobids is not None and len(jobids) > 0:
            print(",".join(jobids))

        return
×
705

706

707
    def go(self):
×
708
        parser = argparse.ArgumentParser(description="Run a full production "
×
709
            "from start to finish.  This will pack steps into 3 jobs per night"
710
            " and then run redshift fitting after all nights are done.  Note "
711
            "that if you are running multiple nights you should use the "
712
            "regular queue.",
713
            usage="desi_pipe go [options] (use --help for details)")
714

715
        parser.add_argument("--nights", required=False, default=None,
×
716
            help="comma separated (YYYYMMDD) or regex pattern- only nights "
717
            "matching these patterns will be generated.")
718

719
        parser.add_argument("--states", required=False, default=None,
×
720
            help="comma separated list of states. This argument is "
721
            "passed to chain (see desi_pipe chain --help for more info).")
722
        parser.add_argument("--resume", action = 'store_true',
×
723
            help="same as --states waiting,ready")
724

725
        parser.add_argument("--dryrun", action="store_true",
×
726
            help="do not submit the jobs.")
727

728
        parser = self._parse_run_opts(parser)
×
729

730
        args = parser.parse_args(sys.argv[2:])
×
731

732
        if args.resume :
×
733
            if args.states is not None :
×
734
                print("Ambiguous arguments: cannot specify --states along with --resume option which would overwrite the list of states.")
×
735
                return
×
736
            else :
737
                args.states="waiting,ready"
×
738

739
        self._check_nersc_host(args)
×
740

741
        allnights = io.get_nights(strip_path=True)
×
742
        nights = pipe.prod.select_nights(allnights, args.nights)
×
743

744
        log = get_logger()
×
745

746
        blocks = [
×
747
            ["preproc", "psf", "psfnight"],
748
            ["traceshift", "extract"],
749
            ["fiberflat", "fiberflatnight", "sky", "starfit", "fluxcalib",
750
             "cframe"],
751
        ]
752

753
        nightlast = list()
×
754

755
        states = args.states
×
756
        if states is not None :
×
757
            states = states.split(",")
×
758

759
        for nt in nights:
×
760
            previous = None
×
761
            log.info("Submitting processing chains for night {}".format(nt))
×
762
            for blk in blocks:
×
763
                jobids = control.chain(
×
764
                    blk,
765
                    nightstr="{}".format(nt),
766
                    pack=True,
767
                    depjobs=previous,
768
                    nersc=args.nersc,
769
                    nersc_queue=args.nersc_queue,
770
                    nersc_maxtime=args.nersc_maxtime,
771
                    nersc_maxnodes=args.nersc_maxnodes,
772
                    nersc_shifter=args.nersc_shifter,
773
                    mpi_procs=args.mpi_procs,
774
                    mpi_run=args.mpi_run,
775
                    procs_per_node=args.procs_per_node,
776
                    out=args.outdir,
777
                    states=states,
778
                    debug=args.debug,
779
                    dryrun=args.dryrun)
780
                if jobids is not None and len(jobids)>0 :
×
781
                    previous = [ jobids[-1] ]
×
782
            if previous is not None and len(previous)>0 :
×
783
                nightlast.append(previous[-1])
×
784

785
        # Submit spectral grouping
786
        jobids = control.chain(
×
787
            ["spectra"],
788
            pack=True,
789
            depjobs=nightlast,
790
            nersc=args.nersc,
791
            nersc_queue=args.nersc_queue,
792
            nersc_maxtime=args.nersc_maxtime,
793
            nersc_maxnodes=args.nersc_maxnodes,
794
            nersc_shifter=args.nersc_shifter,
795
            mpi_procs=args.mpi_procs,
796
            mpi_run=args.mpi_run,
797
            procs_per_node=args.procs_per_node,
798
            out=args.outdir,
799
            states=states,
800
            debug=args.debug,
801
            dryrun=args.dryrun)
802

803
        previous = None
×
804
        if jobids is not None and len(jobids)>0 :
×
805
            previous = [ jobids[-1] ]
×
806

807
        # Submit redshifts (and coadds)
808
        jobids = control.chain(
×
809
            ["redshift"],
810
            pack=True,
811
            depjobs=previous,
812
            nersc=args.nersc,
813
            nersc_queue=args.nersc_queue,
814
            nersc_maxtime=args.nersc_maxtime,
815
            nersc_maxnodes=args.nersc_maxnodes,
816
            nersc_shifter=args.nersc_shifter,
817
            mpi_procs=args.mpi_procs,
818
            mpi_run=args.mpi_run,
819
            procs_per_node=args.procs_per_node,
820
            out=args.outdir,
821
            states=states,
822
            debug=args.debug,
823
            dryrun=args.dryrun)
824

825
        return
×
826

827

828
    def status(self):
        """Handle ``desi_pipe status``: report on pipeline tasks.

        Options are read from ``sys.argv[2:]``.  The comma separated
        ``--tasktypes`` and ``--states`` values are split into lists (or
        left as None when the option was not given) and, together with the
        remaining options, forwarded to ``control.status``.
        """
        known_types = ",".join(pipe.tasks.base.default_task_chain)

        def split_csv(value):
            # An option that was not supplied stays None; otherwise split
            # the comma separated string into a list.
            return None if value is None else value.split(",")

        cli = argparse.ArgumentParser(
            description="Explore status of pipeline tasks",
            usage="desi_pipe status [options] (use --help for details)",
        )

        cli.add_argument(
            "--task", required=False, default=None,
            help="get log information about this specific task",
        )
        cli.add_argument(
            "--tasktypes", required=False, default=None,
            help="comma separated list of task types ({})".format(known_types),
        )
        cli.add_argument(
            "--nights", required=False, default=None,
            help="comma separated (YYYYMMDD) or regex pattern- only nights "
            "matching these patterns will be examined.",
        )
        cli.add_argument(
            "--expid", required=False, type=int, default=None,
            help="Only select tasks for a single exposure ID.",
        )
        cli.add_argument(
            "--spec", required=False, type=int, default=None,
            help="Only select tasks for a single spectrograph.",
        )
        cli.add_argument(
            "--states", required=False, default=None,
            help="comma separated list of states (see defs.py).  Only tasks "
            "in these states will be returned.",
        )
        cli.add_argument(
            "--db-postgres-user", type=str, required=False,
            default="desidev_ro", help="If using postgres, connect as this "
            "user for read-only access",
        )

        opts = cli.parse_args(sys.argv[2:])

        control.status(
            task=opts.task,
            tasktypes=split_csv(opts.tasktypes),
            nightstr=opts.nights,
            states=split_csv(opts.states),
            expid=opts.expid,
            spec=opts.spec,
            db_postgres_user=opts.db_postgres_user,
        )
×
876

877

878
    def top(self):
        """Handle ``desi_pipe top``: show per-task-type counts from the DB.

        Options are read from ``sys.argv[2:]``.  For every task type in
        ``pipe.tasks.base.default_task_chain`` the table shows how many
        tasks are in each state of ``pipe.task_states``, followed by the sum
        of that table's ``submitted`` column ("NA" for "spectra" and
        "redshift", which are not queried for it).

        With ``--once`` the table is printed a single time.  Otherwise the
        terminal is cleared and the table redrawn every ``--refresh``
        seconds until the process receives SIGINT (Ctrl-C), at which point
        it exits with status 0.
        """
        parser = argparse.ArgumentParser(
            description="Live overview of the production state",
            usage="desi_pipe top [options] (use --help for details)")

        parser.add_argument("--refresh", required=False, type=int, default=10,
            help="The number of seconds between DB queries")

        parser.add_argument("--db-postgres-user", type=str, required=False,
            default="desidev_ro", help="If using postgres, connect as this "
            "user for read-only access")

        parser.add_argument("--once", required=False, action="store_true",
            default=False, help="Print info once without clearing the terminal")

        args = parser.parse_args(sys.argv[2:])

        # Only this command needs these modules, so they are imported here.
        # numpy is already imported at module level and is not re-imported.
        import signal
        import time

        def exit_on_interrupt(signum, frame):
            # Exit quietly on Ctrl-C rather than raising KeyboardInterrupt.
            # The parameters are deliberately not named "signal", which
            # would shadow the module used on the next line.
            sys.exit(0)
        signal.signal(signal.SIGINT, exit_on_interrupt)

        dbpath = io.get_pipe_database()
        db = pipe.load_db(dbpath, mode="r", user=args.db_postgres_user)

        tasktypes = pipe.tasks.base.default_task_chain
        states = pipe.task_states

        # One column per task state plus the trailing "submit" column.
        columns = list(states) + ["submit"]
        header_state = "".join(" {:8s}|".format(col) for col in columns)

        # The separator is derived from the column count so that it always
        # matches the header and rows: a 16 character label column followed
        # by one 9 character cell per column.
        sep = "-" * 16 + "+" + ("-" * 9 + "+") * len(columns)

        header = "{}\n{:16s}|{}\n{}".format(sep, "   Task Type",
            header_state, sep)

        def print_status(clear=False):
            """Query the database and print one snapshot of the table."""
            taskstates = {}
            tasksub = {}
            with db.cursor() as cur:
                for t in tasktypes:
                    # Table names come from the task chain above, not from
                    # user input.
                    cur.execute("select state from {}".format(t))
                    st = np.array([int(row[0]) for row in cur.fetchall()])
                    taskstates[t] = {
                        s: np.sum(st == pipe.task_state_to_int[s])
                        for s in states
                    }
                    if t not in ("spectra", "redshift"):
                        cur.execute("select submitted from {}".format(t))
                        tasksub[t] = sum(
                            int(row[0]) for row in cur.fetchall())
            if clear:
                # ESC c resets the terminal before redrawing.
                print("\033c")
            print(header)
            for t in tasktypes:
                cells = ["{:9d}|".format(taskstates[t][s]) for s in states]
                if t in tasksub:
                    cells.append("{:9d}|".format(tasksub[t]))
                else:
                    cells.append("{:9s}|".format("       NA"))
                print("{:16s}|".format(t) + "".join(cells))
            print(sep)
            if clear:
                print(" (Use Ctrl-C to Quit) ")
            sys.stdout.flush()

        if args.once:
            print_status(clear=False)
        else:
            # Redraw until the SIGINT handler above terminates the process.
            while True:
                print_status(clear=True)
                time.sleep(args.refresh)

        return
×
960

961

962
def main():
    """Command-line entry point: construct a ``PipeUI``.

    Nothing is returned; all work happens while the object is being built.
    NOTE(review): ``PipeUI.__init__`` appears to parse the command line and
    dispatch to the requested sub-command -- confirm.
    """
    PipeUI()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc