moeyensj / thor / build 6733525095

02 Nov 2023 01:52PM UTC coverage: 40.544% (+0.9%) from 39.595%

push · github · web-flow
Merge pull request #123 from moeyensj/v2.0-link-aims-sample

Link AIMS sample

301 of 301 new or added lines in 12 files covered (100.0%)

1878 of 4632 relevant lines covered (40.54%)

0.41 hits per line

Source File: /thor/orbits/attribution.py (13.44% covered)

Per-line coverage: only the module-level statements (environment settings, imports, the logger, __all__, and the three function definitions) are hit by the test suite; the function bodies are uncovered.

import os

os.environ["OMP_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"
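# Note: the thread limits above must be set before numpy (and the BLAS / OpenMP
# libraries it loads) is imported in order to take effect; they keep each worker
# process single-threaded.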

import concurrent.futures as cf
import logging
import multiprocessing as mp
import time
from functools import partial

import numpy as np
import pandas as pd
from astropy.time import Time
from sklearn.neighbors import BallTree

from ..utils import (
    _checkParallel,
    _initWorker,
    calcChunkSize,
    removeDuplicateObservations,
    sortLinkages,
    yieldChunks,
)
from .ephemeris import generateEphemeris
from .od import differentialCorrection
from .orbits import Orbits
from .residuals import calcResiduals

logger = logging.getLogger(__name__)

__all__ = ["attribution_worker", "attributeObservations", "mergeAndExtendOrbits"]

def attribution_worker(
    orbits,
    observations,
    eps=1 / 3600,
    include_probabilistic=True,
    backend="PYOORB",
    backend_kwargs={},
):
    # Create observers dictionary from observations
    observers = {}
    for observatory_code in observations["observatory_code"].unique():
        observers[observatory_code] = Time(
            observations[observations["observatory_code"].isin([observatory_code])][
                "mjd_utc"
            ].unique(),
            scale="utc",
            format="mjd",
        )

    # Generate ephemerides for each orbit at the observation times
    ephemeris = generateEphemeris(
        orbits,
        observers,
        backend=backend,
        backend_kwargs=backend_kwargs,
        num_jobs=1,
        chunk_size=1,
    )

    # Group the predicted ephemerides and observations by visit / exposure
    ephemeris_grouped = ephemeris.groupby(by=["observatory_code", "mjd_utc"])
    ephemeris_visits = [
        ephemeris_grouped.get_group(g) for g in ephemeris_grouped.groups
    ]
    observations_grouped = observations.groupby(by=["observatory_code", "mjd_utc"])
    observations_visits = [
        observations_grouped.get_group(g) for g in observations_grouped.groups
    ]

    # Loop through each unique exposure and visit, find the nearest observations within
    # eps (haversine metric)
    distances = []
    orbit_ids_associated = []
    obs_ids_associated = []
    obs_times_associated = []
    eps_rad = np.radians(eps)
    residuals = []
    stats = []
    for ephemeris_visit, observations_visit in zip(
        ephemeris_visits, observations_visits
    ):
        assert len(ephemeris_visit["mjd_utc"].unique()) == 1
        assert len(observations_visit["mjd_utc"].unique()) == 1
        assert (
            observations_visit["mjd_utc"].unique()[0]
            == ephemeris_visit["mjd_utc"].unique()[0]
        )

        obs_ids = observations_visit[["obs_id"]].values
        obs_times = observations_visit[["mjd_utc"]].values
        orbit_ids = ephemeris_visit[["orbit_id"]].values
        coords = observations_visit[["RA_deg", "Dec_deg"]].values
        coords_predicted = ephemeris_visit[["RA_deg", "Dec_deg"]].values
        coords_sigma = observations_visit[["RA_sigma_deg", "Dec_sigma_deg"]].values

        # Haversine metric requires latitude first then longitude...
        coords_latlon = np.radians(observations_visit[["Dec_deg", "RA_deg"]].values)
        coords_predicted_latlon = np.radians(
            ephemeris_visit[["Dec_deg", "RA_deg"]].values
        )

        num_obs = len(coords_predicted)
        k = np.minimum(3, num_obs)

        # Build BallTree with a haversine metric on predicted ephemeris
        tree = BallTree(coords_predicted_latlon, metric="haversine")
        # Query tree using observed RA, Dec
        d, i = tree.query(
            coords_latlon,
            k=k,
            return_distance=True,
            dualtree=True,
            breadth_first=False,
            sort_results=False,
        )

        # Select all observations with distance smaller than or equal
        # to the maximum given distance
        mask = np.where(d <= eps_rad)

        if len(d[mask]) > 0:
            orbit_ids_associated.append(orbit_ids[i[mask]])
            obs_ids_associated.append(obs_ids[mask[0]])
            obs_times_associated.append(obs_times[mask[0]])
            distances.append(d[mask].reshape(-1, 1))

            residuals_visit, stats_visit = calcResiduals(
                coords[mask[0]],
                coords_predicted[i[mask]],
                sigmas_actual=coords_sigma[mask[0]],
                include_probabilistic=True,
            )
            residuals.append(residuals_visit)
            stats.append(np.vstack(stats_visit).T)

    if len(distances) > 0:
        distances = np.degrees(np.vstack(distances))
        orbit_ids_associated = np.vstack(orbit_ids_associated)
        obs_ids_associated = np.vstack(obs_ids_associated)
        obs_times_associated = np.vstack(obs_times_associated)
        residuals = np.vstack(residuals)
        stats = np.vstack(stats)

        attributions = {
            "orbit_id": orbit_ids_associated[:, 0],
            "obs_id": obs_ids_associated[:, 0],
            "mjd_utc": obs_times_associated[:, 0],
            "distance": distances[:, 0],
            "residual_ra_arcsec": residuals[:, 0] * 3600,
            "residual_dec_arcsec": residuals[:, 1] * 3600,
            "chi2": stats[:, 0],
        }
        if include_probabilistic:
            attributions["probability"] = stats[:, 1]
            attributions["mahalanobis_distance"] = stats[:, 2]

        attributions = pd.DataFrame(attributions)

    else:
        columns = [
            "orbit_id",
            "obs_id",
            "mjd_utc",
            "distance",
            "residual_ra_arcsec",
            "residual_dec_arcsec",
            "chi2",
        ]
        if include_probabilistic:
            columns += ["probability", "mahalanobis_distance"]

        attributions = pd.DataFrame(columns=columns)

    return attributions

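The matching step above pairs observed and predicted sky positions with scikit-learn's BallTree under a haversine metric, which expects (latitude, longitude) in radians, i.e. (Dec, RA) here. Below is a minimal, self-contained sketch of that pattern using the module's default 5 arcsecond radius; the names predicted_radec, observed_radec and eps_rad, and all array values, are made up for illustration and are not part of this module.

import numpy as np
from sklearn.neighbors import BallTree

# Hypothetical predicted positions (from ephemerides) and observed positions,
# both as (RA, Dec) in degrees.
predicted_radec = np.array([[150.0010, 2.5000], [150.0200, 2.4800]])
observed_radec = np.array([[150.0015, 2.5002], [151.0000, 3.0000]])

eps_rad = np.radians(5 / 3600)  # 5 arcsecond matching radius

# The haversine metric expects (latitude, longitude) in radians, i.e. (Dec, RA).
tree = BallTree(np.radians(predicted_radec[:, ::-1]), metric="haversine")
d, i = tree.query(np.radians(observed_radec[:, ::-1]), k=1)

matched = d[:, 0] <= eps_rad
print(matched)  # [ True False]: only the first observation is within eps of a prediction
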
def attributeObservations(
    orbits,
    observations,
    eps=5 / 3600,
    include_probabilistic=True,
    backend="PYOORB",
    backend_kwargs={},
    orbits_chunk_size=10,
    observations_chunk_size=100000,
    num_jobs=1,
    parallel_backend="cf",
):
    logger.info("Running observation attribution...")
    time_start = time.time()

    num_orbits = len(orbits)

    attribution_dfs = []

    parallel, num_workers = _checkParallel(num_jobs, parallel_backend)
    if num_workers > 1:

        if parallel_backend == "ray":
            import ray

            if not ray.is_initialized():
                ray.init(address="auto")

            attribution_worker_ray = ray.remote(attribution_worker)
            attribution_worker_ray = attribution_worker_ray.options(
                num_returns=1, num_cpus=1
            )

            # Send up to orbits_chunk_size orbits to each attribution worker for processing
            chunk_size_ = calcChunkSize(
                num_orbits, num_workers, orbits_chunk_size, min_chunk_size=1
            )
            orbits_split = orbits.split(chunk_size_)

            obs_oids = []
            for observations_c in yieldChunks(observations, observations_chunk_size):
                obs_oids.append(ray.put(observations_c))

            for obs_oid in obs_oids:
                p = []
                for orbit_i in orbits_split:
                    p.append(
                        attribution_worker_ray.remote(
                            orbit_i,
                            obs_oid,
                            eps=eps,
                            include_probabilistic=include_probabilistic,
                            backend=backend,
                            backend_kwargs=backend_kwargs,
                        )
                    )

                attribution_dfs_i = ray.get(p)
                attribution_dfs += attribution_dfs_i

        elif parallel_backend == "mp":
            p = mp.Pool(
                processes=num_workers,
                initializer=_initWorker,
            )

            # Send up to orbits_chunk_size orbits to each attribution worker for processing
            chunk_size_ = calcChunkSize(
                num_orbits, num_workers, orbits_chunk_size, min_chunk_size=1
            )
            orbits_split = orbits.split(chunk_size_)

            for observations_c in yieldChunks(observations, observations_chunk_size):
                obs = [observations_c for i in range(len(orbits_split))]
                attribution_dfs_i = p.starmap(
                    partial(
                        attribution_worker,
                        eps=eps,
                        include_probabilistic=include_probabilistic,
                        backend=backend,
                        backend_kwargs=backend_kwargs,
                    ),
                    zip(
                        orbits_split,
                        obs,
                    ),
                )
                attribution_dfs += attribution_dfs_i

            p.close()

        elif parallel_backend == "cf":
            with cf.ProcessPoolExecutor(
                max_workers=num_workers, initializer=_initWorker
            ) as executor:
                futures = []
                for observations_c in yieldChunks(
                    observations, observations_chunk_size
                ):
                    for orbit_c in orbits.split(orbits_chunk_size):
                        futures.append(
                            executor.submit(
                                attribution_worker,
                                orbit_c,
                                observations_c,
                                eps=eps,
                                include_probabilistic=include_probabilistic,
                                backend=backend,
                                backend_kwargs=backend_kwargs,
                            )
                        )
                attribution_dfs = []
                for future in cf.as_completed(futures):
                    attribution_dfs.append(future.result())

        else:
            raise ValueError(
                "Invalid parallel_backend '{}'. Must be one of 'ray', 'mp', or 'cf'.".format(
                    parallel_backend
                )
            )

    else:
        for observations_c in yieldChunks(observations, observations_chunk_size):
            for orbit_c in orbits.split(orbits_chunk_size):
                attribution_df_i = attribution_worker(
                    orbit_c,
                    observations_c,
                    eps=eps,
                    include_probabilistic=include_probabilistic,
                    backend=backend,
                    backend_kwargs=backend_kwargs,
                )
                attribution_dfs.append(attribution_df_i)

    attributions = pd.concat(attribution_dfs)
    attributions.sort_values(
        by=["orbit_id", "mjd_utc", "distance"], inplace=True, ignore_index=True
    )

    time_end = time.time()
    logger.info(
        "Attributed {} observations to {} orbits.".format(
            attributions["obs_id"].nunique(), attributions["orbit_id"].nunique()
        )
    )
    logger.info(
        "Attribution completed in {:.3f} seconds.".format(time_end - time_start)
    )
    return attributions

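As a usage sketch (not part of the module): attributeObservations takes an Orbits container and an observations DataFrame carrying at least the columns the worker reads above (obs_id, mjd_utc, observatory_code, RA_deg, Dec_deg, RA_sigma_deg, Dec_sigma_deg). The orbits and observations objects below are assumed to come from earlier stages of the pipeline.

# `orbits` (an Orbits instance) and `observations` (a DataFrame with the columns
# listed above) are assumed to exist already; this call is illustrative only.
attributions = attributeObservations(
    orbits,
    observations,
    eps=5 / 3600,                    # 5 arcsecond matching radius (the default)
    include_probabilistic=True,
    orbits_chunk_size=10,
    observations_chunk_size=100000,
    num_jobs=4,
    parallel_backend="cf",           # concurrent.futures process pool
)
# Result: one row per attributed (orbit_id, obs_id) pair within eps, sorted by
# orbit_id, mjd_utc and distance.
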
def mergeAndExtendOrbits(
    orbits,
    orbit_members,
    observations,
    min_obs=6,
    min_arc_length=1.0,
    contamination_percentage=20.0,
    rchi2_threshold=5,
    eps=1 / 3600,
    delta=1e-8,
    max_iter=20,
    method="central",
    fit_epoch=False,
    backend="PYOORB",
    backend_kwargs={},
    orbits_chunk_size=10,
    observations_chunk_size=100000,
    num_jobs=60,
    parallel_backend="cf",
):
    """
    Attempt to extend each orbit's observational arc by running attribution
    on the observations. This is an iterative process: attribution is run, any
    observations found for an orbit are added to that orbit, and differential
    correction is run. Orbits that are subsets of other orbits are removed.
    Iteration continues until there are no duplicate observation assignments.

    Parameters
    ----------
    orbits_chunk_size : int, optional
        Number of orbits to send to each job.
    observations_chunk_size : int, optional
        Number of observations to process per batch.
    num_jobs : int, optional
        Number of jobs to launch.
    parallel_backend : str, optional
        Which parallelization backend to use {'ray', 'mp', 'cf'}. Defaults to using
        Python's concurrent.futures module ('cf').
    """
    time_start = time.time()
    logger.info("Running orbit extension and merging...")

    if len(observations) > 0:
        orbits_iter, orbit_members_iter = sortLinkages(
            orbits, orbit_members, observations, linkage_id_col="orbit_id"
        )

    else:
        orbits_iter = orbits.copy()
        orbit_members_iter = orbit_members.copy()

    iterations = 0
    odp_orbits_dfs = []
    odp_orbit_members_dfs = []
    observations_iter = observations.copy()
    if len(orbits_iter) > 0 and len(observations_iter) > 0:
        converged = False

        while not converged:
            # Run attribution
            attributions = attributeObservations(
                Orbits.from_df(orbits_iter),
                observations_iter,
                eps=eps,
                include_probabilistic=True,
                backend=backend,
                backend_kwargs=backend_kwargs,
                orbits_chunk_size=orbits_chunk_size,
                observations_chunk_size=observations_chunk_size,
                num_jobs=num_jobs,
                parallel_backend=parallel_backend,
            )

            assert np.all(
                np.isin(
                    orbit_members_iter["obs_id"].unique(),
                    observations_iter["obs_id"].unique(),
                )
            )

            # Attributions are sorted by orbit ID, observation time and angular distance.
            # Keep only the observation with the smallest distance for any orbit that has
            # multiple observations attributed at the same observation time.
            attributions.drop_duplicates(
                subset=["orbit_id", "mjd_utc"],
                keep="first",
                inplace=True,
                ignore_index=True,
            )
            orbit_members_iter = attributions[
                [
                    "orbit_id",
                    "obs_id",
                    "residual_ra_arcsec",
                    "residual_dec_arcsec",
                    "chi2",
                ]
            ]
            orbits_iter = orbits_iter[
                orbits_iter["orbit_id"].isin(orbit_members_iter["orbit_id"].unique())
            ]

            orbits_iter, orbit_members_iter = sortLinkages(
                orbits_iter,
                orbit_members_iter[["orbit_id", "obs_id"]],
                observations_iter,
            )

            # Run differential orbit correction on all orbits
            # with the newly added observations to the orbits
            # that had observations attributed to them
            orbits_iter, orbit_members_iter = differentialCorrection(
                orbits_iter,
                orbit_members_iter,
                observations_iter,
                rchi2_threshold=rchi2_threshold,
                min_obs=min_obs,
                min_arc_length=min_arc_length,
                contamination_percentage=contamination_percentage,
                delta=delta,
                method=method,
                max_iter=max_iter,
                fit_epoch=fit_epoch,
                backend=backend,
                backend_kwargs=backend_kwargs,
                chunk_size=orbits_chunk_size,
                num_jobs=num_jobs,
                parallel_backend=parallel_backend,
            )
            orbit_members_iter = orbit_members_iter[orbit_members_iter["outlier"] == 0]
            orbit_members_iter.reset_index(inplace=True, drop=True)

            # Remove orbits that were not improved from the pool of available orbits.
            # Orbits that were not improved have already iterated to their best-fit solution
            # given the available observations. They are unlikely to recover more observations
            # in subsequent iterations and so can be saved for output.
            not_improved = orbits_iter[orbits_iter["improved"] == False][
                "orbit_id"
            ].values
            orbits_out = orbits_iter[orbits_iter["orbit_id"].isin(not_improved)].copy()
            orbit_members_out = orbit_members_iter[
                orbit_members_iter["orbit_id"].isin(not_improved)
            ].copy()
            not_improved_obs_ids = orbit_members_out["obs_id"].values

            # If some orbits that were not improved still share observations, keep the orbit
            # with the lowest reduced chi2 in the pool of orbits but delete the others.
            obs_id_occurences = orbit_members_out["obs_id"].value_counts()
            duplicate_obs_ids = obs_id_occurences.index.values[
                obs_id_occurences.values > 1
            ]

            logger.info(
                "There are {} observations that appear in more than one orbit.".format(
                    len(duplicate_obs_ids)
                )
            )
            orbits_out, orbit_members_out = removeDuplicateObservations(
                orbits_out,
                orbit_members_out,
                min_obs=min_obs,
                linkage_id_col="orbit_id",
                filter_cols=["num_obs", "arc_length", "r_sigma", "v_sigma"],
                ascending=[False, False, True, True],
            )

            observations_iter = observations_iter[
                ~observations_iter["obs_id"].isin(orbit_members_out["obs_id"].values)
            ]
            orbit_members_iter = orbit_members_iter[
                ~orbit_members_iter["orbit_id"].isin(orbits_out["orbit_id"].values)
            ]
            orbit_members_iter = orbit_members_iter[
                orbit_members_iter["obs_id"].isin(observations_iter["obs_id"].values)
            ]
            orbits_iter = orbits_iter[
                orbits_iter["orbit_id"].isin(orbit_members_iter["orbit_id"].unique())
            ]
            orbit_members_iter = orbit_members_iter[["orbit_id", "obs_id"]]

            odp_orbits_dfs.append(orbits_out)
            odp_orbit_members_dfs.append(orbit_members_out)

            iterations += 1
            if len(orbits_iter) == 0:
                converged = True

        odp_orbits = pd.concat(odp_orbits_dfs)
        odp_orbit_members = pd.concat(odp_orbit_members_dfs)

        odp_orbits.drop(columns=["improved"], inplace=True)
        odp_orbits, odp_orbit_members = sortLinkages(
            odp_orbits, odp_orbit_members, observations, linkage_id_col="orbit_id"
        )

    else:
        odp_orbits = pd.DataFrame(
            columns=[
                "orbit_id",
                "mjd_tdb",
                "x",
                "y",
                "z",
                "vx",
                "vy",
                "vz",
                "covariance",
                "arc_length",
                "num_obs",
                "chi2",
                "rchi2",
            ]
        )

        odp_orbit_members = pd.DataFrame(
            columns=[
                "orbit_id",
                "obs_id",
                "residual_ra_arcsec",
                "residual_dec_arcsec",
                "chi2",
                "outlier",
            ]
        )

    time_end = time.time()
    logger.info(
        "Number of attribution / differential correction iterations: {}".format(
            iterations
        )
    )
    logger.info(
        "Extended and/or merged {} orbits into {} orbits.".format(
            len(orbits), len(odp_orbits)
        )
    )
    logger.info(
        "Orbit extension and merging completed in {:.3f} seconds.".format(
            time_end - time_start
        )
    )

    return odp_orbits, odp_orbit_members

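Finally, a similarly hedged sketch of calling mergeAndExtendOrbits at the end of a linking run; the orbits, orbit_members, and observations DataFrames are assumed to come from the preceding stages of the pipeline and to use the column names referenced throughout this module.

# Assumed inputs: `orbits`, `orbit_members` and `observations` DataFrames using the
# column names referenced above ("orbit_id", "obs_id", "mjd_utc", ...).
odp_orbits, odp_orbit_members = mergeAndExtendOrbits(
    orbits,
    orbit_members,
    observations,
    min_obs=6,
    min_arc_length=1.0,
    contamination_percentage=20.0,
    rchi2_threshold=5,
    eps=1 / 3600,
    num_jobs=4,
    parallel_backend="cf",
)
# odp_orbits / odp_orbit_members hold the extended and merged orbits and their
# member observations after the attribution / differential-correction iterations.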