• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moeyensj / thor / 18550563085

16 Oct 2025 04:44AM UTC coverage: 71.601%. First build
18550563085

Pull #169

github

web-flow
Merge cf159162f into cc8d68a8c
Pull Request #169: WIP: Kk/test orbit volumes

313 of 636 new or added lines in 7 files covered. (49.21%)

3149 of 4398 relevant lines covered (71.6%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.77
/src/thor/orbit_selection.py
1
import logging
1✔
2
import multiprocessing as mp
1✔
3
import time
1✔
4
from dataclasses import dataclass
1✔
5
from typing import Optional, Type, Union
1✔
6

7
import numpy as np
1✔
8
import pyarrow as pa
1✔
9
import pyarrow.compute as pc
1✔
10
import pyarrow.parquet as pq
1✔
11
import quivr as qv
1✔
12
import ray
1✔
13
from adam_core.coordinates import KeplerianCoordinates
1✔
14
from adam_core.observers import Observers
1✔
15
from adam_core.orbits import Ephemeris, Orbits
1✔
16
from adam_core.propagator import Propagator
1✔
17

18
try:
1✔
19
    from adam_core.utils.iter import _iterate_chunks
1✔
20
except ImportError:
1✔
21
    try:
1✔
22
        from adam_core.propagator.utils import _iterate_chunks
1✔
NEW
23
    except ImportError:
×
NEW
24
        from adam_core.propagator import _iterate_chunks
×
25
from adam_core.ray_cluster import initialize_use_ray
1✔
26
from adam_core.time import Timestamp
1✔
27

28
from thor.observations import Observations
1✔
29
from thor.orbit import TestOrbits
1✔
30

31
from .observations.utils import calculate_healpixels
1✔
32

33
logger = logging.getLogger(__name__)
1✔
34

35
__all__ = ["generate_test_orbits"]
1✔
36

37

38
@dataclass
1✔
39
class KeplerianPhaseSpace:
1✔
40
    a_min: float = -1_000_000.0
1✔
41
    a_max: float = 1_000_000.0
1✔
42
    e_min: float = 0.0
1✔
43
    e_max: float = 1_000.0
1✔
44
    i_min: float = 0.0
1✔
45
    i_max: float = 180.0
1✔
46

47

48
def select_average_within_region(coordinates: KeplerianCoordinates) -> int:
1✔
49
    """
50
    Select the Keplerian coordinate as close to the median in semi-major axis,
51
    eccentricity, and inclination.
52

53
    Parameters
54
    ----------
55
    coordinates
56
        Keplerian coordinates to select from.
57

58
    Returns
59
    -------
60
    index
61
        Index of the selected coordinates.
62
    """
63
    keplerian = coordinates.values
1✔
64
    aei = keplerian[:, 0:3]
1✔
65

66
    median = np.median(aei, axis=0)
1✔
67
    percent_diff = np.abs((aei - median) / median)
1✔
68

69
    # Sum the percent differences
70
    summed_diff = np.sum(percent_diff, axis=1)
1✔
71

72
    # Find the minimum summed percent difference and call that
73
    # the average object
74
    index = np.where(summed_diff == np.min(summed_diff))[0][0]
1✔
75
    return index
1✔
76

77

78
def select_test_orbits(ephemeris: Ephemeris, orbits: Orbits) -> Orbits:
1✔
79
    """
80
    Select test orbits from orbits using the predicted ephemeris
81
    for different regions of Keplerian phase space.
82

83
    The regions are:
84
    - 3 in the Hungarias
85
    - 5 in the main belt
86
    - 1 in the outer solar system
87

88
    Parameters
89
    ----------
90
    ephemeris
91
        Ephemeris for the orbits.
92
    orbits
93
        Orbits to select from.
94

95
    Returns
96
    -------
97
    test_orbits
98
        Test orbits selected from the orbits.
99
    """
100
    orbits_patch = orbits.apply_mask(pc.is_in(orbits.orbit_id, ephemeris.orbit_id))
×
101

102
    # Convert to keplerian coordinates
103
    keplerian = orbits_patch.coordinates.to_keplerian()
×
104

105
    # Create 3 phase space regions for the Hungarias
106
    hungarias_01 = KeplerianPhaseSpace(
×
107
        a_min=1.7,
108
        a_max=2.06,
109
        e_max=0.1,
110
    )
111
    hungarias_02 = KeplerianPhaseSpace(
×
112
        a_min=hungarias_01.a_min,
113
        a_max=hungarias_01.a_max,
114
        e_min=hungarias_01.e_max,
115
        e_max=0.2,
116
    )
117
    hungarias_03 = KeplerianPhaseSpace(
×
118
        a_min=hungarias_01.a_min,
119
        a_max=hungarias_01.a_max,
120
        e_min=hungarias_02.e_max,
121
        e_max=0.4,
122
    )
123

124
    # Create 5 phase space regions for the rest of the main belt
125
    mainbelt_01 = KeplerianPhaseSpace(
×
126
        a_min=hungarias_03.a_max,
127
        a_max=2.5,
128
        e_max=0.5,
129
    )
130
    mainbelt_02 = KeplerianPhaseSpace(
×
131
        a_min=mainbelt_01.a_max,
132
        a_max=2.82,
133
        e_max=0.5,
134
    )
135
    mainbelt_03 = KeplerianPhaseSpace(
×
136
        a_min=mainbelt_02.a_max,
137
        a_max=2.95,
138
        e_max=0.5,
139
    )
140
    mainbelt_04 = KeplerianPhaseSpace(
×
141
        a_min=mainbelt_03.a_max,
142
        a_max=3.27,
143
        e_max=0.5,
144
    )
145
    mainbelt_05 = KeplerianPhaseSpace(
×
146
        a_min=mainbelt_04.a_max,
147
        a_max=5.0,
148
        e_max=0.5,
149
    )
150

151
    # Create 1 phase space region for trojans, TNOs, etc..
152
    outer = KeplerianPhaseSpace(
×
153
        a_min=mainbelt_05.a_max,
154
        a_max=50.0,
155
        e_max=0.5,
156
    )
157

158
    phase_space_regions = [
×
159
        hungarias_01,
160
        hungarias_02,
161
        hungarias_03,
162
        mainbelt_01,
163
        mainbelt_02,
164
        mainbelt_03,
165
        mainbelt_04,
166
        mainbelt_05,
167
        outer,
168
    ]
169

170
    test_orbits = []
×
171
    for region in phase_space_regions:
×
172
        mask = pc.and_(
×
173
            pc.and_(
174
                pc.and_(
175
                    pc.and_(
176
                        pc.and_(
177
                            pc.greater_equal(keplerian.a, region.a_min),
178
                            pc.less(keplerian.a, region.a_max),
179
                        ),
180
                        pc.greater_equal(keplerian.e, region.e_min),
181
                    ),
182
                    pc.less(keplerian.e, region.e_max),
183
                ),
184
                pc.greater_equal(keplerian.i, region.i_min),
185
            ),
186
            pc.less(keplerian.i, region.i_max),
187
        )
188

189
        keplerian_region = keplerian.apply_mask(mask)
×
190
        orbits_region = orbits_patch.apply_mask(mask)
×
191

192
        if len(keplerian_region) != 0:
×
193
            index = select_average_within_region(keplerian_region)
×
194
            test_orbits.append(orbits_region[int(index)])
×
195

196
    if len(test_orbits) > 0:
×
197
        return qv.concatenate(test_orbits)
×
198
    else:
199
        return Orbits.empty()
×
200

201

202
def generate_test_orbits_worker(
1✔
203
    healpixel_chunk: pa.Array,
204
    ephemeris_healpixels: pa.Array,
205
    propagated_orbits: Union[Orbits, ray.ObjectRef],
206
    ephemeris: Union[Ephemeris, ray.ObjectRef],
207
) -> TestOrbits:
208
    """
209
    Worker function for generating test orbits.
210

211
    Parameters
212
    ----------
213
    healpixel_chunk
214
        Healpixels to generate test orbits for.
215
    ephemeris_healpixels
216
        Healpixels for the ephemeris.
217
    propagated_orbits
218
        Propagated orbits.
219
    ephemeris
220
        Ephemeris for the propagated orbits.
221

222
    Returns
223
    -------
224
    test_orbits
225
        Test orbits generated from the propagated orbits.
226
    """
227
    test_orbits_list = []
×
228

229
    # Filter the ephemerides to only those in the observations
230
    ephemeris_mask = pc.is_in(ephemeris_healpixels, healpixel_chunk)
×
231
    ephemeris_filtered = ephemeris.apply_mask(ephemeris_mask)
×
232
    ephemeris_healpixels = pc.filter(ephemeris_healpixels, ephemeris_mask)
×
233
    logger.info(f"{len(ephemeris_filtered)} orbit ephemerides overlap with the observations.")
×
234

235
    # Filter the orbits to only those in the ephemeris
236
    orbits_filtered = propagated_orbits.apply_mask(
×
237
        pc.is_in(propagated_orbits.orbit_id, ephemeris_filtered.orbit_id)
238
    )
239

240
    logger.info("Selecting test orbits from the orbit catalog...")
×
241
    for healpixel in healpixel_chunk:
×
242
        healpixel_mask = pc.equal(ephemeris_healpixels, healpixel)
×
243
        ephemeris_healpixel = ephemeris_filtered.apply_mask(healpixel_mask)
×
244

245
        if len(ephemeris_healpixel) == 0:
×
246
            logger.debug(f"No ephemerides in healpixel {healpixel}.")
×
247
            continue
×
248

249
        test_orbits_healpixel = select_test_orbits(ephemeris_healpixel, orbits_filtered)
×
250

251
        if len(test_orbits_healpixel) > 0:
×
252
            test_orbits_list.append(
×
253
                TestOrbits.from_kwargs(
254
                    orbit_id=test_orbits_healpixel.orbit_id,
255
                    object_id=test_orbits_healpixel.object_id,
256
                    coordinates=test_orbits_healpixel.coordinates,
257
                    bundle_id=[healpixel for _ in range(len(test_orbits_healpixel))],
258
                )
259
            )
260
        else:
261
            logger.debug(f"No orbits in healpixel {healpixel}.")
×
262

263
    if len(test_orbits_list) > 0:
×
264
        test_orbits = qv.concatenate(test_orbits_list)
×
265
    else:
266
        test_orbits = TestOrbits.empty()
×
267

268
    return test_orbits
×
269

270

271
generate_test_orbits_worker_remote = ray.remote(generate_test_orbits_worker)
1✔
272
generate_test_orbits_worker_remote.options(num_cpus=1, num_returns=1)
1✔
273

274

275
def generate_test_orbits(
1✔
276
    observations: Union[str, Observations],
277
    catalog: Orbits,
278
    propagator_class: Type[Propagator],
279
    nside: int = 32,
280
    max_processes: Optional[int] = None,
281
    chunk_size: int = 100,
282
) -> TestOrbits:
283
    """
284
    Given observations and a catalog of known orbits generate test orbits
285
    from the catalog. The observations are divded into healpixels (with size determined
286
    by the nside parameter). For each healpixel in observations, select up to 9 orbits from
287
    the catalog that are in the same healpixel as the observations. The orbits are selected
288
    in bins of semi-major axis, eccentricity, and inclination.
289

290
    The catalog will be propagated to start time of the observations using the propagator
291
    and ephemerides will be generated for the propagated orbits (assuming a geocentric observer).
292

293
    Parameters
294
    ----------
295
    observations
296
        Observations for which to generate test orbits. These observations can
297
        be an in-memory Observations object or a path to a parquet file containing the
298
        observations.
299
    catalog
300
        Catalog of known orbits.
301
    nside
302
        Healpixel size.
303
    propagator
304
        Propagator to use to propagate the orbits.
305
    max_processes
306
        Maximum number of processes to use while propagating orbits and
307
        generating ephemerides.
308
    chunk_size
309
        The maximum number of unique healpixels for which to generate test orbits per
310
        process. This function will dynamically compute the chunk size based on the
311
        number of unique healpixels and the number of processes. The dynamic chunk
312
        size will never exceed the given value.
313

314
    Returns
315
    -------
316
    test_orbits
317
        Test orbits generated from the catalog.
318
    """
319
    time_start = time.perf_counter()
×
320
    logger.info("Generating test orbits...")
×
321

322
    propagator = propagator_class()
×
323

324
    # If the input file is a string, read in the days column to
325
    # extract the minimum time
326
    if isinstance(observations, str):
×
327
        table = pq.read_table(observations, columns=["coordinates.time.days"], memory_map=True)
×
328

329
        min_day = pc.min(table["days"]).as_py()
×
330
        # Set the start time to the midnight of the first night of observations
331
        start_time = Timestamp.from_kwargs(days=[min_day], nanos=[0], scale="utc")
×
332
        del table
×
333
    elif isinstance(observations, Observations):
×
334
        # Extract the minimum time from the observations
335
        earliest_time = observations.coordinates.time.min()
×
336

337
        # Set the start time to the midnight of the first night of observations
338
        start_time = Timestamp.from_kwargs(days=earliest_time.days, nanos=[0], scale="utc")
×
339
    else:
340
        raise ValueError(
×
341
            f"observations must be a path to a parquet file or an Observations object. Got {type(observations)}."
342
        )
343

344
    # Propagate the orbits to the minimum time
345
    logger.info("Propagating orbits to the start time of the observations...")
×
346
    propagation_start_time = time.perf_counter()
×
347
    propagated_orbits = propagator.propagate_orbits(
×
348
        catalog,
349
        start_time,
350
        max_processes=max_processes,
351
        chunk_size=500,
352
    )
353
    propagation_end_time = time.perf_counter()
×
354
    logger.info(f"Propagation completed in {propagation_end_time - propagation_start_time:.3f} seconds.")
×
355

356
    # Create a geocentric observer for the observations
357
    logger.info("Generating ephemerides for the propagated orbits...")
×
358
    ephemeris_start_time = time.perf_counter()
×
359
    observers = Observers.from_code("500", start_time)
×
360

361
    # Generate ephemerides for the propagated orbits
362
    ephemeris = propagator.generate_ephemeris(
×
363
        propagated_orbits,
364
        observers,
365
        start_time,
366
        max_processes=max_processes,
367
        chunk_size=1000,
368
    )
369
    ephemeris_end_time = time.perf_counter()
×
370
    logger.info(f"Ephemeris generation completed in {ephemeris_end_time - ephemeris_start_time:.3f} seconds.")
×
371

372
    if isinstance(observations, str):
×
373
        table = pq.read_table(
×
374
            observations,
375
            columns=["coordinates.lon", "coordinates.lat"],
376
            memory_map=True,
377
        )
378
        lon = table["lon"].to_numpy(zero_copy_only=False)
×
379
        lat = table["lat"].to_numpy(zero_copy_only=False)
×
380
        del table
×
381

382
    else:
383
        lon = observations.coordinates.lon.to_numpy(zero_copy_only=False)
×
384
        lat = observations.coordinates.lat.to_numpy(zero_copy_only=False)
×
385

386
    # Calculate the healpixels for observations and ephemerides
387
    # Here we want the unique healpixels so we can cross match against our
388
    # catalog's predicted ephemeris
389
    observations_healpixels = calculate_healpixels(
×
390
        lon,
391
        lat,
392
        nside=nside,
393
    )
394
    observations_healpixels = pc.unique(pa.array(observations_healpixels))
×
395
    logger.info(f"Observations occur in {len(observations_healpixels)} unique healpixels.")
×
396

397
    # Calculate the healpixels for each ephemeris
398
    # We do not want unique healpixels here because we want to
399
    # select orbits from the same healpixel as the observations
400
    ephemeris_healpixels = calculate_healpixels(
×
401
        ephemeris.coordinates.lon.to_numpy(zero_copy_only=False),
402
        ephemeris.coordinates.lat.to_numpy(zero_copy_only=False),
403
        nside=nside,
404
    )
405
    ephemeris_healpixels = pa.array(ephemeris_healpixels)
×
406

407
    # Dynamically compute the chunk size based on the number of healpixels
408
    # and the number of processes
409
    if max_processes is None:
×
410
        max_processes = mp.cpu_count()
×
411

412
    chunk_size = np.minimum(np.ceil(len(observations_healpixels) / max_processes).astype(int), chunk_size)
×
413
    logger.info(f"Generating test orbits with a chunk size of {chunk_size} healpixels.")
×
414

415
    test_orbits = TestOrbits.empty()
×
416
    use_ray = initialize_use_ray(num_cpus=max_processes)
×
417
    if use_ray:
×
418

419
        ephemeris_ref = ray.put(ephemeris)
×
420
        ephemeris_healpixels_ref = ray.put(ephemeris_healpixels)
×
421
        propagated_orbits_ref = ray.put(propagated_orbits)
×
422

423
        futures = []
×
424
        for healpixel_chunk in _iterate_chunks(observations_healpixels, chunk_size):
×
425
            futures.append(
×
426
                generate_test_orbits_worker_remote.remote(
427
                    healpixel_chunk,
428
                    ephemeris_healpixels_ref,
429
                    propagated_orbits_ref,
430
                    ephemeris_ref,
431
                )
432
            )
433

434
        while futures:
×
435
            finished, futures = ray.wait(futures, num_returns=1)
×
436
            test_orbits = qv.concatenate([test_orbits, ray.get(finished[0])])
×
437
            if test_orbits.fragmented():
×
438
                test_orbits = qv.defragment(test_orbits)
×
439

440
    else:
441

442
        for healpixel_chunk in _iterate_chunks(observations_healpixels, chunk_size):
×
443
            test_orbits_chunk = generate_test_orbits_worker(
×
444
                healpixel_chunk,
445
                ephemeris_healpixels,
446
                propagated_orbits,
447
                ephemeris,
448
            )
449
            test_orbits = qv.concatenate([test_orbits, test_orbits_chunk])
×
450
            if test_orbits.fragmented():
×
451
                test_orbits = qv.defragment(test_orbits)
×
452

453
    time_end = time.perf_counter()
×
454
    logger.info(f"Selected {len(test_orbits)} test orbits.")
×
455
    logger.info(f"Test orbit generation completed in {time_end - time_start:.3f} seconds.")
×
456
    return test_orbits
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc