• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dsavransky / EXOSIMS / 15825442190

23 Jun 2025 01:21PM UTC coverage: 65.872% (+0.009%) from 65.863%
15825442190

push

github

dsavransky
removing unused configuration files and updating coverage config

9809 of 14891 relevant lines covered (65.87%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.29
/EXOSIMS/MissionSim.py
1
from EXOSIMS.util.vprint import vprint
1✔
2
from EXOSIMS.util.get_module import get_module
1✔
3
from EXOSIMS.util.waypoint import waypoint
1✔
4
from EXOSIMS.util.CheckScript import CheckScript
1✔
5
from EXOSIMS.util.keyword_fun import get_all_mod_kws, check_opticalsystem_kws
1✔
6
import logging
1✔
7
import json
1✔
8
import os.path
1✔
9
import tempfile
1✔
10
import numpy as np
1✔
11
import astropy.units as u
1✔
12
import copy
1✔
13
import inspect
1✔
14
import warnings
1✔
15
from typing import Dict, Optional, Any
1✔
16

17

18
class MissionSim(object):
1✔
19
    """Mission Simulation (backbone) class
20

21
    This class is responsible for instantiating all objects required
22
    to carry out a mission simulation.
23

24
    Args:
25
        scriptfile (string):
26
            Full path to JSON script file.  If not set, assumes that dictionary has been
27
            passed through specs.
28
        nopar (bool):
29
            Ignore any provided ensemble module in the script or specs and force the
30
            prototype :py:class:`~EXOSIMS.Prototypes.SurveyEnsemble`. Defaults False.
31
        verbose (bool):
32
            Input to :py:meth:`~EXOSIMS.util.vprint.vprint`, toggling verbosity of
33
            print statements. Defaults True.
34
        logfile (str or None):
35
            Path to the log file. If None, logging is turned off.
36
            If supplied but empty string (''), a temporary file is generated.
37
        loglevel (str):
38
            The level of log, defaults to 'INFO'. Valid levels are: CRITICAL,
39
            ERROR, WARNING, INFO, DEBUG (converted to upper case before use).
40
        checkInputs (bool):
41
            Validate inputs against selected modules. Defaults True.
42
        **specs (dict):
43
            :ref:`sec:inputspec`
44

45
    Attributes:
46
        StarCatalog (StarCatalog module):
47
            StarCatalog class object (only retained if keepStarCatalog is True)
48
        PlanetPopulation (PlanetPopulation module):
49
            PlanetPopulation class object
50
        PlanetPhysicalModel (PlanetPhysicalModel module):
51
            PlanetPhysicalModel class object
52
        OpticalSystem (OpticalSystem module):
53
            OpticalSystem class object
54
        ZodiacalLight (ZodiacalLight module):
55
            ZodiacalLight class object
56
        BackgroundSources (BackgroundSources module):
57
            Background Source class object
58
        PostProcessing (PostProcessing module):
59
            PostProcessing class object
60
        Completeness (Completeness module):
61
            Completeness class object
62
        TargetList (TargetList module):
63
            TargetList class object
64
        SimulatedUniverse (SimulatedUniverse module):
65
            SimulatedUniverse class object
66
        Observatory (Observatory module):
67
            Observatory class object
68
        TimeKeeping (TimeKeeping module):
69
            TimeKeeping class object
70
        SurveySimulation (SurveySimulation module):
71
            SurveySimulation class object
72
        SurveyEnsemble (SurveyEnsemble module):
73
            SurveyEnsemble class object
74
        modules (dict):
75
            Dictionary of all modules, except StarCatalog
76
        verbose (bool):
77
            Boolean used to create the vprint function, equivalent to the
78
            python print function with an extra verbose toggle parameter
79
            (True by default). The vprint function can be accessed by all
80
            modules from EXOSIMS.util.vprint.
81
        seed (int):
82
            Number used to seed the NumPy generator. Generated randomly
83
            by default.
84
        logfile (str):
85
            Path to the log file. If None, logging is turned off.
86
            If supplied but empty string (''), a temporary file is generated.
87
        loglevel (str):
88
            The level of log, defaults to 'INFO'. Valid levels are: CRITICAL,
89
            ERROR, WARNING, INFO, DEBUG (converted to upper case before use).
90

91
    """
92

93
    _modtype = "MissionSim"
1✔
94
    _outspec = {}
1✔
95

96
    def __init__(
1✔
97
        self,
98
        scriptfile=None,
99
        nopar=False,
100
        verbose=True,
101
        logfile=None,
102
        loglevel="INFO",
103
        checkInputs=True,
104
        **specs,
105
    ):
106
        """Initializes all modules from a given script file or specs dictionary."""
107

108
        # extend given specs with (JSON) script file
109
        if scriptfile is not None:
1✔
110
            assert os.path.isfile(scriptfile), "%s is not a file." % scriptfile
1✔
111
            try:
1✔
112
                with open(scriptfile, "r") as ff:
1✔
113
                    script = ff.read()
1✔
114
                specs_from_file = json.loads(script)
1✔
115
                specs_from_file.update(specs)
1✔
116
            except ValueError as err:
1✔
117
                print(
1✔
118
                    "Error: %s: Input file `%s' improperly formatted."
119
                    % (self._modtype, scriptfile)
120
                )
121
                print("Error: JSON error was: %s" % err)
1✔
122
                # re-raise here to suppress the rest of the backtrace.
123
                # it is only confusing details about the bowels of json.loads()
124
                raise ValueError(err)
1✔
125
        else:
126
            specs_from_file = {}
1✔
127
        specs.update(specs_from_file)
1✔
128

129
        if "modules" not in specs:
1✔
130
            raise ValueError("No modules field found in script.")
1✔
131

132
        # push all inputs into combined spec dict and save a copy before it gets
133
        # modified through module instantiations
134
        specs["verbose"] = bool(verbose)
1✔
135
        specs["logfile"] = logfile
1✔
136
        specs["loglevel"] = loglevel
1✔
137
        specs["nopar"] = bool(nopar)
1✔
138
        specs["checkInputs"] = bool(checkInputs)
1✔
139
        specs0 = copy.deepcopy(specs)
1✔
140

141
        # load the vprint function (same line in all prototype module constructors)
142
        self.verbose = specs["verbose"]
1✔
143
        self.vprint = vprint(self.verbose)
1✔
144

145
        # overwrite any ensemble setting if nopar is set
146
        self.nopar = specs["nopar"]
1✔
147
        if self.nopar:
1✔
148
            self.vprint("No-parallel: resetting SurveyEnsemble to Prototype")
×
149
            specs["modules"]["SurveyEnsemble"] = " "
×
150

151
        # start logging, with log file and logging level (default: INFO)
152
        self.logfile = specs.get("logfile", None)
1✔
153
        self.loglevel = specs.get("loglevel", "INFO").upper()
1✔
154
        specs["logger"] = self.get_logger(self.logfile, self.loglevel)
1✔
155
        specs["logger"].info(
1✔
156
            "Start Logging: loglevel = %s" % specs["logger"].level
157
            + " (%s)" % self.loglevel
158
        )
159

160
        # populate outspec
161
        self.checkInputs = specs["checkInputs"]
1✔
162
        for att in self.__dict__:
1✔
163
            if att not in ["vprint"]:
1✔
164
                self._outspec[att] = self.__dict__[att]
1✔
165

166
        # create a surveysimulation object (triggering init of everything else)
167
        self.SurveySimulation = get_module(
1✔
168
            specs["modules"]["SurveySimulation"], "SurveySimulation"
169
        )(**specs)
170

171
        # collect sub-initializations
172
        SS = self.SurveySimulation
1✔
173
        self.StarCatalog = SS.StarCatalog
1✔
174
        self.PlanetPopulation = SS.PlanetPopulation
1✔
175
        self.PlanetPhysicalModel = SS.PlanetPhysicalModel
1✔
176
        self.OpticalSystem = SS.OpticalSystem
1✔
177
        self.ZodiacalLight = SS.ZodiacalLight
1✔
178
        self.BackgroundSources = SS.BackgroundSources
1✔
179
        self.PostProcessing = SS.PostProcessing
1✔
180
        self.Completeness = SS.Completeness
1✔
181
        self.TargetList = SS.TargetList
1✔
182
        self.SimulatedUniverse = SS.SimulatedUniverse
1✔
183
        self.Observatory = SS.Observatory
1✔
184
        self.TimeKeeping = SS.TimeKeeping
1✔
185

186
        # now that everything has successfully built, you can create the ensemble
187
        self.SurveyEnsemble = get_module(
1✔
188
            specs["modules"]["SurveyEnsemble"], "SurveyEnsemble"
189
        )(**copy.deepcopy(specs0))
190

191
        # create a dictionary of all modules, except StarCatalog
192
        self.modules = SS.modules
1✔
193
        self.modules["SurveyEnsemble"] = self.SurveyEnsemble
1✔
194

195
        # alias SurveySimulation random seed to attribute for easier access
196
        self.seed = self.SurveySimulation.seed
1✔
197
        self.specs0 = specs0
1✔
198

199
        # run keywords check if requested
200
        if self.checkInputs:
1✔
201
            self.check_ioscripts()
1✔
202

203
    def check_ioscripts(self) -> None:
        """Compare the input specs and the generated outspec against the keyword
        arguments of the selected module inits and report any discrepancies.

        Informational messages go through ``self.vprint``; every discrepancy is
        emitted with :py:func:`warnings.warn`.

        """

        # map each module in use to the class it was instantiated from
        mods = {name: module.__class__ for name, module in self.modules.items()}
        mods["MissionSim"] = self.__class__
        if self.TargetList.keepStarCatalog:
            # the retained catalog is an instance, so take its class
            mods["StarCatalog"] = self.TargetList.StarCatalog.__class__
        else:
            mods["StarCatalog"] = self.TargetList.StarCatalog

        # collect keywords
        allkws, allkwmods, ukws, ukwcounts = get_all_mod_kws(mods)
        repeated = "\n\t".join(ukws[ukwcounts > 1])
        self.vprint(
            (
                "\nThe following keywords are used in multiple inits (this is ok):"
                "\n\t{}"
            ).format(repeated)
        )

        # inputs that no init accepts ('modules' and 'seed' are always allowed)
        unused = [
            kw
            for kw in set(self.specs0.keys()) - set(ukws)
            if kw not in ("modules", "seed")
        ]
        if unused:
            warnings.warn(
                "\nThe following input keywords were not used in any "
                "module init:\n\t{}".format("\n\t".join(unused))
            )

        n_defaulted = len(set(ukws) - set(self.specs0.keys()))
        self.vprint(
            "\n{} keywords were set to their default values.".format(n_defaulted)
        )

        # check the optical system
        os_report = check_opticalsystem_kws(self.specs0, self.OpticalSystem)
        if os_report != "":
            warnings.warn(f"\n{os_report}")

        # and finally, look at the outspec (values replaced by module names)
        outspec = self.genOutSpec(modnames=True)
        # these are extraneous things allowed to be in outspec:
        for allowed in ("modules", "Version", "seed", "nStars"):
            outspec.pop(allowed, None)

        extraouts = list(set(outspec.keys()) - set(ukws))
        if extraouts:
            lines = ["\t{:>20} ({})\n".format(kw, outspec[kw]) for kw in extraouts]
            warnings.warn(
                "\nThe following outspec keywords were not used in any "
                "module init:\n" + "".join(lines)
            )

        missingouts = list(set(ukws) - set(outspec.keys()))
        if missingouts:
            # arrays allow boolean masking to find the modules owning a keyword
            allkws = np.array(allkws)
            allkwmods = np.array(allkwmods)
            lines = [
                "\t{:>20} ({})\n".format(kw, ", ".join(allkwmods[allkws == kw]))
                for kw in missingouts
            ]
            warnings.warn(
                "\nThe following init keywords were not found in any outspec:\n"
                + "".join(lines)
            )
×
279

280
    def get_logger(self, logfile, loglevel):
1✔
281
        r"""Set up logging object so other modules can use logging.info(),
282
        logging.warning, etc.
283

284
        Args:
285
            logfile (string):
286
                Path to the log file. If None, logging is turned off.
287
                If supplied but empty string (''), a temporary file is generated.
288
            loglevel (string):
289
                The level of log, defaults to 'INFO'. Valid levels are: CRITICAL,
290
                ERROR, WARNING, INFO, DEBUG (case sensitive).
291

292
        Returns:
293
            logger (logging object):
294
                Mission Simulation logger.
295

296
        """
297

298
        # this leaves the default logger in place, so logger.warn will appear on stderr
299
        if logfile is None:
1✔
300
            logger = logging.getLogger(__name__)
1✔
301
            return logger
1✔
302

303
        # if empty string, a temporary file is generated
304
        if logfile == "":
×
305
            (dummy, logfile) = tempfile.mkstemp(
×
306
                prefix="EXOSIMS.", suffix=".log", dir="/tmp", text=True
307
            )
308
        else:
309
            # ensure we can write it
310
            try:
×
311
                with open(logfile, "w") as ff:  # noqa: F841
×
312
                    pass
×
313
            except (IOError, OSError):
×
314
                print('%s: Failed to open logfile "%s"' % (__file__, logfile))
×
315
                return None
×
316
        self.vprint("Logging to '%s' at level '%s'" % (logfile, loglevel.upper()))
×
317

318
        # convert string to a logging.* level
319
        numeric_level = getattr(logging, loglevel.upper())
×
320
        if not isinstance(numeric_level, int):
×
321
            raise ValueError("Invalid log level: %s" % loglevel.upper())
×
322

323
        # set up the top-level logger
324
        logger = logging.getLogger(__name__.split(".")[0])
×
325
        logger.setLevel(numeric_level)
×
326
        # do not propagate EXOSIMS messages to higher loggers in this case
327
        logger.propagate = False
×
328
        # create a handler that outputs to the named file
329
        handler = logging.FileHandler(logfile, mode="w")
×
330
        handler.setLevel(numeric_level)
×
331
        # logging format
332
        formatter = logging.Formatter(
×
333
            "%(levelname)s: %(filename)s(%(lineno)s): " + "%(funcName)s: %(message)s"
334
        )
335
        handler.setFormatter(formatter)
×
336
        # add the handler to the logger
337
        logger.addHandler(handler)
×
338

339
        return logger
×
340

341
    def run_sim(self):
1✔
342
        """Convenience method that simply calls the SurveySimulation run_sim method."""
343

344
        res = self.SurveySimulation.run_sim()
1✔
345

346
        return res
1✔
347

348
    def reset_sim(self, genNewPlanets=True, rewindPlanets=True, seed=None):
1✔
349
        """
350
        Convenience method that simply calls the SurveySimulation reset_sim method.
351
        """
352

353
        res = self.SurveySimulation.reset_sim(
1✔
354
            genNewPlanets=genNewPlanets, rewindPlanets=rewindPlanets, seed=seed
355
        )
356
        self.modules = self.SurveySimulation.modules
1✔
357
        self.modules["SurveyEnsemble"] = self.SurveyEnsemble  # replace SurveyEnsemble
1✔
358

359
        return res
1✔
360

361
    def run_ensemble(
1✔
362
        self,
363
        nb_run_sim,
364
        run_one=None,
365
        genNewPlanets=True,
366
        rewindPlanets=True,
367
        kwargs={},
368
    ):
369
        """
370
        Convenience method that simply calls the SurveyEnsemble run_ensemble method.
371
        """
372

373
        res = self.SurveyEnsemble.run_ensemble(
1✔
374
            self,
375
            nb_run_sim,
376
            run_one=run_one,
377
            genNewPlanets=genNewPlanets,
378
            rewindPlanets=rewindPlanets,
379
            kwargs=kwargs,
380
        )
381

382
        return res
1✔
383

384
    def genOutSpec(
1✔
385
        self,
386
        tofile: Optional[str] = None,
387
        modnames: bool = False,
388
    ) -> Dict[str, Any]:
389
        """Join all _outspec dicts from all modules into one output dict
390
        and optionally write out to JSON file on disk.
391

392
        Args:
393
            tofile (str):
394
                Name of the file containing all output specifications (outspecs).
395
                Defaults to None.
396
            modnames (bool):
397
                If True, populate outspec dictionary with the module it originated from,
398
                instead of the actual value of the keyword.  Defaults False.
399

400
        Returns:
401
            dict:
402
                Dictionary containing the full :ref:`sec:inputspec`, including all
403
                filled-in default values. Combination of all individual module _outspec
404
                attributes.
405
        """
406

407
        starting_outspec = copy.copy(self._outspec)
1✔
408
        if modnames:
1✔
409
            for k in starting_outspec:
1✔
410
                starting_outspec[k] = "MissionSim"
1✔
411

412
        out = self.SurveySimulation.genOutSpec(
1✔
413
            starting_outspec=starting_outspec, tofile=tofile, modnames=modnames
414
        )
415

416
        return out
1✔
417

418
    def genWaypoint(self, targetlist=None, duration=365, tofile=None, charmode=False):
        """Produce a ballpark estimate of the expected number of star visits and
        the total completeness of those visits for a given mission duration.

        Args:
            targetlist (list, optional):
                List of target indices. If None, all stars of the target list
                are used.
            duration (int):
                The length of time allowed for the waypoint calculation, defaults to 365
            tofile (str):
                Name of the file containing a plot of total completeness over mission
                time, by default genWaypoint does not create this plot
            charmode (bool):
                If True, use the first observing mode whose instrument name
                contains "spec"; otherwise use the first detection mode.

        Returns:
            dict:
                Output dictionary containing the number of stars visited, the total
                completeness achieved, and the amount of time spent integrating.

        """

        survey = self.SurveySimulation
        optics = survey.OpticalSystem
        zodi = survey.ZodiacalLight
        completeness = survey.Completeness
        targets = survey.TargetList
        observatory = survey.Observatory
        clock = survey.TimeKeeping

        # pick the observing mode used for the integration time calculation
        modes = optics.observingModes
        if charmode:
            matching = [mode for mode in modes if "spec" in mode["inst"]["name"]]
        else:
            matching = [mode for mode in modes if mode["detectionMode"]]
        int_mode = matching[0]

        # directory of the file defining this class (passed on to waypoint)
        mpath = os.path.dirname(inspect.getfile(self.__class__))

        if targetlist is None:
            num_stars = targets.nStars
            sInds = np.arange(targets.nStars)
        else:
            num_stars = len(targetlist)
            sInds = np.array(targetlist)

        # every target is evaluated at the current absolute mission time
        startTimes = clock.currentTimeAbs + np.zeros(num_stars) * u.d
        fZ = zodi.fZ(observatory, targets, sInds, startTimes, int_mode)
        JEZ = targets.JEZ0[int_mode["hex"]]
        dMag = targets.int_dMag[sInds]
        WA = targets.int_WA[sInds]

        # integration times and completeness are handed to waypoint
        intTimes = optics.calc_intTime(targets, sInds, fZ, JEZ, dMag, WA, int_mode)
        comps = completeness.comp_per_intTime(
            intTimes, targets, sInds, fZ, JEZ, WA[0], int_mode
        )

        return waypoint(comps, intTimes, duration, mpath, tofile)
×
478

479
    def checkScript(self, scriptfile, prettyprint=False, tofile=None):
        """Run CheckScript to compare a script file against the mission outspec.

        Args:
            scriptfile (str):
                The path to the scriptfile being used by the sim
            prettyprint (bool):
                Outputs the results of Checkscript in a readable format.
            tofile (str):
                Name of the output file; it is written into the directory of
                the file defining this class. Default to None (nothing written).

        Returns:
            str:
                Output string containing the results of the check, or None if
                no scriptfile was given.

        """
        # nothing to compare against
        if scriptfile is None:
            return None

        checker = CheckScript(scriptfile, self.genOutSpec())
        report = checker.recurse(
            checker.specs_from_file, checker.outspec, pretty_print=prettyprint
        )
        if tofile is not None:
            class_dir = os.path.split(inspect.getfile(self.__class__))[0]
            checker.write_file(os.path.join(class_dir, tofile))

        return report
×
506

507
    def DRM2array(self, key, DRM=None):
        """Creates an array corresponding to one element of the DRM dictionary.

        Args:
            key (str):
                Name of an element of the DRM dictionary
            DRM (list(dict)):
                Design Reference Mission, contains the results of a survey simulation.
                If None, the DRM of the current SurveySimulation is used.

        Returns:
            ~numpy.ndarray or ~astropy.units.Quantity(~numpy.ndarray):
                Array containing all the DRM values of the selected element

        Raises:
            AssertionError:
                The DRM is empty, or ``key`` is not one of the supported keywords.

        """

        # if the DRM was not specified, get it from the current SurveySimulation
        if DRM is None:
            DRM = self.SurveySimulation.DRM
        assert DRM != [], "DRM is empty. Use MissionSim.run_sim() to start simulation."

        # lists of relevant DRM elements
        keysStar = [
            "star_ind",
            "star_name",
            "arrival_time",
            "OB_nb",
            "det_time",
            "det_fZ",
            "char_time",
            "char_fZ",
        ]
        keysPlans = ["plan_inds", "det_status", "det_SNR", "char_status", "char_SNR"]
        keysParams = [
            "det_JEZ",
            "det_dMag",
            "det_WA",
            "det_d",
            "char_JEZ",
            "char_dMag",
            "char_WA",
            "char_d",
        ]
        keysFA = [
            "FA_det_status",
            "FA_char_status",
            "FA_char_SNR",
            "FA_char_JEZ",
            "FA_char_dMag",
            "FA_char_WA",
        ]
        keysOcculter = [
            "slew_time",
            "slew_dV",
            "det_dF_lateral",
            "scMass",
            "slewMass",
            "skMass",
            "char_dF_axial",
            "det_mass_used",
            "slew_mass_used",
            "det_dF_axial",
            "det_dV",
            "slew_angle",
            "char_dF_lateral",
        ]

        # the offending keyword is interpolated into the message
        assert key in (keysStar + keysPlans + keysParams + keysFA + keysOcculter), (
            "'%s' is not a relevant DRM keyword." % key
        )

        # extract arrays for each relevant keyword in the DRM
        if key in keysParams:
            # "det_X" / "char_X" live under the "det_params" / "char_params"
            # sub-dictionary of each observation, stored under the name "X"
            prefix, _, param = key.partition("_")
            elem = [obs[prefix + "_params"][param] for obs in DRM]

        elif isinstance(DRM[0][key], u.Quantity):
            # strip the units per observation and re-attach the unit of the first
            elem = [obs[key].value for obs in DRM] * DRM[0][key].unit

        else:
            elem = [obs[key] for obs in DRM]

        # NOTE(review): np.array() on a Quantity may return a plain ndarray
        # without the unit — confirm whether callers rely on the unit here
        try:
            elem = np.array(elem)
        except ValueError:
            # entries of differing lengths cannot form a regular array
            elem = np.array(elem, dtype=object)

        return elem
1✔
597

598
    def filter_status(self, key, status, DRM=None, obsMode=None):
        """Select the values of one DRM element that correspond to a given
        detection or characterization status value.

        Args:
            key (string):
                Name of an element of the DRM dictionary
            status (integer):
                Status value for detection or characterization
            DRM (list of dicts):
                Design Reference Mission, contains the results of a survey simulation
            obsMode (string):
                Observing mode type ('det' or 'char'). If None, 'char' is used
                when the key contains 'char_', and 'det' otherwise.

        Returns:
            elemStat (ndarray / astropy Quantity array):
                Array containing all the DRM values of the selected element,
                and filtered by the value of the corresponding status array

        """

        # keys containing "FA_" are matched against the "FA_" status arrays
        is_fa = "FA_" in key
        det = self.DRM2array("FA_det_status" if is_fa else "det_status", DRM=DRM)
        char = self.DRM2array("FA_char_status" if is_fa else "char_status", DRM=DRM)
        # get DRM key element array
        elem = self.DRM2array(key, DRM=DRM)

        # keys with 1 value per observation: repeat that value once per entry
        # of the observation's status array so both can be masked together
        if elem[0].shape == () and not is_fa:
            if isinstance(elem[0], u.Quantity):
                unit = elem[0].unit
                elem = np.array(
                    [
                        np.array([val.value] * len(det[ind])) * unit
                        for ind, val in enumerate(elem)
                    ]
                )
            else:
                elem = np.array(
                    [np.array([val] * len(det[ind])) for ind, val in enumerate(elem)]
                )

        # assign a default observing mode type ('det' or 'char')
        if obsMode is None:
            obsMode = "char" if "char_" in key else "det"
        assert obsMode in (
            "det",
            "char",
        ), "Observing mode type must be 'det' or 'char'."

        has_unit = isinstance(elem[0], u.Quantity)

        def _with_det_status(value):
            # concatenate, over all observations, the entries of elem whose
            # detection status equals value
            parts = [obs[det[ind] == value] for ind, obs in enumerate(elem)]
            if has_unit:
                return np.concatenate([part.value for part in parts]) * elem[0].unit
            return np.concatenate(parts)

        if obsMode == "det":
            return _with_det_status(status)

        # obsMode is 'char': restrict to entries with detection status 1, then
        # keep those whose characterization status matches
        elemDet = _with_det_status(1)
        charDet = np.concatenate(
            [char[ind][det[ind] == 1] for ind in range(len(elem))]
        )
        return elemDet[charDet == status]
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc