• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dsavransky / EXOSIMS / 14319210709

07 Apr 2025 08:50PM UTC coverage: 66.192% (+0.7%) from 65.466%
14319210709

Pull #408

github

web-flow
Merge d3c65c73b into 207b4b0d8
Pull Request #408: Performance optimization

769 of 923 new or added lines in 32 files covered. (83.32%)

20 existing lines in 9 files now uncovered.

9907 of 14967 relevant lines covered (66.19%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.06
/EXOSIMS/SurveySimulation/linearJScheduler.py
1
from EXOSIMS.Prototypes.SurveySimulation import SurveySimulation
1✔
2
import astropy.units as u
1✔
3
import numpy as np
1✔
4
import astropy.constants as const
1✔
5
from EXOSIMS.util._numpy_compat import copy_if_needed
1✔
6

7

8
class linearJScheduler(SurveySimulation):
1✔
9
    """linearJScheduler
10

11
    This class implements the linear cost function scheduler described
12
    in [Savransky2010]_
13

14
    Args:
15
        coeffs (iterable 6x1):
16
            Cost function coefficients: slew distance, completeness, least visited
17
            planet ramp, unvisited known RV planet ramp, least visited ramp, unvisited
18
            ramp
19
        revisit_wait (float):
20
            The time required for the scheduler to wait before a target may be revisited
21
        find_known_RV (boolean):
22
            A flag that turns on the ability to identify known RV stars. The stars with
23
            known rocky planets have their int_comp value set to 1.0.
24
        specs (dict):
25
            :ref:`sec:inputspec`
26

27
    """
28

29
    def __init__(
1✔
30
        self,
31
        coeffs=[1, 1, 1, 1, 2, 1],
32
        revisit_wait=91.25,
33
        find_known_RV=False,
34
        **specs
35
    ):
36

37
        SurveySimulation.__init__(self, **specs)
1✔
38
        TL = self.TargetList
1✔
39

40
        # verify that coefficients input is iterable 6x1
41
        if not (isinstance(coeffs, (list, tuple, np.ndarray))) or (len(coeffs) != 6):
1✔
42
            raise TypeError("coeffs must be a 6 element iterable")
×
43

44
        # Add to outspec
45
        self._outspec["coeffs"] = coeffs
1✔
46
        self._outspec["revisit_wait"] = revisit_wait
1✔
47

48
        # normalize coefficients
49
        coeffs = np.array(coeffs)
1✔
50
        coeffs = coeffs / np.linalg.norm(coeffs, ord=1)
1✔
51

52
        self.coeffs = coeffs
1✔
53
        self.find_known_RV = find_known_RV
1✔
54

55
        self.revisit_wait = revisit_wait * u.d
1✔
56

57
        self.earth_candidates = (
1✔
58
            []
59
        )  # list of detected earth-like planets aroung promoted stars
60
        self.no_dets = np.ones(self.TargetList.nStars, dtype=bool)
1✔
61
        self.known_stars = np.array([])
1✔
62
        self.known_rocky = np.array([])
1✔
63
        if self.find_known_RV:
1✔
64
            self.known_stars, self.known_rocky = self.find_known_plans()
×
65
            TL.int_comp[self.known_rocky] = 1.0
×
66

67
    def next_target(self, old_sInd, mode):
1✔
68
        """Finds index of next target star and calculates its integration time.
69

70
        This method chooses the next target star index based on which
71
        stars are available, their integration time, and maximum completeness.
72
        Returns None if no target could be found.
73

74
        Args:
75
            old_sInd (integer):
76
                Index of the previous target star
77
            mode (dict):
78
                Selected observing mode for detection
79

80
        Returns:
81
            DRM (dict):
82
                Design Reference Mission, contains the results of one complete
83
                observation (detection and characterization)
84
            sInd (integer):
85
                Index of next target star. Defaults to None.
86
            intTime (astropy Quantity):
87
                Selected star integration time for detection in units of day.
88
                Defaults to None.
89
            waitTime (astropy Quantity):
90
                a strategically advantageous amount of time to wait in the case of an
91
                occulter for slew times
92

93
        """
94
        OS = self.OpticalSystem
1✔
95
        TL = self.TargetList
1✔
96
        Obs = self.Observatory
1✔
97
        TK = self.TimeKeeping
1✔
98

99
        # create DRM
100
        DRM = {}
1✔
101

102
        # create appropriate koMap
103
        koMap = self.koMaps[mode["syst"]["name"]]
1✔
104

105
        # allocate settling time + overhead time
106
        tmpCurrentTimeAbs = (
1✔
107
            TK.currentTimeAbs.copy() + Obs.settlingTime + mode["syst"]["ohTime"]
108
        )
109
        tmpCurrentTimeNorm = (
1✔
110
            TK.currentTimeNorm.copy() + Obs.settlingTime + mode["syst"]["ohTime"]
111
        )
112

113
        # look for available targets
114
        # 1. initialize arrays
115
        slewTimes = np.zeros(TL.nStars) * u.d
1✔
116
        # fZs = np.zeros(TL.nStars) / u.arcsec**2
117
        dV = np.zeros(TL.nStars) * u.m / u.s
1✔
118
        intTimes = np.zeros(TL.nStars) * u.d
1✔
119
        obsTimes = np.zeros([2, TL.nStars]) * u.d
1✔
120
        sInds = np.arange(TL.nStars)
1✔
121

122
        # 2. find spacecraft orbital START positions (if occulter, positions
123
        # differ for each star) and filter out unavailable targets
124
        sd = None
1✔
125
        if OS.haveOcculter:
1✔
126
            sd = Obs.star_angularSep(TL, old_sInd, sInds, tmpCurrentTimeAbs)
×
127
            obsTimes = Obs.calculate_observableTimes(
×
128
                TL, sInds, tmpCurrentTimeAbs, self.koMaps, self.koTimes, mode
129
            )
130
            slewTimes = Obs.calculate_slewTimes(
×
131
                TL, old_sInd, sInds, sd, obsTimes, tmpCurrentTimeAbs
132
            )
133

134
        # 2.1 filter out totTimes > integration cutoff
135
        if len(sInds.tolist()) > 0:
1✔
136
            sInds = np.intersect1d(self.intTimeFilterInds, sInds)
1✔
137

138
        # start times, including slew times
139
        startTimes = tmpCurrentTimeAbs.copy() + slewTimes
1✔
140
        startTimesNorm = tmpCurrentTimeNorm.copy() + slewTimes
1✔
141

142
        # 2.5 Filter stars not observable at startTimes
143
        try:
1✔
144
            tmpIndsbool = list()
1✔
145
            for i in np.arange(len(sInds)):
1✔
146
                koTimeInd = np.where(
1✔
147
                    np.round(startTimes[sInds[i]].value) - self.koTimes.value == 0
148
                )[0][
149
                    0
150
                ]  # find indice where koTime is startTime[0]
151
                tmpIndsbool.append(
1✔
152
                    koMap[sInds[i]][koTimeInd].astype(bool)
153
                )  # Is star observable at time ind
154
            sInds = sInds[tmpIndsbool]
1✔
155
            del tmpIndsbool
1✔
156
        except:  # noqa: E722 If there are no target stars to observe
×
157
            sInds = np.asarray([], dtype=int)
×
158

159
        # 3. filter out all previously (more-)visited targets, unless in
160
        if len(sInds.tolist()) > 0:
1✔
161
            sInds = self.revisitFilter(sInds, tmpCurrentTimeNorm)
1✔
162

163
        # 4.1 calculate integration times for ALL preselected targets
164
        (
1✔
165
            maxIntTimeOBendTime,
166
            maxIntTimeExoplanetObsTime,
167
            maxIntTimeMissionLife,
168
        ) = TK.get_ObsDetectionMaxIntTime(Obs, mode)
169
        maxIntTime = min(
1✔
170
            maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife
171
        )  # Maximum intTime allowed
172

173
        if len(sInds.tolist()) > 0:
1✔
174
            intTimes[sInds] = self.calc_targ_intTime(sInds, startTimes[sInds], mode)
1✔
175
            sInds = sInds[
1✔
176
                np.where(intTimes[sInds] <= maxIntTime)
177
            ]  # Filters targets exceeding end of OB
178
            endTimes = startTimes + intTimes
1✔
179

180
            if maxIntTime.value <= 0:
1✔
181
                sInds = np.asarray([], dtype=int)
×
182

183
        # 5.1 TODO Add filter to filter out stars entering and exiting keepout
184
        # between startTimes and endTimes
185

186
        # 5.2 find spacecraft orbital END positions (for each candidate target),
187
        # and filter out unavailable targets
188
        if len(sInds.tolist()) > 0 and Obs.checkKeepoutEnd:
1✔
189
            # endTimes may exist past koTimes so we have an exception to hand this case
190
            try:
1✔
191
                tmpIndsbool = list()
1✔
192
                for i in np.arange(len(sInds)):
1✔
193
                    koTimeInd = np.where(
1✔
194
                        np.round(endTimes[sInds[i]].value) - self.koTimes.value == 0
195
                    )[0][
196
                        0
197
                    ]  # find indice where koTime is endTime[0]
198
                    tmpIndsbool.append(
1✔
199
                        koMap[sInds[i]][koTimeInd].astype(bool)
200
                    )  # Is star observable at time ind
201
                sInds = sInds[tmpIndsbool]
1✔
202
                del tmpIndsbool
1✔
203
            except:  # noqa: E722
×
204
                sInds = np.asarray([], dtype=int)
×
205

206
        # 6. choose best target from remaining
207
        if len(sInds.tolist()) > 0:
1✔
208
            # choose sInd of next target
209
            sInd, waitTime = self.choose_next_target(
1✔
210
                old_sInd, sInds, slewTimes, intTimes[sInds]
211
            )
212

213
            # Should Choose Next Target decide there are no stars it wishes to
214
            # observe at this time.
215
            if (sInd is None) and (waitTime is not None):
1✔
216
                self.vprint(
×
217
                    (
218
                        "There are no stars Choose Next Target would like to Observe. "
219
                        "Waiting {}"
220
                    ).format(waitTime)
221
                )
222
                return DRM, None, None, waitTime
×
223
            elif (sInd is None) and (waitTime is not None):
1✔
224
                self.vprint(
×
225
                    (
226
                        "There are no stars Choose Next Target would like to Observe "
227
                        "and waitTime is None"
228
                    )
229
                )
230
                return DRM, None, None, waitTime
×
231
            # store selected star integration time
232
            intTime = intTimes[sInd]
1✔
233

234
        # if no observable target, advanceTime to next Observable Target
235
        else:
236
            self.vprint(
×
237
                "No Observable Targets at currentTimeNorm= "
238
                + str(TK.currentTimeNorm.copy())
239
            )
240
            return DRM, None, None, None
×
241

242
        # update visited list for selected star
243
        self.starVisits[sInd] += 1
1✔
244
        # store normalized start time for future completeness update
245
        self.lastObsTimes[sInd] = startTimesNorm[sInd]
1✔
246

247
        # populate DRM with occulter related values
248
        if OS.haveOcculter:
1✔
249
            DRM = Obs.log_occulterResults(
×
250
                DRM, slewTimes[sInd], sInd, sd[sInd], dV[sInd]
251
            )
252
            return DRM, sInd, intTime, slewTimes[sInd]
×
253

254
        return DRM, sInd, intTime, waitTime
1✔
255

256
    def choose_next_target(self, old_sInd, sInds, slewTimes, intTimes):
1✔
257
        """Choose next target based on truncated depth first search
258
        of linear cost function.
259

260
        Args:
261
            old_sInd (integer):
262
                Index of the previous target star
263
            sInds (integer array):
264
                Indices of available targets
265
            slewTimes (astropy quantity array):
266
                slew times to all stars (must be indexed by sInds)
267
            intTimes (astropy Quantity array):
268
                Integration times for detection in units of day
269

270
        Returns:
271
            tuple:
272
                sInd (int):
273
                    Index of next target star
274
                waitTime (astropy.units.Quantity or None):
275
                    the amount of time to wait (this method returns None)
276

277
        """
278

279
        Comp = self.Completeness
1✔
280
        TL = self.TargetList
1✔
281
        TK = self.TimeKeeping
1✔
282
        OS = self.OpticalSystem
1✔
283
        Obs = self.Observatory
1✔
284
        allModes = OS.observingModes
1✔
285

286
        # cast sInds to array
287
        sInds = np.array(sInds, ndmin=1, copy=copy_if_needed)
1✔
288
        known_sInds = np.intersect1d(sInds, self.known_rocky)
1✔
289

290
        # current star has to be in the adjmat
291
        if (old_sInd is not None) and (old_sInd not in sInds):
1✔
292
            sInds = np.append(sInds, old_sInd)
1✔
293

294
        # calculate dt since previous observation
295
        dt = TK.currentTimeNorm.copy() + slewTimes[sInds] - self.lastObsTimes[sInds]
1✔
296
        # get dynamic completeness values
297
        comps = Comp.completeness_update(TL, sInds, self.starVisits[sInds], dt)
1✔
298
        for idx, sInd in enumerate(sInds):
1✔
299
            if sInd in known_sInds:
1✔
300
                comps[idx] = 1.0
×
301

302
        # if first target, or if only 1 available target,
303
        # choose highest available completeness
304
        nStars = len(sInds)
1✔
305
        if (old_sInd is None) or (nStars == 1):
1✔
306
            sInd = np.random.choice(sInds[comps == max(comps)])
1✔
307
            return sInd, slewTimes[sInd]
1✔
308

309
        # define adjacency matrix
310
        A = np.zeros((nStars, nStars))
1✔
311

312
        # 0/ only consider slew distance when there's an occulter
313
        if OS.haveOcculter:
1✔
314
            r_ts = TL.starprop(sInds, TK.currentTimeAbs.copy())
1✔
315
            u_ts = (
1✔
316
                r_ts.to("AU").value.T / np.linalg.norm(r_ts.to("AU").value, axis=1)
317
            ).T
318
            angdists = np.arccos(np.clip(np.dot(u_ts, u_ts.T), -1, 1))
1✔
319
            A[np.ones((nStars), dtype=bool)] = angdists
1✔
320
            A = self.coeffs[0] * (A) / np.pi
1✔
321

322
        # 1/ add factor due to completeness
323
        A = A + self.coeffs[1] * (1 - comps)
1✔
324

325
        # add factor for unvisited ramp for known stars
326
        if np.any(known_sInds):
1✔
327
            # 2/ add factor for least visited known stars
328
            f_uv = np.zeros(nStars)
×
329
            u1 = np.in1d(sInds, known_sInds)
×
330
            u2 = self.starVisits[sInds] == min(self.starVisits[known_sInds])
×
331
            unvisited = np.logical_and(u1, u2)
×
332
            f_uv[unvisited] = (
×
333
                float(TK.currentTimeNorm.copy() / TK.missionLife.copy()) ** 2
334
            )
335
            A = A - self.coeffs[2] * f_uv
×
336

337
            # 3/ add factor for unvisited known stars
338
            no_visits = np.zeros(nStars)
×
339
            u2 = self.starVisits[sInds] == 0
×
340
            unvisited = np.logical_and(u1, u2)
×
341
            no_visits[unvisited] = 1.0
×
342
            A = A - self.coeffs[3] * no_visits
×
343

344
        # 4/ add factor due to unvisited ramp
345
        f_uv = np.zeros(nStars)
1✔
346
        unvisited = self.starVisits[sInds] == 0
1✔
347
        f_uv[unvisited] = float(TK.currentTimeNorm.copy() / TK.missionLife.copy()) ** 2
1✔
348
        A = A - self.coeffs[4] * f_uv
1✔
349

350
        # 5/ add factor due to revisited ramp
351
        if self.starRevisit.size != 0:
1✔
352
            f2_uv = 1 - (np.in1d(sInds, self.starRevisit[:, 0]))
1✔
353
            A = A + self.coeffs[5] * f2_uv
1✔
354

355
        # kill diagonal
356
        A = A + np.diag(np.ones(nStars) * np.inf)
1✔
357

358
        # take two traversal steps
359
        step1 = np.tile(A[sInds == old_sInd, :], (nStars, 1)).flatten("F")
1✔
360
        step2 = A[np.array(np.ones((nStars, nStars)), dtype=bool)]
1✔
361
        tmp = np.nanargmin(step1 + step2)
1✔
362
        sInd = sInds[int(np.floor(tmp / float(nStars)))]
1✔
363

364
        waitTime = slewTimes[sInd]
1✔
365
        # Check if exoplanetObsTime would be exceeded
366
        mode = list(filter(lambda mode: mode["detectionMode"], allModes))[0]
1✔
367
        (
1✔
368
            maxIntTimeOBendTime,
369
            maxIntTimeExoplanetObsTime,
370
            maxIntTimeMissionLife,
371
        ) = TK.get_ObsDetectionMaxIntTime(Obs, mode)
372
        maxIntTime = min(
1✔
373
            maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife
374
        )  # Maximum intTime allowed
375
        intTimes2 = self.calc_targ_intTime(
1✔
376
            np.array([sInd]), TK.currentTimeAbs.copy(), mode
377
        )
378
        if (
1✔
379
            intTimes2 > maxIntTime
380
        ):  # check if max allowed integration time would be exceeded
381
            self.vprint("max allowed integration time would be exceeded")
×
382
            sInd = None
×
383
            waitTime = 1.0 * u.d
×
384

385
        return sInd, waitTime
1✔
386

387
    def revisitFilter(self, sInds, tmpCurrentTimeNorm):
1✔
388
        """Helper method for Overloading Revisit Filtering
389

390
        Args:
391
            sInds(int numpy.ndarray):
392
                indices of stars still in observation list
393
            tmpCurrentTimeNorm (MJD)
394
                the simulation time after overhead was added in MJD
395

396
        Returns:
397
            int numpy.ndarray:
398
                sInds - indices of stars still in observation list
399
        """
400
        tovisit = np.zeros(
1✔
401
            self.TargetList.nStars, dtype=bool
402
        )  # tovisit is a boolean array containing the
403
        if len(sInds) > 0:  # so long as there is at least 1 star left in sInds
1✔
404
            tovisit[sInds] = (self.starVisits[sInds] == min(self.starVisits[sInds])) & (
1✔
405
                self.starVisits[sInds] < self.nVisitsMax
406
            )  # Checks that no star has exceeded the number of revisits
407
            if (
1✔
408
                self.starRevisit.size != 0
409
            ):  # There is at least one revisit planned in starRevisit
410
                dt_rev = (
1✔
411
                    self.starRevisit[:, 1] * u.day - tmpCurrentTimeNorm
412
                )  # absolute temporal spacing between revisit and now.
413

414
                # return indices of all revisits within a threshold dt_max of revisit
415
                # day and indices of all revisits with no detections past the
416
                # revisit time
417
                ind_rev2 = [
1✔
418
                    int(x)
419
                    for x in self.starRevisit[dt_rev < 0 * u.d, 0]
420
                    if (x in sInds)
421
                ]
422
                tovisit[ind_rev2] = self.starVisits[ind_rev2] < self.nVisitsMax
1✔
423
            sInds = np.where(tovisit)[0]
1✔
424

425
        return sInds
1✔
426

427
    def scheduleRevisit(self, sInd, smin, det, pInds):
1✔
428
        """A Helper Method for scheduling revisits after observation detection
429

430
        Args:
431
            sInd (int):
432
                sInd of the star just detected
433
            smin (float):
434
                minimum separation of the planet to star of planet just detected
435
            det (bool numpy.ndarray):
436
                detection array
437
            pInds (int numpy.ndarray):
438
                Indices of planets around target star
439

440
        Returns:
441
            None
442

443
        updates self.starRevisit attribute
444
        """
445
        TK = self.TimeKeeping
1✔
446
        TL = self.TargetList
1✔
447
        SU = self.SimulatedUniverse
1✔
448

449
        # in both cases (detection or false alarm), schedule a revisit
450
        # based on minimum separation
451
        Ms = TL.MsTrue[sInd]
1✔
452
        if (
1✔
453
            smin is not None and smin is not np.nan
454
        ):  # smin is None if no planet was detected
455
            sp = smin
1✔
456
            if np.any(det):
1✔
457
                pInd_smin = pInds[det][np.argmin(SU.s[pInds[det]])]
1✔
458
                Mp = SU.Mp[pInd_smin]
1✔
459
            else:
460
                Mp = SU.Mp.mean()
×
461
            mu = const.G * (Mp + Ms)
1✔
462
            T = 2.0 * np.pi * np.sqrt(sp**3 / mu)
1✔
463
            t_rev = TK.currentTimeNorm.copy() + T / 2.0
1✔
464
        # otherwise, revisit based on average of population semi-major axis and mass
465
        else:
466
            sp = SU.s.mean()
1✔
467
            Mp = SU.Mp.mean()
1✔
468
            mu = const.G * (Mp + Ms)
1✔
469
            T = 2.0 * np.pi * np.sqrt(sp**3 / mu)
1✔
470
            t_rev = TK.currentTimeNorm.copy() + 0.75 * T
1✔
471

472
        # if no detections then schedule revisit based off of revisit_weight
473
        if not np.any(det):
1✔
474
            t_rev = TK.currentTimeNorm.copy() + self.revisit_wait
1✔
475
            self.no_dets[sInd] = True
1✔
476
        else:
477
            self.no_dets[sInd] = False
1✔
478

479
        t_rev = TK.currentTimeNorm.copy() + self.revisit_wait
1✔
480
        # finally, populate the revisit list (NOTE: sInd becomes a float)
481
        revisit = np.array([sInd, t_rev.to("day").value])
1✔
482
        if self.starRevisit.size == 0:  # If starRevisit has nothing in it
1✔
483
            self.starRevisit = np.array([revisit])  # initialize sterRevisit
1✔
484
        else:
485
            revInd = np.where(self.starRevisit[:, 0] == sInd)[
1✔
486
                0
487
            ]  # indices of the first column of the starRevisit list containing sInd
488
            if revInd.size == 0:
1✔
489
                self.starRevisit = np.vstack((self.starRevisit, revisit))
1✔
490
            else:
491
                self.starRevisit[revInd, 1] = revisit[1]  # over
×
492

493
    def observation_characterization(self, sInd, mode):
1✔
494
        """Finds if characterizations are possible and relevant information
495

496
        Args:
497
            sInd (int):
498
                Integer index of the star of interest
499
            mode (dict):
500
                Selected observing mode for characterization
501

502
        Returns:
503
            tuple:
504
                characterized (integer list):
505
                    Characterization status for each planet orbiting the observed
506
                    target star including False Alarm if any, where 1 is full spectrum,
507
                    -1 partial spectrum, and 0 not characterized
508
                fZ (astropy Quantity):
509
                    Surface brightness of local zodiacal light in units of 1/arcsec2
510
                JEZ (astropy.units.Quantity(numpy.ndarray(float))):
511
                    Intensity of exo-zodiacal light in units of photons/s/m2/arcsec2
512
                systemParams (dict):
513
                    Dictionary of time-dependant planet properties averaged over the
514
                    duration of the integration
515
                SNR (float ndarray):
516
                    Characterization signal-to-noise ratio of the observable planets.
517
                    Defaults to None.
518
                intTime (astropy Quantity):
519
                    Selected star characterization time in units of day.
520
                    Defaults to None.
521

522
        """
523

524
        OS = self.OpticalSystem
1✔
525
        ZL = self.ZodiacalLight
1✔
526
        TL = self.TargetList
1✔
527
        SU = self.SimulatedUniverse
1✔
528
        Obs = self.Observatory
1✔
529
        TK = self.TimeKeeping
1✔
530

531
        # find indices of planets around the target
532
        pInds = np.where(SU.plan2star == sInd)[0]
1✔
533

534
        # get the detected status, and check if there was a FA
535
        det = self.lastDetected[sInd, 0]
1✔
536
        FA = len(det) == len(pInds) + 1
1✔
537
        if FA:
1✔
538
            pIndsDet = np.append(pInds, -1)[det]
×
539
        else:
540
            pIndsDet = pInds[det]
1✔
541

542
        # initialize outputs, and check if there's anything (planet or FA) to
543
        # characterize
544
        characterized = np.zeros(len(det), dtype=int)
1✔
545
        fZ = 0.0 / u.arcsec**2
1✔
546
        systemParams = SU.dump_system_params(
1✔
547
            sInd
548
        )  # write current system params by default
549
        JEZ = np.zeros(len(det)) * u.ph / u.s / u.m**2 / u.arcsec**2
1✔
550
        SNR = np.zeros(len(det))
1✔
551
        intTime = None
1✔
552
        if len(det) == 0:  # nothing to characterize
1✔
NEW
553
            return characterized, fZ, JEZ, systemParams, SNR, intTime
×
554

555
        # look for last detected planets that have not been fully characterized
556
        if not (FA):  # only true planets, no FA
1✔
557
            tochar = self.fullSpectra[pIndsDet] == 0
1✔
558
        else:  # mix of planets and a FA
559
            truePlans = pIndsDet[:-1]
×
560
            tochar = np.append((self.fullSpectra[truePlans] == 0), True)
×
561

562
        # 1/ find spacecraft orbital START position including overhead time,
563
        # and check keepout angle
564
        if np.any(tochar):
1✔
565
            # start times
566
            startTime = (
1✔
567
                TK.currentTimeAbs.copy() + mode["syst"]["ohTime"] + Obs.settlingTime
568
            )
569
            startTimeNorm = (
1✔
570
                TK.currentTimeNorm.copy() + mode["syst"]["ohTime"] + Obs.settlingTime
571
            )
572
            # planets to characterize
573
            koTimeInd = np.where(np.round(startTime.value) - self.koTimes.value == 0)[
1✔
574
                0
575
            ][
576
                0
577
            ]  # find indice where koTime is startTime[0]
578
            # wherever koMap is 1, the target is observable
579
            koMap = self.koMaps[mode["syst"]["name"]]
1✔
580
            tochar[tochar] = koMap[sInd][koTimeInd]
1✔
581

582
        # 2/ if any planet to characterize, find the characterization times
583
        # at the detected JEZ, dMag, and WA
584
        if np.any(tochar):
1✔
585
            is_earthlike = np.logical_and(
1✔
586
                np.array([(p in self.earth_candidates) for p in pIndsDet]), tochar
587
            )
588

589
            fZ = ZL.fZ(Obs, TL, sInd, startTime, mode)
1✔
590
            JEZ = self.lastDetected[sInd, 1][det][tochar]
1✔
591
            dMag = self.lastDetected[sInd, 2][det][tochar]
1✔
592
            WA = self.lastDetected[sInd, 3][det][tochar] * u.arcsec
1✔
593
            WA[is_earthlike[tochar]] = SU.WA[pIndsDet[is_earthlike]]
1✔
594
            dMag[is_earthlike[tochar]] = SU.dMag[pIndsDet[is_earthlike]]
1✔
595

596
            intTimes = np.zeros(len(tochar)) * u.day
1✔
597
            intTimes[tochar] = OS.calc_intTime(
1✔
598
                TL, sInd, fZ, JEZ, dMag, WA, mode, TK=self.TimeKeeping
599
            )
600
            intTimes[~np.isfinite(intTimes)] = 0 * u.d
1✔
601
            # add a predetermined margin to the integration times
602
            intTimes = intTimes * (1.0 + self.charMargin)
1✔
603
            # apply time multiplier
604
            totTimes = intTimes * (mode["timeMultiplier"])
1✔
605
            # end times
606
            endTimes = startTime + totTimes
1✔
607
            endTimesNorm = startTimeNorm + totTimes
1✔
608
            # planets to characterize
609
            tochar = (
1✔
610
                (totTimes > 0)
611
                & (totTimes <= OS.intCutoff)
612
                & (endTimesNorm <= TK.OBendTimes[TK.OBnumber])
613
            )
614
        # 3/ is target still observable at the end of any char time?
615
        if np.any(tochar) and Obs.checkKeepoutEnd:
1✔
616
            koTimeInds = np.zeros(len(endTimes.value[tochar]), dtype=int)
1✔
617
            # find index in koMap where each endTime is closest to koTimes
618
            for t, endTime in enumerate(endTimes.value[tochar]):
1✔
619
                if endTime > self.koTimes.value[-1]:
1✔
620
                    # case where endTime exceeds largest koTimes element
621
                    endTimeInBounds = np.where(
×
622
                        np.floor(endTime) - self.koTimes.value == 0
623
                    )[0]
624
                    koTimeInds[t] = (
×
625
                        endTimeInBounds[0] if endTimeInBounds.size != 0 else -1
626
                    )
627
                else:
628
                    koTimeInds[t] = np.where(
1✔
629
                        np.round(endTime) - self.koTimes.value == 0
630
                    )[0][
631
                        0
632
                    ]  # find indice where koTime is endTimes[0]
633
            tochar[tochar] = [koMap[sInd][koT] if koT >= 0 else 0 for koT in koTimeInds]
1✔
634

635
        # 4/ if yes, allocate the overhead time, and perform the characterization
636
        # for the maximum char time
637
        if np.any(tochar):
1✔
638
            # Save Current Time before attempting time allocation
639
            currentTimeNorm = TK.currentTimeNorm.copy()
1✔
640
            currentTimeAbs = TK.currentTimeAbs.copy()
1✔
641

642
            # Allocate Time
643
            if np.any(np.logical_and(is_earthlike, tochar)):
1✔
644
                intTime = np.max(intTimes[np.logical_and(is_earthlike, tochar)])
×
645
            else:
646
                intTime = np.max(intTimes[tochar])
1✔
647
            extraTime = intTime * (mode["timeMultiplier"] - 1.0)  # calculates extraTime
1✔
648
            success = TK.allocate_time(
1✔
649
                intTime + extraTime + mode["syst"]["ohTime"] + Obs.settlingTime, True
650
            )  # allocates time
651
            if not (success):  # Time was not successfully allocated
1✔
652
                # Identical to when "if char_mode['SNR'] not in [0, np.inf]:"
653
                # in run_sim()
654
                char_intTime = None
×
655
                lenChar = len(pInds) + 1 if FA else len(pInds)
×
656
                characterized = np.zeros(lenChar, dtype=float)
×
657
                char_SNR = np.zeros(lenChar, dtype=float)
×
658
                char_fZ = 0.0 / u.arcsec**2
×
NEW
659
                char_JEZ = np.zeros(lenChar) * u.ph / u.s / u.m**2 / u.arcsec**2
×
660
                char_systemParams = SU.dump_system_params(sInd)
×
NEW
661
                return (
×
662
                    characterized,
663
                    char_fZ,
664
                    char_JEZ,
665
                    char_systemParams,
666
                    char_SNR,
667
                    char_intTime,
668
                )
669

670
            pIndsChar = pIndsDet[tochar]
1✔
671
            log_char = "   - Charact. planet inds %s (%s/%s detected)" % (
1✔
672
                pIndsChar,
673
                len(pIndsChar),
674
                len(pIndsDet),
675
            )
676
            self.logger.info(log_char)
1✔
677
            self.vprint(log_char)
1✔
678

679
            # SNR CALCULATION:
680
            # first, calculate SNR for observable planets (without false alarm)
681
            planinds = pIndsChar[:-1] if pIndsChar[-1] == -1 else pIndsChar
1✔
682
            SNRplans = np.zeros(len(planinds))
1✔
683
            if len(planinds) > 0:
1✔
684
                # initialize arrays for SNR integration
685
                fZs = np.zeros(self.ntFlux) / u.arcsec**2
1✔
686
                systemParamss = np.empty(self.ntFlux, dtype="object")
1✔
687
                Ss = np.zeros((self.ntFlux, len(planinds)))
1✔
688
                Ns = np.zeros((self.ntFlux, len(planinds)))
1✔
689
                # integrate the signal (planet flux) and noise
690
                dt = intTime / float(self.ntFlux)
1✔
691
                timePlus = (
1✔
692
                    Obs.settlingTime.copy() + mode["syst"]["ohTime"].copy()
693
                )  # accounts for the time since the current time
694
                for i in range(self.ntFlux):
1✔
695
                    # allocate first half of dt
696
                    timePlus += dt / 2.0
1✔
697
                    # calculate current zodiacal light brightness
698
                    fZs[i] = ZL.fZ(Obs, TL, sInd, currentTimeAbs + timePlus, mode)[0]
1✔
699
                    # propagate the system to match up with current time
700
                    SU.propag_system(
1✔
701
                        sInd, currentTimeNorm + timePlus - self.propagTimes[sInd]
702
                    )
703
                    self.propagTimes[sInd] = currentTimeNorm + timePlus
1✔
704
                    # save planet parameters
705
                    systemParamss[i] = SU.dump_system_params(sInd)
1✔
706
                    # calculate signal and noise (electron count rates)
707
                    Ss[i, :], Ns[i, :] = self.calc_signal_noise(
1✔
708
                        sInd, planinds, dt, mode, fZ=fZs[i]
709
                    )
710
                    # allocate second half of dt
711
                    timePlus += dt / 2.0
1✔
712

713
                # average output parameters
714
                fZ = np.mean(fZs)
1✔
715
                systemParams = {
1✔
716
                    key: sum([systemParamss[x][key] for x in range(self.ntFlux)])
717
                    / float(self.ntFlux)
718
                    for key in sorted(systemParamss[0])
719
                }
720
                # calculate planets SNR
721
                S = Ss.sum(0)
1✔
722
                N = Ns.sum(0)
1✔
723
                SNRplans[N > 0] = S[N > 0] / N[N > 0]
1✔
724
                # allocate extra time for timeMultiplier
725

726
            # if only a FA, just save zodiacal brightness in the middle of the
727
            # integration
728
            else:
729
                totTime = intTime * (mode["timeMultiplier"])
×
730
                fZ = ZL.fZ(Obs, TL, sInd, currentTimeAbs + totTime / 2.0, mode)[0]
×
731

732
            # calculate the false alarm SNR (if any)
733
            SNRfa = []
1✔
734
            if pIndsChar[-1] == -1:
1✔
NEW
735
                JEZ = self.lastDetected[sInd, 1][-1]
×
736
                dMag = self.lastDetected[sInd, 2][-1]
×
737
                WA = self.lastDetected[sInd, 3][-1] * u.arcsec
×
738
                C_p, C_b, C_sp = OS.Cp_Cb_Csp(
×
739
                    TL, sInd, fZ, JEZ, dMag, WA, mode, TK=self.TimeKeeping
740
                )
741
                S = (C_p * intTime).decompose().value
×
742
                N = np.sqrt((C_b * intTime + (C_sp * intTime) ** 2).decompose().value)
×
743
                SNRfa = S / N if N > 0 else 0.0
×
744

745
            # save all SNRs (planets and FA) to one array
746
            SNRinds = np.where(det)[0][tochar]
1✔
747
            SNR[SNRinds] = np.append(SNRplans, SNRfa)
1✔
748

749
            # now, store characterization status: 1 for full spectrum,
750
            # -1 for partial spectrum, 0 for not characterized
751
            char = SNR >= mode["SNR"]
1✔
752
            # initialize with full spectra
753
            characterized = char.astype(int)
1✔
754
            WAchar = self.lastDetected[sInd, 3][char] * u.arcsec
1✔
755
            # find the current WAs of characterized planets
756
            WAs = systemParams["WA"]
1✔
757
            if FA:
1✔
758
                WAs = np.append(WAs, self.lastDetected[sInd, 3][-1] * u.arcsec)
×
759
            # check for partial spectra
760
            IWA_max = mode["IWA"] * (1 + mode["BW"] / 2.0)
1✔
761
            OWA_min = mode["OWA"] * (1 - mode["BW"] / 2.0)
1✔
762
            char[char] = (WAchar < IWA_max) | (WAchar > OWA_min)
1✔
763
            characterized[char] = -1
1✔
764
            # encode results in spectra lists (only for planets, not FA)
765
            charplans = characterized[:-1] if FA else characterized
1✔
766
            self.fullSpectra[pInds[charplans == 1]] += 1
1✔
767
            self.partialSpectra[pInds[charplans == -1]] += 1
1✔
768

769
        return characterized.astype(int), fZ, JEZ, systemParams, SNR, intTime
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc