• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moeyensj / thor / 11277630968

10 Oct 2024 03:54PM UTC coverage: 73.794% (-0.2%) from 73.99%
11277630968

Pull #166

github

web-flow
Merge 4ac6449af into 3172ac203
Pull Request #166: Swap to pdm

213 of 288 new or added lines in 37 files covered. (73.96%)

2768 of 3751 relevant lines covered (73.79%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.64
/src/thor/range_and_transform.py
1
import logging
1✔
2
import multiprocessing as mp
1✔
3
import time
1✔
4
from typing import Optional, Type, Union
1✔
5

6
import quivr as qv
1✔
7
import ray
1✔
8
from adam_core.coordinates import (
1✔
9
    CartesianCoordinates,
10
    OriginCodes,
11
    transform_coordinates,
12
)
13
from adam_core.propagator import Propagator
1✔
14
from adam_core.propagator.adam_pyoorb import PYOORBPropagator
1✔
15
from adam_core.ray_cluster import initialize_use_ray
1✔
16

17
from .observations.observations import Observations
1✔
18
from .orbit import RangedPointSourceDetections, TestOrbitEphemeris, TestOrbits
1✔
19
from .projections import GnomonicCoordinates
1✔
20

21
# Names exported by `from <this module> import *`.
__all__ = [
    "TransformedDetections",
    "range_and_transform",
]

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
1✔
28

29

30
class TransformedDetections(qv.Table):
    """
    Table of detections expressed as gnomonic coordinates.

    Each row pairs a detection identifier with its gnomonic coordinates and
    the identifier of the state it was transformed for.
    """

    # Identifier of the detection (filled from the observation IDs).
    id = qv.LargeStringColumn()
    # Gnomonic coordinates of the detection.
    coordinates = GnomonicCoordinates.as_column()
    # Identifier of the state this detection belongs to.
    state_id = qv.LargeStringColumn()
1✔
34

35

36
def range_and_transform_worker(
    ranged_detections: RangedPointSourceDetections,
    observations: Observations,
    ephemeris: TestOrbitEphemeris,
    state_id: str,
) -> TransformedDetections:
    """
    Project the ranged detections of a single state onto the gnomonic tangent
    plane centered on the test orbit's aberrated state.

    Parameters
    ----------
    ranged_detections
        Ranged spherical coordinates, keyed by state ID.
    observations
        The observations the ranged detections were derived from. These are
        expected to be ordered one-to-one with the ranged detections.
    ephemeris
        Ephemeris that supplies the test orbit's aberrated coordinates.
    state_id
        ID of the state to process.

    Returns
    -------
    transformed_detections
        The detections of this state as gnomonic coordinates centered on the
        test orbit.
    """
    # Narrow every input down to the rows that belong to this state.
    state_observations = observations.select("state_id", state_id)
    state_ephemeris = ephemeris.select("id", state_id)
    state_ranged = ranged_detections.select("state_id", state_id)

    # Express the ranged detections as Sun-centered ecliptic Cartesian
    # coordinates so they can be projected about the test orbit.
    cartesian = transform_coordinates(
        state_ranged.coordinates,
        representation_out=CartesianCoordinates,
        frame_out="ecliptic",
        origin_out=OriginCodes.SUN,
    )

    # Project into the co-rotating frame, centered on the aberrated
    # coordinates of the test orbit.
    gnomonic = GnomonicCoordinates.from_cartesian(
        cartesian,
        center_cartesian=state_ephemeris.ephemeris.aberrated_coordinates,
    )

    return TransformedDetections.from_kwargs(
        id=state_observations.id,
        coordinates=gnomonic,
        state_id=state_observations.state_id,
    )
85

86

87
# Ray remote wrapper around the worker: each task requests one CPU and
# produces a single return object.
range_and_transform_remote = ray.remote(range_and_transform_worker).options(
    num_cpus=1,
    num_returns=1,
)
92

93

94
def _append_detections(
    accumulated: TransformedDetections,
    chunk: TransformedDetections,
) -> TransformedDetections:
    """Concatenate ``chunk`` onto ``accumulated``, defragmenting if required."""
    combined = qv.concatenate([accumulated, chunk])
    if combined.fragmented():
        combined = qv.defragment(combined)
    return combined


def range_and_transform(
    test_orbit: TestOrbits,
    observations: Union[Observations, ray.ObjectRef],
    propagator: Type[Propagator] = PYOORBPropagator,
    propagator_kwargs: Optional[dict] = None,
    max_processes: Optional[int] = 1,
) -> TransformedDetections:
    """
    Range observations for a single test orbit and transform them into a
    gnomonic projection centered on the motion of the test orbit (co-rotating
    frame).

    Parameters
    ----------
    test_orbit : `~thor.orbit.TestOrbits`
        Test orbit to use to gather and transform observations. Must contain
        exactly one orbit.
    observations : `~thor.observations.observations.Observations`
        Observations from which to range and transform the detections. May
        also be a ray object reference to the observations, in which case
        they are fetched from the object store.
    propagator : `~adam_core.propagator.propagator.Propagator`
        Propagator class to use to propagate the test orbit and generate
        ephemerides.
    propagator_kwargs : dict, optional
        Keyword arguments passed to the propagator class when it is
        instantiated. Defaults to no keyword arguments.
    max_processes : int, optional
        Maximum number of processes to use for parallelization. If None,
        the number of CPUs reported by ``multiprocessing.cpu_count()`` is
        used for the transform step. If an existing ray cluster is already
        running, this parameter will be ignored if larger than 1 or not None.

    Returns
    -------
    transformed_detections : `~thor.range_and_transform.TransformedDetections`
        The transformed detections as gnomonic coordinates
        of the observations in the co-rotating frame, sorted by state ID.
        Empty if there are no observations.

    Raises
    ------
    ValueError
        If `test_orbit` does not contain exactly one orbit.
    """
    time_start = time.perf_counter()
    logger.info("Running range and transform...")

    if len(test_orbit) != 1:
        raise ValueError(f"range_and_transform received {len(test_orbit)} orbits but expected 1.")

    # Avoid a shared mutable default: build a fresh dict per call.
    if propagator_kwargs is None:
        propagator_kwargs = {}

    logger.info(f"Assuming r = {test_orbit.coordinates.r[0]} au")
    logger.info(f"Assuming v = {test_orbit.coordinates.v[0]} au/d")

    if isinstance(observations, ray.ObjectRef):
        # Keep the caller's reference so it can be handed to the workers
        # without placing the observations in the object store a second time.
        observations_ref = observations
        observations = ray.get(observations)
        logger.info("Retrieved observations from the object store.")
    else:
        observations_ref = None

    prop = propagator(**propagator_kwargs)

    if len(observations) > 0:
        # Compute the ephemeris of the test orbit (this will be cached)
        ephemeris = test_orbit.generate_ephemeris_from_observations(
            observations,
            propagator=prop,
            max_processes=max_processes,
        )

        # Assume that the heliocentric distance of all point sources in
        # the observations are the same as that of the test orbit
        ranged_detections_spherical = test_orbit.range_observations(
            observations,
            propagator=prop,
            max_processes=max_processes,
        )

        transformed_detections = TransformedDetections.empty()

        if max_processes is None:
            max_processes = mp.cpu_count()

        state_ids = observations.state_id.unique()

        use_ray = initialize_use_ray(num_cpus=max_processes)
        if use_ray:
            # Only references created here are freed at the end; a reference
            # supplied by the caller is left untouched.
            refs_to_free = []
            if observations_ref is None:
                observations_ref = ray.put(observations)
                refs_to_free.append(observations_ref)
                logger.info("Placed observations in the object store.")

            if not isinstance(ephemeris, ray.ObjectRef):
                ephemeris_ref = ray.put(ephemeris)
                refs_to_free.append(ephemeris_ref)
                logger.info("Placed ephemeris in the object store.")
            else:
                ephemeris_ref = ephemeris

            # This reference is always created locally, so it is always ours
            # to release once every task has finished.
            ranged_detections_spherical_ref = ray.put(ranged_detections_spherical)
            refs_to_free.append(ranged_detections_spherical_ref)

            futures = []
            for state_id in state_ids:
                futures.append(
                    range_and_transform_remote.remote(
                        ranged_detections_spherical_ref,
                        observations_ref,
                        ephemeris_ref,
                        state_id,
                    )
                )

                # Back-pressure: once enough tasks are in flight, collect one
                # result before submitting more.
                if len(futures) >= max_processes * 1.5:
                    finished, futures = ray.wait(futures, num_returns=1)
                    transformed_detections = _append_detections(
                        transformed_detections, ray.get(finished[0])
                    )

            # Drain the tasks that are still outstanding.
            while futures:
                finished, futures = ray.wait(futures, num_returns=1)
                transformed_detections = _append_detections(
                    transformed_detections, ray.get(finished[0])
                )

            if refs_to_free:
                ray.internal.free(refs_to_free)
                logger.info(f"Removed {len(refs_to_free)} references from the object store.")

        else:
            for state_id in state_ids:
                # The worker selects the rows for this state itself, so the
                # full tables are passed through unchanged.
                chunk = range_and_transform_worker(
                    ranged_detections_spherical,
                    observations,
                    ephemeris,
                    state_id,
                )
                transformed_detections = _append_detections(transformed_detections, chunk)

        transformed_detections = transformed_detections.sort_by(by=["state_id"])

    else:
        transformed_detections = TransformedDetections.empty()

    time_end = time.perf_counter()
    logger.info(f"Transformed {len(transformed_detections)} observations.")
    logger.info(f"Range and transform completed in {time_end - time_start:.3f} seconds.")
    return transformed_detections
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc