• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dsavransky / EXOSIMS / 19649142370

24 Nov 2025 09:01PM UTC coverage: 65.627% (+0.02%) from 65.607%
19649142370

Pull #446

github

web-flow
Merge ed4071a9f into e3fee7dce
Pull Request #446: [pre-commit.ci] pre-commit autoupdate

9787 of 14913 relevant lines covered (65.63%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.25
/EXOSIMS/MissionSim.py
1
from EXOSIMS.util.vprint import vprint
1✔
2
from EXOSIMS.util.get_module import get_module
1✔
3
from EXOSIMS.util.waypoint import waypoint
1✔
4
from EXOSIMS.util.CheckScript import CheckScript
1✔
5
from EXOSIMS.util.keyword_fun import get_all_mod_kws, check_opticalsystem_kws
1✔
6
import logging
1✔
7
import json
1✔
8
import os.path
1✔
9
import tempfile
1✔
10
import numpy as np
1✔
11
import astropy.units as u
1✔
12
import copy
1✔
13
import inspect
1✔
14
import warnings
1✔
15
from typing import Dict, Optional, Any
1✔
16

17

18
class MissionSim(object):
1✔
19
    """Mission Simulation (backbone) class
20

21
    This class is responsible for instantiating all objects required
22
    to carry out a mission simulation.
23

24
    Args:
25
        scriptfile (string):
26
            Full path to JSON script file.  If not set, assumes that dictionary has been
27
            passed through specs.
28
        nopar (bool):
29
            Ignore any provided ensemble module in the script or specs and force the
30
            prototype :py:class:`~EXOSIMS.Prototypes.SurveyEnsemble`. Defaults True
31
        verbose (bool):
32
            Input to :py:meth:`~EXOSIMS.util.vprint.vprint`, toggling verbosity of
33
            print statements. Defaults True.
34
        logfile (str of None):
35
            Path to the log file. If None, logging is turned off.
36
            If supplied but empty string (''), a temporary file is generated.
37
        loglevel (str):
38
            The level of log, defaults to 'INFO'. Valid levels are: CRITICAL,
39
            ERROR, WARNING, INFO, DEBUG (case sensitive).
40
        checkInputs (bool):
41
            Validate inputs against selected modules. Defaults True.
42
        **specs (dict):
43
            :ref:`sec:inputspec`
44

45
    Attributes:
46
        StarCatalog (StarCatalog module):
47
            StarCatalog class object
48
        PlanetPopulation (PlanetPopulation module):
49
            PlanetPopulation class object
50
        PlanetPhysicalModel (PlanetPhysicalModel module):
51
            PlanetPhysicalModel class object
52
        OpticalSystem (OpticalSystem module):
53
            OpticalSystem class object
54
        ZodiacalLight (ZodiacalLight module):
55
            ZodiacalLight class object
56
        BackgroundSources (BackgroundSources module):
57
            Background Source class object
58
        PostProcessing (PostProcessing module):
59
            PostProcessing class object
60
        Completeness (Completeness module):
61
            Completeness class object
62
        TargetList (TargetList module):
63
            TargetList class object
64
        SimulatedUniverse (SimulatedUniverse module):
65
            SimulatedUniverse class object
66
        Observatory (Observatory module):
67
            Observatory class object
68
        TimeKeeping (TimeKeeping module):
69
            TimeKeeping class object
70
        SurveySimulation (SurveySimulation module):
71
            SurveySimulation class object
72
        SurveyEnsemble (SurveyEnsemble module):
73
            SurveyEnsemble class object
74
        modules (dict):
75
            Dictionary of all modules, except StarCatalog
76
        verbose (bool):
77
            Boolean used to create the vprint function, equivalent to the
78
            python print function with an extra verbose toggle parameter
79
            (True by default). The vprint function can be accessed by all
80
            modules from EXOSIMS.util.vprint.
81
        seed (int):
82
            Number used to seed the NumPy generator. Generated randomly
83
            by default.
84
        logfile (str):
85
            Path to the log file. If None, logging is turned off.
86
            If supplied but empty string (''), a temporary file is generated.
87
        loglevel (str):
88
            The level of log, defaults to 'INFO'. Valid levels are: CRITICAL,
89
            ERROR, WARNING, INFO, DEBUG (case sensitive).
90

91
    """
92

93
    _modtype = "MissionSim"
1✔
94
    _outspec = {}
1✔
95

96
    def __init__(
1✔
97
        self,
98
        scriptfile=None,
99
        nopar=False,
100
        verbose=True,
101
        logfile=None,
102
        loglevel="INFO",
103
        checkInputs=True,
104
        **specs,
105
    ):
106
        """Initializes all modules from a given script file or specs dictionary."""
107

108
        # extend given specs with (JSON) script file
109
        if scriptfile is not None:
1✔
110
            assert os.path.isfile(scriptfile), "%s is not a file." % scriptfile
1✔
111
            try:
1✔
112
                with open(scriptfile, "r") as ff:
1✔
113
                    script = ff.read()
1✔
114
                specs_from_file = json.loads(script)
1✔
115
                specs_from_file.update(specs)
1✔
116
            except ValueError as err:
1✔
117
                print(
1✔
118
                    "Error: %s: Input file `%s' improperly formatted."
119
                    % (self._modtype, scriptfile)
120
                )
121
                print("Error: JSON error was: %s" % err)
1✔
122
                # re-raise here to suppress the rest of the backtrace.
123
                # it is only confusing details about the bowels of json.loads()
124
                raise ValueError(err)
1✔
125
        else:
126
            specs_from_file = {}
1✔
127
        specs.update(specs_from_file)
1✔
128

129
        if "modules" not in specs:
1✔
130
            raise ValueError("No modules field found in script.")
1✔
131

132
        # push all inputs into combined spec dict and save a copy before it gets
133
        # modified through module instantiations
134
        specs["verbose"] = bool(verbose)
1✔
135
        specs["logfile"] = logfile
1✔
136
        specs["loglevel"] = loglevel
1✔
137
        specs["nopar"] = bool(nopar)
1✔
138
        specs["checkInputs"] = bool(checkInputs)
1✔
139
        specs0 = copy.deepcopy(specs)
1✔
140

141
        # load the vprint function (same line in all prototype module constructors)
142
        self.verbose = specs["verbose"]
1✔
143
        self.vprint = vprint(self.verbose)
1✔
144

145
        # overwrite any ensemble setting if nopar is set
146
        self.nopar = specs["nopar"]
1✔
147
        if self.nopar:
1✔
148
            self.vprint("No-parallel: resetting SurveyEnsemble to Prototype")
×
149
            specs["modules"]["SurveyEnsemble"] = " "
×
150

151
        # start logging, with log file and logging level (default: INFO)
152
        self.logfile = specs.get("logfile", None)
1✔
153
        self.loglevel = specs.get("loglevel", "INFO").upper()
1✔
154
        specs["logger"] = self.get_logger(self.logfile, self.loglevel)
1✔
155
        specs["logger"].info(
1✔
156
            "Start Logging: loglevel = %s" % specs["logger"].level
157
            + " (%s)" % self.loglevel
158
        )
159

160
        # populate outspec
161
        self.checkInputs = specs["checkInputs"]
1✔
162
        for att in self.__dict__:
1✔
163
            if att not in ["vprint"]:
1✔
164
                self._outspec[att] = self.__dict__[att]
1✔
165

166
        # create a surveysimulation object (triggering init of everything else)
167
        self.SurveySimulation = get_module(
1✔
168
            specs["modules"]["SurveySimulation"], "SurveySimulation"
169
        )(**specs)
170

171
        # collect sub-initializations
172
        SS = self.SurveySimulation
1✔
173
        self.StarCatalog = SS.StarCatalog
1✔
174
        self.PlanetPopulation = SS.PlanetPopulation
1✔
175
        self.PlanetPhysicalModel = SS.PlanetPhysicalModel
1✔
176
        self.OpticalSystem = SS.OpticalSystem
1✔
177
        self.ZodiacalLight = SS.ZodiacalLight
1✔
178
        self.BackgroundSources = SS.BackgroundSources
1✔
179
        self.PostProcessing = SS.PostProcessing
1✔
180
        self.Completeness = SS.Completeness
1✔
181
        self.TargetList = SS.TargetList
1✔
182
        self.SimulatedUniverse = SS.SimulatedUniverse
1✔
183
        self.Observatory = SS.Observatory
1✔
184
        self.TimeKeeping = SS.TimeKeeping
1✔
185

186
        # now that everything has successfully built, you can create the ensemble
187
        self.SurveyEnsemble = get_module(
1✔
188
            specs["modules"]["SurveyEnsemble"], "SurveyEnsemble"
189
        )(**copy.deepcopy(specs0))
190

191
        # create a dictionary of all modules, except StarCatalog
192
        self.modules = SS.modules
1✔
193
        self.modules["SurveyEnsemble"] = self.SurveyEnsemble
1✔
194

195
        # alias SurveySimulation random seed to attribute for easier access
196
        self.seed = self.SurveySimulation.seed
1✔
197
        self.specs0 = specs0
1✔
198

199
        # run keywords check if requested
200
        if self.checkInputs:
1✔
201
            self.check_ioscripts()
1✔
202

203
    def check_ioscripts(self) -> None:
1✔
204
        """Collect all input and output scripts against selected module inits and
205
        report and discrepancies.
206

207
        """
208

209
        # get a list of all modules in use
210
        mods = {}
1✔
211
        for modname in self.modules:
1✔
212
            mods[modname] = self.modules[modname].__class__
1✔
213
        mods["MissionSim"] = self.__class__
1✔
214
        mods["StarCatalog"] = self.TargetList.StarCatalog.__class__
1✔
215

216
        # collect keywords
217
        allkws, allkwmods, ukws, ukwcounts = get_all_mod_kws(mods)
1✔
218

219
        self.vprint(
1✔
220
            (
221
                "\nThe following keywords are used in multiple inits (this is ok):"
222
                "\n\t{}"
223
            ).format("\n\t".join(ukws[ukwcounts > 1]))
224
        )
225

226
        # now let's compare against specs0
227
        unused = list(set(self.specs0.keys()) - set(ukws))
1✔
228
        if "modules" in unused:
1✔
229
            unused.remove("modules")
1✔
230
        if "seed" in unused:
1✔
231
            unused.remove("seed")
1✔
232
        if len(unused) > 0:
1✔
233
            warnstr = (
1✔
234
                "\nThe following input keywords were not used in any "
235
                "module init:\n\t{}".format("\n\t".join(unused))
236
            )
237
            warnings.warn(warnstr)
1✔
238
        self.vprint(
1✔
239
            "\n{} keywords were set to their default values.".format(
240
                len(list(set(ukws) - set(self.specs0.keys())))
241
            )
242
        )
243

244
        # check the optical system
245
        out = check_opticalsystem_kws(self.specs0, self.OpticalSystem)
1✔
246
        if out != "":
1✔
247
            warnings.warn(f"\n{out}")
×
248

249
        # and finally, let's look at the outspec
250
        outspec = self.genOutSpec(modnames=True)
1✔
251
        # these are extraneous things allowed to be in outspec:
252
        whitelist = ["modules", "Version", "seed", "nStars"]
1✔
253
        for w in whitelist:
1✔
254
            _ = outspec.pop(w, None)
1✔
255

256
        extraouts = list(set(outspec.keys()) - set(ukws))
1✔
257
        if len(extraouts) > 0:
1✔
258
            warnstr = (
×
259
                "\nThe following outspec keywords were not used in any "
260
                "module init:\n"
261
            )
262
            for e in extraouts:
×
263
                warnstr += "\t{:>20} ({})\n".format(e, outspec[e])
×
264
            warnings.warn(warnstr)
×
265

266
        missingouts = list(set(ukws) - set(outspec.keys()))
1✔
267
        if len(missingouts) > 0:
1✔
268
            allkws = np.array(allkws)
×
269
            allkwmods = np.array(allkwmods)
×
270
            warnstr = "\nThe following init keywords were not found in any outspec:\n"
×
271
            for m in missingouts:
×
272
                warnstr += "\t{:>20} ({})\n".format(
×
273
                    m, ", ".join(allkwmods[allkws == m])
274
                )
275
            warnings.warn(warnstr)
×
276

277
    def get_logger(self, logfile, loglevel):
1✔
278
        r"""Set up logging object so other modules can use logging.info(),
279
        logging.warning, etc.
280

281
        Args:
282
            logfile (string):
283
                Path to the log file. If None, logging is turned off.
284
                If supplied but empty string (''), a temporary file is generated.
285
            loglevel (string):
286
                The level of log, defaults to 'INFO'. Valid levels are: CRITICAL,
287
                ERROR, WARNING, INFO, DEBUG (case sensitive).
288

289
        Returns:
290
            logger (logging object):
291
                Mission Simulation logger.
292

293
        """
294

295
        # this leaves the default logger in place, so logger.warn will appear on stderr
296
        if logfile is None:
1✔
297
            logger = logging.getLogger(__name__)
1✔
298
            return logger
1✔
299

300
        # if empty string, a temporary file is generated
301
        if logfile == "":
×
302
            (dummy, logfile) = tempfile.mkstemp(
×
303
                prefix="EXOSIMS.", suffix=".log", dir="/tmp", text=True
304
            )
305
        else:
306
            # ensure we can write it
307
            try:
×
308
                with open(logfile, "w") as ff:  # noqa: F841
×
309
                    pass
×
310
            except (IOError, OSError):
×
311
                print('%s: Failed to open logfile "%s"' % (__file__, logfile))
×
312
                return None
×
313
        self.vprint("Logging to '%s' at level '%s'" % (logfile, loglevel.upper()))
×
314

315
        # convert string to a logging.* level
316
        numeric_level = getattr(logging, loglevel.upper())
×
317
        if not isinstance(numeric_level, int):
×
318
            raise ValueError("Invalid log level: %s" % loglevel.upper())
×
319

320
        # set up the top-level logger
321
        logger = logging.getLogger(__name__.split(".")[0])
×
322
        logger.setLevel(numeric_level)
×
323
        # do not propagate EXOSIMS messages to higher loggers in this case
324
        logger.propagate = False
×
325
        # create a handler that outputs to the named file
326
        handler = logging.FileHandler(logfile, mode="w")
×
327
        handler.setLevel(numeric_level)
×
328
        # logging format
329
        formatter = logging.Formatter(
×
330
            "%(levelname)s: %(filename)s(%(lineno)s): " + "%(funcName)s: %(message)s"
331
        )
332
        handler.setFormatter(formatter)
×
333
        # add the handler to the logger
334
        logger.addHandler(handler)
×
335

336
        return logger
×
337

338
    def run_sim(self):
1✔
339
        """Convenience method that simply calls the SurveySimulation run_sim method."""
340

341
        res = self.SurveySimulation.run_sim()
1✔
342

343
        return res
1✔
344

345
    def reset_sim(self, genNewPlanets=True, rewindPlanets=True, seed=None):
1✔
346
        """
347
        Convenience method that simply calls the SurveySimulation reset_sim method.
348
        """
349

350
        res = self.SurveySimulation.reset_sim(
1✔
351
            genNewPlanets=genNewPlanets, rewindPlanets=rewindPlanets, seed=seed
352
        )
353
        self.modules = self.SurveySimulation.modules
1✔
354
        self.modules["SurveyEnsemble"] = self.SurveyEnsemble  # replace SurveyEnsemble
1✔
355

356
        return res
1✔
357

358
    def run_ensemble(
1✔
359
        self,
360
        nb_run_sim,
361
        run_one=None,
362
        genNewPlanets=True,
363
        rewindPlanets=True,
364
        kwargs={},
365
    ):
366
        """
367
        Convenience method that simply calls the SurveyEnsemble run_ensemble method.
368
        """
369

370
        res = self.SurveyEnsemble.run_ensemble(
1✔
371
            self,
372
            nb_run_sim,
373
            run_one=run_one,
374
            genNewPlanets=genNewPlanets,
375
            rewindPlanets=rewindPlanets,
376
            kwargs=kwargs,
377
        )
378

379
        return res
1✔
380

381
    def genOutSpec(
1✔
382
        self,
383
        tofile: Optional[str] = None,
384
        modnames: bool = False,
385
    ) -> Dict[str, Any]:
386
        """Join all _outspec dicts from all modules into one output dict
387
        and optionally write out to JSON file on disk.
388

389
        Args:
390
            tofile (str):
391
                Name of the file containing all output specifications (outspecs).
392
                Defaults to None.
393
            modnames (bool):
394
                If True, populate outspec dictionary with the module it originated from,
395
                instead of the actual value of the keyword.  Defaults False.
396

397
        Returns:
398
            dict:
399
                Dictionary containing the full :ref:`sec:inputspec`, including all
400
                filled-in default values. Combination of all individual module _outspec
401
                attributes.
402
        """
403

404
        starting_outspec = copy.copy(self._outspec)
1✔
405
        if modnames:
1✔
406
            for k in starting_outspec:
1✔
407
                starting_outspec[k] = "MissionSim"
1✔
408

409
        out = self.SurveySimulation.genOutSpec(
1✔
410
            starting_outspec=starting_outspec, tofile=tofile, modnames=modnames
411
        )
412

413
        return out
1✔
414

415
    def genWaypoint(self, targetlist=None, duration=365, tofile=None, charmode=False):
1✔
416
        """generates a ballpark estimate of the expected number of star visits and
417
        the total completeness of these visits for a given mission duration
418

419
        Args:
420
            targetlist (list, optional):
421
                List of target indices
422
            duration (int):
423
                The length of time allowed for the waypoint calculation, defaults to 365
424
            tofile (str):
425
                Name of the file containing a plot of total completeness over mission
426
                time, by default genWaypoint does not create this plot
427
            charmode (bool):
428
                Run the waypoint calculation using either the char mode instead of the
429
                det mode
430

431
        Returns:
432
            dict:
433
                Output dictionary containing the number of stars visited, the total
434
                completeness achieved, and the amount of time spent integrating.
435

436
        """
437

438
        SS = self.SurveySimulation
×
439
        OS = SS.OpticalSystem
×
440
        ZL = SS.ZodiacalLight
×
441
        Comp = SS.Completeness
×
442
        TL = SS.TargetList
×
443
        Obs = SS.Observatory
×
444
        TK = SS.TimeKeeping
×
445

446
        # Only considering detections
447
        allModes = OS.observingModes
×
448
        if charmode:
×
449
            int_mode = list(
×
450
                filter(lambda mode: "spec" in mode["inst"]["name"], allModes)
451
            )[0]
452
        else:
453
            int_mode = list(filter(lambda mode: mode["detectionMode"], allModes))[0]
×
454
        mpath = os.path.split(inspect.getfile(self.__class__))[0]
×
455

456
        if targetlist is not None:
×
457
            num_stars = len(targetlist)
×
458
            sInds = np.array(targetlist)
×
459
        else:
460
            num_stars = TL.nStars
×
461
            sInds = np.arange(TL.nStars)
×
462

463
        startTimes = TK.currentTimeAbs + np.zeros(num_stars) * u.d
×
464
        fZ = ZL.fZ(Obs, TL, sInds, startTimes, int_mode)
×
465
        JEZ = TL.JEZ0[int_mode["hex"]]
×
466
        dMag = TL.int_dMag[sInds]
×
467
        WA = TL.int_WA[sInds]
×
468

469
        # sort star indices by completeness diveded by integration time
470
        intTimes = OS.calc_intTime(TL, sInds, fZ, JEZ, dMag, WA, int_mode)
×
471
        comps = Comp.comp_per_intTime(intTimes, TL, sInds, fZ, JEZ, WA[0], int_mode)
×
472
        wp = waypoint(comps, intTimes, duration, mpath, tofile)
×
473

474
        return wp
×
475

476
    def checkScript(self, scriptfile, prettyprint=False, tofile=None):
1✔
477
        """Calls CheckScript and checks the script file against the mission outspec.
478

479
        Args:
480
            scriptfile (str):
481
                The path to the scriptfile being used by the sim
482
            prettyprint (bool):
483
                Outputs the results of Checkscript in a readable format.
484
            tofile (str):
485
                Name of the file containing all output specifications (outspecs).
486
                Default to None.
487

488
        Returns:
489
            str:
490
                Output string containing the results of the check.
491

492
        """
493
        if scriptfile is not None:
×
494
            cs = CheckScript(scriptfile, self.genOutSpec())
×
495
            out = cs.recurse(cs.specs_from_file, cs.outspec, pretty_print=prettyprint)
×
496
            if tofile is not None:
×
497
                mpath = os.path.split(inspect.getfile(self.__class__))[0]
×
498
                cs.write_file(os.path.join(mpath, tofile))
×
499
        else:
500
            out = None
×
501

502
        return out
×
503

504
    def DRM2array(self, key, DRM=None):
1✔
505
        """Creates an array corresponding to one element of the DRM dictionary.
506

507
        Args:
508
            key (str):
509
                Name of an element of the DRM dictionary
510
            DRM (list(dict)):
511
                Design Reference Mission, contains the results of a survey simulation
512

513
        Returns:
514
            ~numpy.ndarray or ~astropy.units.Quantity(~numpy.ndarray):
515
                Array containing all the DRM values of the selected element
516

517
        """
518

519
        # if the DRM was not specified, get it from the current SurveySimulation
520
        if DRM is None:
1✔
521
            DRM = self.SurveySimulation.DRM
1✔
522
        assert DRM != [], "DRM is empty. Use MissionSim.run_sim() to start simulation."
1✔
523

524
        # lists of relevant DRM elements
525
        keysStar = [
1✔
526
            "star_ind",
527
            "star_name",
528
            "arrival_time",
529
            "OB_nb",
530
            "det_time",
531
            "det_fZ",
532
            "char_time",
533
            "char_fZ",
534
        ]
535
        keysPlans = ["plan_inds", "det_status", "det_SNR", "char_status", "char_SNR"]
1✔
536
        keysParams = [
1✔
537
            "det_JEZ",
538
            "det_dMag",
539
            "det_WA",
540
            "det_d",
541
            "char_JEZ",
542
            "char_dMag",
543
            "char_WA",
544
            "char_d",
545
        ]
546
        keysFA = [
1✔
547
            "FA_det_status",
548
            "FA_char_status",
549
            "FA_char_SNR",
550
            "FA_char_JEZ",
551
            "FA_char_dMag",
552
            "FA_char_WA",
553
        ]
554
        keysOcculter = [
1✔
555
            "slew_time",
556
            "slew_dV",
557
            "det_dF_lateral",
558
            "scMass",
559
            "slewMass",
560
            "skMass",
561
            "char_dF_axial",
562
            "det_mass_used",
563
            "slew_mass_used",
564
            "det_dF_axial",
565
            "det_dV",
566
            "slew_angle",
567
            "char_dF_lateral",
568
        ]
569

570
        assert key in (
1✔
571
            keysStar + keysPlans + keysParams + keysFA + keysOcculter
572
        ), "'%s' is not a relevant DRM keyword."
573

574
        # extract arrays for each relevant keyword in the DRM
575
        if key in keysParams:
1✔
576
            if "det_" in key:
1✔
577
                elem = [DRM[x]["det_params"][key[4:]] for x in range(len(DRM))]
1✔
578

579
            elif "char_" in key:
×
580
                elem = [DRM[x]["char_params"][key[5:]] for x in range(len(DRM))]
×
581

582
        elif isinstance(DRM[0][key], u.Quantity):
1✔
583
            elem = ([DRM[x][key].value for x in range(len(DRM))]) * DRM[0][key].unit
×
584

585
        else:
586
            elem = [DRM[x][key] for x in range(len(DRM))]
1✔
587

588
        try:
1✔
589
            elem = np.array(elem)
1✔
590
        except ValueError:
×
591
            elem = np.array(elem, dtype=object)
×
592

593
        return elem
1✔
594

595
    def filter_status(self, key, status, DRM=None, obsMode=None):
1✔
596
        """Finds the values of one DRM element, corresponding to a status value,
597
        for detection or characterization.
598

599
        Args:
600
            key (string):
601
                Name of an element of the DRM dictionary
602
            status (integer):
603
                Status value for detection or characterization
604
            DRM (list of dicts):
605
                Design Reference Mission, contains the results of a survey simulation
606
            obsMode (string):
607
                Observing mode type ('det' or 'char')
608

609
        Returns:
610
            elemStat (ndarray / astropy Quantity array):
611
                Array containing all the DRM values of the selected element,
612
                and filtered by the value of the corresponding status array
613

614
        """
615

616
        # get DRM detection status array
617
        det = (
1✔
618
            self.DRM2array("FA_det_status", DRM=DRM)
619
            if "FA_" in key
620
            else self.DRM2array("det_status", DRM=DRM)
621
        )
622
        # get DRM characterization status array
623
        char = (
1✔
624
            self.DRM2array("FA_char_status", DRM=DRM)
625
            if "FA_" in key
626
            else self.DRM2array("char_status", DRM=DRM)
627
        )
628
        # get DRM key element array
629
        elem = self.DRM2array(key, DRM=DRM)
1✔
630

631
        # reshape elem array, for keys with 1 value per observation
632
        if elem[0].shape == () and "FA_" not in key:
1✔
633
            if isinstance(elem[0], u.Quantity):
×
634
                elem = np.array(
×
635
                    [
636
                        np.array([elem[x].value] * len(det[x])) * elem[0].unit
637
                        for x in range(len(elem))
638
                    ]
639
                )
640
            else:
641
                elem = np.array(
×
642
                    [np.array([elem[x]] * len(det[x])) for x in range(len(elem))]
643
                )
644

645
        # assign a default observing mode type ('det' or 'char')
646
        if obsMode is None:
1✔
647
            obsMode = "char" if "char_" in key else "det"
1✔
648
        assert obsMode in (
1✔
649
            "det",
650
            "char",
651
        ), "Observing mode type must be 'det' or 'char'."
652

653
        # now, find the values of elem corresponding to the specified status value
654
        if obsMode == "det":
1✔
655
            if isinstance(elem[0], u.Quantity):
1✔
656
                elemStat = (
×
657
                    np.concatenate(
658
                        [elem[x][det[x] == status].value for x in range(len(elem))]
659
                    )
660
                    * elem[0].unit
661
                )
662
            else:
663
                elemStat = np.concatenate(
1✔
664
                    [elem[x][det[x] == status] for x in range(len(elem))]
665
                )
666
        else:  # if obsMode is 'char'
667
            if isinstance(elem[0], u.Quantity):
×
668
                elemDet = (
×
669
                    np.concatenate(
670
                        [elem[x][det[x] == 1].value for x in range(len(elem))]
671
                    )
672
                    * elem[0].unit
673
                )
674
            else:
675
                elemDet = np.concatenate(
×
676
                    [elem[x][det[x] == 1] for x in range(len(elem))]
677
                )
678
            charDet = np.concatenate([char[x][det[x] == 1] for x in range(len(elem))])
×
679
            elemStat = elemDet[charDet == status]
×
680

681
        return elemStat
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc